o
    ?߱i;a                     @  sx  U d dl mZ d dlZd dlZd dlmZmZmZmZ d dl	Z	d dl
mZ d dlmZmZmZmZmZ er=d dlmZmZ edd e	jd	dd
 D Zded< edkred dlmZ d dlmZmZ nedkree	jdree	jdrd dl	mZ d dl mZmZ G dd dZ!G dd deddee" fdee" fdeee"ee# ee# f  fgZ$G dd de!Z%d$d"d#Z&dS )%    )annotationsN)TYPE_CHECKINGList
NamedTupleTuple)ImaginaireModel)callbackdistributedlogmiscobject_store)CheckpointConfig	JobConfigc                 c  s    | ]}t |V  qd S N)int).0x r   \/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/utils/checkpointer.py	<genexpr>   s    r   .   zTuple[int, ...]TORCH_VERSION)      )quantization)FakeQuantizeBaseObserverBaser      r   r   c                   @  s   e Zd ZdZd5dd	Zd6ddZedd7d8dd Zed!	d7d8d"d#Z	ed$	%	%	%d9d:d)d*Z
d;d,d-Zd<d.d/Zd=d1d2Zd>d3d4Zd%S )?Checkpointerz^The checkpointer class. Supports checkpoint saving/loading to both local disk or object store.config_checkpointr   
config_jobr   	callbackscallback.CallBackGroupc                 C  s   || _ |j d| _|j d| _|jj| _|jj| _|j| _|j	p#d| _	|j
| _
|j| _d| _| jr:t|j| _| jrFt|j| _dS dS )zConstructor of the checkpointer.

        Args:
            config_checkpoint (CheckpointConfig): The config object for the checkpointer.
        z/checkpointsN)r#   
path_localcheckpoint_dir_localpathcheckpoint_dir_object_storesave_to_object_storeenabledload_from_object_storestrict_resume	load_pathload_training_stateonly_load_scheduler_statesave_threadr   ObjectStoreobject_store_saverobject_store_loader)selfr!   r"   r#   r   r   r   __init__.   s   

zCheckpointer.__init__modelr   	optimizertorch.optim.Optimizer	scheduler$torch.optim.lr_scheduler.LRSchedulergrad_scalertorch.amp.GradScaler	iterationr   returnNonec                 C  s   | j || d|dd}t dkrVt| | | | |d}tj|dd}| j j||d | j	r<| j	
  tj| jrD| jn| jd	||t fd
| _	| j	  | j jd|d dS )  Save network weights, optimizer parameters, scheduler parameters to a checkpoint.

        Args:
            model (ImaginaireModel): The PyTorch model.
            optimizer (torch.optim.Optimizer): The model optimizer.
            scheduler (torch.optim.lr_scheduler.LRScheduler): The optimization scheduler.
            grad_scaler (torch.amp.GradScaler): The gradient scaler (for mixed precision training).
            iteration (int): Current iteration number.
        iter_09.ptr   r6   r7   r9   r;   r=   cpudevice
state_dictFtargetdaemonargsN)r6   r=   )r#   on_save_checkpoint_startr	   get_rankdictrI   r   toon_save_checkpointr0   join	threadingThreadr)   _save_worker_object_store_save_worker_localstarton_save_checkpoint_end)r4   r6   r7   r9   r;   r=   checkpoint_filerI   r   r   r   saveF   s*   

zCheckpointer.savezcheckpoint saving (local)r   rI   dict[str, torch.Tensor]rZ   strrankc              
   C  s   t j| j|}t j| jdd z-t|| |dkr | | t	d|  t
|dddd}| jj|d W d
S  tyX } ztd	|  W Y d
}~d
S d
}~ww )a`  Worker to save checkpoint to local disk, spawned with a child thread (runs in parallel with the training).

        Args:
            state_dict (dict[str, torch.Tensor]): The state dict of the model/optimizer/scheduler.
            checkpoint_file (str): The file name of the model checkpoint.
            rank (int): GPU device (default: 0).
        T)exist_okr   zSaved checkpoint (local): rA    rC   r=   z#Checkpoint failed to save (local): N)osr'   rS   r&   makedirstorchr[   _write_latest_checkpoint_filer
   successr   replacer#   on_save_checkpoint_success	Exception	exceptionr4   rI   rZ   r^   checkpoint_pathr=   er   r   r   rW   t   s   	
zCheckpointer._save_worker_localz checkpoint saving (object store)c              
   C  s   t j| j|}z0| jj||dd |dkr| | td|  t	|
dd
dd}| jj|d W d
S  tyS } ztd	|  W Y d
}~d
S d
}~ww )a_  Worker to upload checkpoint to object store, spawned with a child thread (in parallel with the training).

        Args:
            state_dict (dict[str, torch.Tensor]): The state dict of the model/optimizer/scheduler.
            checkpoint_file (str): The file name of the model checkpoint.
            rank (int): GPU device (default: 0).
        rd   keytyper   z!Saved checkpoint (object store): rA   r`   rC   ra   z,Checkpoint failed to upload (object store): N)rb   r'   rS   r(   r2   save_objectre   r
   rf   r   rg   r#   rh   ri   rj   rk   r   r   r   rV      s   
z&Checkpointer._save_worker_object_storecheckpoint loadingNtorch.optim.Optimizer | None+torch.optim.lr_scheduler.LRScheduler | Nonetorch.amp.GradScaler | Nonec                 C  s  | j | |  }|dur#| jr| jn| j}tj||}d}d}	n| j	r0| j	}| j
}| j}	nd}d}d}	|dur| | | jr[td|  | jj|dd}
td|  ntd|  tj|d	d
 dd}
td|  | j j||
d td |j|
d | jd |s|	r|
d }|sJ td ||
d  ||_nd}|r|sJ td ||
d  td ||
d  td| d ntd nd}td tj  | j j|||d |S )S  Load network weights and optimizer states from a checkpoint in a single process.

        The priority of the checkpoint loading logic is:
        1. Attempt to resume training if possible by looking for latest_checkpoint.txt under the same name.
        2. If no latest checkpoint were found, it loads the model weights specified by config_checkpoint.path.
           - This is typically used for inference mode.
           - If config_checkpoint.load_optimizer_state is True, then also load the optimizer and scheduler states.
        3. If none of the above, randomly initialize the model parameters and train from scratch.

        Args:
            model (ImaginaireModel): The PyTorch model.
            optimizer (torch.optim.Optimizer | None): The model optimizer (default: None).
            scheduler (torch.optim.lr_scheduler.LRScheduler | None): The optimization scheduler (default: None).
            grad_scaler (torch.amp.GradScaler | None): The gradient scaler (for mixed precision training).

        Returns:
            iteration (int): the iteration number to start/resume from.
        NTF#Loading checkpoint (object store): rd   rn   ,Complete loading checkpoint (object store): Loading checkpoint (local): c                 S     | S r   r   storagelocr   r   r   <lambda>       z#Checkpointer.load.<locals>.<lambda>)map_locationweights_only%Complete loading checkpoint (local): rH   - Loading the model...r6   strictr=   - Loading the scheduler...r9   r   - Loading the optimizer...r7    - Loading the gradient scaler...r;   ,Done with loading the checkpoint (iteration ).!Done with loading the checkpoint.Training from scratch.)r=   rl   )r#   on_load_checkpoint_start_read_latest_checkpoint_filer+   r(   r&   rb   r'   rS   r-   r.   r/   _check_checkpoint_existsr
   infor3   load_objectrf   rd   loadon_load_checkpointload_state_dictr,   
last_epochcudaempty_cacheon_load_checkpoint_end)r4   r6   r7   r9   r;   latest_checkpoint_filecheckpoint_dirrl   resumeZonly_resume_schedulerrI   r=   r   r   r   r      s^   






zCheckpointer.load
str | Nonec                 C  sp   d}| j r tj| jd}| jj|dr| jj|dd }|S tj| j	d}tj
|r6t|  }|S )zGet the file name of the latest saved checkpoint. If it doesn't exist, return None.

        Returns:
            checkpoint_file (str | None): file name of the latest saved checkpoint.
        Nlatest_checkpoint.txtro   textrn   )r+   rb   r'   rS   r(   r3   object_existsr   stripr&   isfileopenread)r4   rZ   latest_pathr   r   r   r      s   z)Checkpointer._read_latest_checkpoint_filec                 C  s   | d}| j rtj| jd}| jj||dd dS tj| jd}t|d}|	| W d   dS 1 s9w   Y  dS )zTrack the file name of the latest saved checkpoint.

        Args:
            checkpoint_file (str): file name of the latest saved checkpoint.
        
r   r   rn   wN)
r)   rb   r'   rS   r(   r2   rq   r&   r   write)r4   rZ   contentr   filer   r   r   re   
  s   
"z*Checkpointer._write_latest_checkpoint_filerl   c                 C  sD   | j r| jj|dstd| dS tj|s td| dS )zIf the file checkpoint_path does not exist, raise an error.

        Args:
            checkpoint_path (str): full path to the checkpoint.
        r   zFile not found (object store): zFile not found (local): N)r+   r3   r   FileNotFoundErrorrb   r'   exists)r4   rl   r   r   r   r     s   z%Checkpointer._check_checkpoint_existsc                 C  s   | j r
| j   dS dS )zFinalize the checkpointer.N)r0   rS   )r4   r   r   r   finalize&  s   zCheckpointer.finalize)r!   r   r"   r   r#   r$   r6   r   r7   r8   r9   r:   r;   r<   r=   r   r>   r?   )r   )rI   r\   rZ   r]   r^   r   r>   r?   NNN
r6   r   r7   rs   r9   rt   r;   ru   r>   r   )r>   r   )rZ   r]   r>   r?   )rl   r]   r>   r?   )r>   r?   )__name__
__module____qualname____doc__r5   r[   r   timerrW   rV   r   r   re   r   r   r   r   r   r   r    +   s$    

.
Y

r    c                   @  s   e Zd ZdS )_IncompatibleKeysN)r   r   r   r   r   r   r   r   ,  s    
r   IncompatibleKeysmissing_keysunexpected_keysincorrect_shapesc                   @  s2   e Zd ZdddZed			ddddZdS )MultiRankCheckpointerr6   r   r7   r8   r9   r:   r;   r<   r=   r   r>   r?   c                 C  s   |  \}}}d|d| d}	tt|}
|
D ]J}t |krbt| | | | |d}tj|dd}| j	j
||d | jrH| j  tj| jrP| jn| jd||	t fd	| _| j  qd
S )r@   rA   rB   rC   rD   rE   rF   rH   FrJ   N)get_ckpt_postfixlistranger	   rO   rP   rI   r   rQ   r#   rR   r0   rS   rT   rU   r)   rV   rW   rX   )r4   r6   r7   r9   r;   r=   postfix_total_ema_numrZ   Z
save_ranks_rankrI   r   r   r   r[   :  s0   

zMultiRankCheckpointer.saverr   Nrs   rt   ru   c                 C  s  |   }|dur+| \}}}|d| d}| jr| jn| j}	tj|	|}
d}n| j	rE| j	}
| \}}}|
d| d}
| j
}nd}
d}|
dur| |
 | jrntd|
  | jj|
dd}td|
  ntd	|
  tj|
d
d d}td|
  | jj||d td t|j|d | jd |r|d }|r|sJ td ||d  td ||d  ||_td ||d  td| d nd}td nd}td tj  |S )rv   NrC   TFrw   rd   rn   rx   ry   c                 S  rz   r   r   r{   r   r   r   r~     r   z,MultiRankCheckpointer.load.<locals>.<lambda>)r   r   rH   r   r6   r   r=   r   r7   r   r9   r   r;   r   r   r   r   r   )r   r   rg   r+   r(   r&   rb   r'   rS   r-   r.   r   r
   r   r3   r   rf   rd   r   r#   r   criticalr   r,   r   r   r   )r4   r6   r7   r9   r;   r   r   r   r   r   rl   r   rI   r=   r   r   r   r   e  sX   






zMultiRankCheckpointer.loadr   r   r   )r   r   r   r[   r   r   r   r   r   r   r   r   9  s    
+r   r6   torch.nn.Modulecheckpoint_state_dictrP   r>   c              
   C  sV  |   }g }t| D ]}||v rd|v r td| d q|| }tdkr1t|tjj	j
r1qt|tjsKtd| dt| dt||  dt|j}t|| j}||krtdkohttd	ohttd
}|rddd}	ttf}
|	| |}t||
rq||||f || q| j|dd}dd |jD }dd |jD }t|||dS )N_extra_statezSkipping key z; introduced by TransformerEngine for FP8 in the checkpoint.r   zFind non-tensor parameter z in the model. type:  z2, please check if this key is safe to skip or not.r   r   r6   r   ro   r]   r>   c                 S  s.   | dd d }| }|D ]}t||}q|S )Nr   )splitgetattr)r6   ro   	key_parts
cur_moduleZkey_partr   r   r   _get_module_for_key  s
   z2non_strict_load_model.<locals>._get_module_for_keyFr   c                 S     g | ]}d |vr|qS r   r   r   kr   r   r   
<listcomp>      z)non_strict_load_model.<locals>.<listcomp>c                 S  r   r   r   r   r   r   r   r     r   )r   r   r   )r6   r   ro   r]   r>   r   )rI   r   keysr
   warningr   
isinstancerd   nn	parameterUninitializedParameterTensor
ValueErrorrp   tupleshapehasattrr   r   r   appendpopr   r   r   r   )r6   r   model_state_dictr   r   model_paramshape_modelZshape_checkpointZhas_observer_base_classesr   Zcls_to_skipZtarget_moduleZincompatibler   r   r   r   r   non_strict_load_model  sR   "

	


r   )r6   r   r   rP   r>   r   )'
__future__r   rb   rT   typingr   r   r   r   rd   %cosmos_predict2._src.imaginaire.modelr   %cosmos_predict2._src.imaginaire.utilsr   r	   r
   r   r   &cosmos_predict2._src.imaginaire.configr   r   r   __version__r   r   __annotations__Ztorch.aor   torch.ao.quantizationr   r   r   Ztorch.quantizationr    r]   r   r   r   r   r   r   r   r   <module>   sF   *

  


 