o
    ?߱i`                  
   @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddlm
Z
 ddlmZmZmZmZmZmZmZ ddlZddlZddlm  mZ ddlmZ ddlmZmZ ddlmZ dd	lmZm Z m!Z!m"Z"m#Z# dd
l$m%Z% ddl&m'Z' ddl(m)Z)m*Z* ddl+m,Z,m-Z- ddl.m/Z/ ddl0m1Z1mZm2Z2m3Z3 ddl4m5Z5 z=ddlm6Z7 ddlm8Z8m9Z9m:Z:m;Z;m<Z< ddl=m>Z>m?Z? d3de@eAef de>deBde9fddZCG dd de7Z6e2Dd W n! eEy ZF zddlm6Z6 e2DeF d W Y dZF[FndZF[Fww e	ddd gZGG d!d" d"e%ZHG d#d$ d$e%ZIG d%d& d&eAejJZKG d'd( d(ZLG d)d* d*ZMd+ejNd,ejNd-e,d.e-ddf
d/d0ZOG d1d2 d2e'ZPdS )4uu  
Distributed checkpoint (DCP) directory structure and storage backends.

The checkpointer saves model state in a sharded format across multiple processes:

self.save_dirname/
├── iter_000000005/                    # Checkpoint at iteration 5
│   ├── model/                         # Model state shards
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   ├── optim/                        # Optimizer state shards
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   ├── scheduler/                    # Learning rate scheduler state
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   └── trainer/                      # Additional training state
│       ├── __0_0.distcp              # Shard 0 from rank 0
│       └── __1_0.distcp              # Shard 1 from rank 1
└── latest_checkpoint.txt             # Points to most recent checkpoint folder, e.g. iter_000000005

Storage backends:
- Local filesystem:
  self.save_dirname = "{config_job.path_local}/checkpoints"

- S3 object store:
  self.save_dirname = "s3://{bucket}/{config_job.path}/checkpoints"
  where bucket = self.config_checkpoint.save_to_object_store.bucket

The sharded format enables efficient distributed saving/loading by:
1. Parallelizing I/O across processes
2. Reducing memory usage per process
3. Supporting both local and cloud storage backends
    N)
namedtuple)get_context)AnyDictListOptionalSetTupleUnion)nn)FileSystemReaderFileSystemWriter)DefaultSavePlanner)StateDictOptionsget_model_state_dictget_optimizer_state_dictset_model_state_dictset_optimizer_state_dict)Stateful)AbstractCheckpointer)S3StorageReaderS3StorageWriter)CheckpointConfig	JobConfig)ImaginaireModel)callbackdistributedlogmisc)easy_io)DefaultLoadPlanner)DTensorLoadPlan_create_read_items_versionflatten_state_dict)MetadataTensorStorageMetadataT
state_dictmetadatastrictreturnc              	   C   s   g }	 |   D ]t\}}|drq||jvr!|r td| dq|j| }t|tr^t|dd d ur^|j| kr^|sNt	d|j d|  d|  qt
d|j d|  d| t|trs|j d urr|t|||7 }q|t|||7 }qt|S )Nz._extra_statez&Missing key in checkpoint state_dict: .sizezSize mismatch between saved z and current: z for )itemsendswithstate_dict_metadataRuntimeError
isinstancer'   getattrr-   r   critical
ValueErrorr!   device_meshget_coordinater#   r"   )r(   r)   r*   requestsfqnobjmd r<   [/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/interactive/checkpointer/dcp.pycreate_default_local_load_plan_   s4   



"
r>   c                   @   s   e Zd ZdefddZdS )r    r+   c                 C   s   | j d usJ | jr=t| j }t| j j }|| }|r=dt_t| j\}}t| }||@ r:||| _| _	d t_t
| j| j | j S )N2_3)r)   r%   setr(   keysr0   r$   _derived_versionoriginal_state_dictmappingsr>   allow_partial_load)selfcurrent_keys	load_keysmissing_keysold_state_dictold_mappingsold_keysr<   r<   r=   create_local_plan   s"   z$DefaultLoadPlanner.create_local_planN)__name__
__module____qualname__r"   rM   r<   r<   r<   r=   r       s    r    zIfor the back comptiable pytorch! New DefaultLoadPlanner class is created.z, using default plannerStateDictItemPath	save_pathc                   @   sd   e Zd ZdZddeejeej f defddZ	de
eef fdd	Zd
e
eef ddfddZdS )ModelWrapperz%Wrapper for model state dict handlingFmodelload_ema_to_regc                 C   s   t |tjr	|gn|| _|| _| jrGddlm} ddlm} ddl	m
} ddlm} t ||sIt ||sKt ||sMt ||sOJ dt| d S d S d S d S d S )Nr   )Cosmos2InteractiveModel)DiffusionModel)Text2WorldModelRectifiedFlow)WANDiffusionModelzPModelWrapper only supports DiffusionModel when load_ema_to_reg is True, but got )r2   r   ModulerT   rU   ZBcosmos_predict2._src.interactive.methods.cosmos2_interactive_modelrV   Z5cosmos_predict2._src.predict2.models.text2world_modelrW   ZDcosmos_predict2._src.predict2.models.text2world_model_rectified_flowrX   Z=cosmos_predict2._src.predict2.models.text2world_wan2pt1_modelrY   type)rF   rT   rU   Zcosmos2_interactive_modelZpredict2_DiffusionModelZ&predict2_DiffusionModel_rectified_flowZwan2pt1_DiffusionModelr<   r<   r=   __init__   s&   zModelWrapper.__init__r+   c                 C   s   dd t t| jD }| jr=| jd jjjrJ dt| }t	dd |D s-J d|D ]}|
|||dd	< q/t| jd jd
r|| jd jjr|	 i | _g }| D ]}d|v rk|| || j|dd< qW|D ]}|
|||dd< qn|S )Nc                 S   $   i | ]}|  D ]\}}||qqS r<   r.   .0sdkvr<   r<   r=   
<dictcomp>      $ z+ModelWrapper.state_dict.<locals>.<dictcomp>r   AEMA is enabled, can not load EMA weights to regular model weightsc                 s   .    | ]}| d p| dp| dV  qdS )net.net_fake_score.net_discriminator_head.N
startswithr`   rb   r<   r<   r=   	<genexpr>   
    
z*ModelWrapper.state_dict.<locals>.<genexpr>zAll keys must start with net.rh   net_ema.use_lorazbase_layer. )mapr   rT   rU   configemaenabledlistrA   allpopreplacehasattrrq   checkpoint_to_model_keyappend)rF   _state_dictall_keysrb   Zkeys_to_updater<   r<   r=   r(      s2   
 
zModelWrapper.state_dictr(   Nc                 C   s"  | j r|t| jd jdr.| jd jjr.t| dr*| j D ]\}}||||< qntd| jd jj	j
r:J dt| }tdd |D sMJ d|D ],}|d	rb||||d	d
< qO|dro||||< qO|dr{||||< qOtjt|tddd}tt|| j d S )Nr   rq   r|   z2checkpoint_to_model_key is not set by `state_dict`rf   c                 s   rg   )rp   ri   rj   Nrk   rm   r<   r<   r=   rn     ro   z/ModelWrapper.load_state_dict.<locals>.<genexpr>z!All keys must start with net_ema.rp   rh   ri   rj   F)r*   )model_state_dictoptions)rU   r{   rT   rt   rq   r|   r.   ry   r5   ru   rv   rw   rA   rx   rl   rz   	functoolspartialr   r   rs   )rF   r(   Zcheckpoint_key	model_keyr   rb   funcr<   r<   r=   load_state_dict   s<    




zModelWrapper.load_state_dict)F)rN   rO   rP   __doc__r
   r   rZ   r   boolr\   r   strr   r(   r   r<   r<   r<   r=   rS      s
    $"rS   c                   @   sv   e Zd Zdeejeej f deejj	eejj	 f ddfddZ
deeef fddZd	eeef ddfd
dZdS )OptimizerWrapperrT   optimr+   Nc                 C   s<   t |tjr	|gn|| _t |tjjr|g| _d S || _d S N)r2   r   rZ   rT   torchr   	Optimizer)rF   rT   r   r<   r<   r=   r\      s   $zOptimizerWrapper.__init__c                 C   s.   t jttddd}dd t|| j| jD S )NTZflatten_optimizer_state_dict)r   c                 S   r]   r<   r^   r_   r<   r<   r=   rd   -  re   z/OptimizerWrapper.state_dict.<locals>.<dictcomp>)r   r   r   r   rs   rT   r   )rF   r   r<   r<   r=   r(   (  s
   zOptimizerWrapper.state_dictr(   c                 C   s.   t jt|tddd}tt|| j| j d S )NTr   )optim_state_dictr   )r   r   r   r   rw   rs   rT   r   )rF   r(   r   r<   r<   r=   r   /  s   z OptimizerWrapper.load_state_dict)rN   rO   rP   r
   r   rZ   r   r   r   r   r\   r   r   r   r(   r   r<   r<   r<   r=   r     s    
r   c                   @   s   e Zd ZdZdZdS )	AsyncModedisabledZasync_with_pinned_memN)rN   rO   rP   DISABLEDASYNC_WITH_PINNED_MEMr<   r<   r<   r=   r   8  s    r   c                   @   s   e Zd ZdS )	TerminateN)rN   rO   rP   r<   r<   r<   r=   r   =  s    r   c                   @   s*   e Zd ZdededefddZdd ZdS )	SaveDone	iterationelapsed_time	succeededc                 C   s   || _ || _|| _d S r   r   r   r   )rF   r   r   r   r<   r<   r=   r\   B  s   
zSaveDone.__init__c                 C   s   d| j  d| j d| j dS )NzSaveDone(iteration=z, elapsed_time=z, succeeded=)r   rF   r<   r<   r=   __str__G  s   zSaveDone.__str__N)rN   rO   rP   intfloatr   r\   r   r<   r<   r<   r=   r   A  s    r   receiver_queuesender_queuecheckpoint_config
job_configc                 C   s  t ttjd d tjd< dtjd< tjttjd  t  t	||ddd}z	 t
d	 |  }t
d
 t|tr[t
d |t  |  W t
d tj  dS t|tsdJ d|\}}t }|d d d }	d}
d}zOz||| t | }
t
d|
dd|	  d}W n ty } zt
d| d|  W Y d}~nd}~ww W |
dkrt | }
|t|	|
| n|
dkrt | }
|t|	|
| w q,t
d tj  w )a~  
    Handles model checkpoint saving in a separate background process using PyTorch's distributed functionality.
    This function runs in a dedicated process to avoid blocking the main training loop.

    Args:
        receiver_queue: Queue to receive state dictionaries and commands from the main process
        sender_queue: Queue to send completion signals back to the main process
        checkpoint_config: Configuration settings for checkpoint saving behavior
        job_config: Configuration settings for the training job

    Flow:
        1. Initializes distributed processing environment
        2. Continuously waits for state dictionaries to save
        3. Saves checkpoints asynchronously
        4. Signals completion back to main process
        5. Terminates when receiving a Terminate signal

    Raises:
        AssertionError: If received object is neither Terminate signal nor valid state dict tuple

    Note:
        - Uses a different port than the main process to avoid conflicts
        - Disables TorchElastic agent store for checkpoint operations
        - Automatically cleans up distributed process group on exit
    MASTER_PORT   FalseTORCHELASTIC_USE_AGENT_STORE
LOCAL_RANKNT)disable_asynczPCheckpoint background process is ready for next task, waiting for new state_dictzReceived new state_dictzRReceived termination signal in checkpoint background process, closing sender queuez1Cleaning up: destroying distributed process groupz>Received data must be a tuple of (state_dict, checkpoint_path)trainerr   r   FzACheckpoint saved successfully in background process. Time taken: z.2fz seconds, iteration: zError saving checkpoint to z: )r   r   osenvironr   cuda
set_devicer   initDistributedCheckpointerr   debuggetr2   r   infoputclosedestroy_process_grouptupletime	monotonicsave_state_dict_worker	Exceptionerrorr   )r   r   r   r   Zcheckpoint_handlerZreceived_datar(   checkpoint_path
start_timer   r   r   er<   r<   r=   save_checkpoint_in_backgroundK  sX    





"
#r   c                       s  e Zd Zg dZ		d+dededeej de	f fdd	Z
d
eeeedf f fddZed			d,dedejjdB dejjjdB dejjdB d
ef
ddZdedeeeeef f d
dfddZd-ddZd.ded
dfddZ ded
ee!e"f fdd Z#ded
ee$e%f fd!d"Z&d#eeeeef f ded
dfd$d%Z'dedejjdejjjdejjd&ed
dfd'd(Z(d- fd)d*Z)  Z*S )/r   rT   r   	schedulerr   NFconfig_checkpoint
config_job	callbacksr   c                    s   t  ||| || _|jrtj| _ntj| _|rtj| _| jtjkrVtd}|	 | _
|	 | _|jt| j
| j||fdd| _| j  d | _d| _d | _tj | _d S d S )NspawnT)targetargsdaemonF)superr\   r   dcp_async_mode_enabledr   r   
async_moder   r   Queuemp_queue_sendmp_queue_recvProcessr   mpstartcpu_offload_state_dictstagingstaging_ckpt_filer   r   Streamstaging_stream)rF   r   r   r   r   ctx	__class__r<   r=   r\     s4   




z DistributedCheckpointer.__init__r+   c                    sR     }g }|d urtj j|}| j nd jr{ j} jrcd j	j
j d| }td|sc|}tj|d}tj| jdrVtj| jd }| d| }ntd| d|  |} jrm| j n|d	  jrz|d
 nd }t jdkr jD ]}| jv sJ d| d j q fdd|D }t||fS )Nzs3:///z/checkpoints/iter_\d{9}/?$z!checkpoints/latest_checkpoint.txt)backend_keyz/checkpoints/zLatest checkpoint file z not found, load from rT   r   r   zInvalid key to resume: z not in c                    s   g | ]	}| j vr|qS r<   )keys_not_to_resume)r`   keyr   r<   r=   
<listcomp>  s    zFDistributedCheckpointer.keys_to_resume_during_load.<locals>.<listcomp>)_read_latest_checkpoint_filer   pathjoinZload_dirnameextendKEYS_TO_SAVE	load_pathZload_s3_backend_keyr   load_from_object_storebucketresearchr   existsloadstripr   warningload_training_stater}   only_load_scheduler_statelenr   r@   )rF   latest_checkpoint_fileresume_keysr   Zold_ckpt_pathZlatest_ckpt_pathcheckpoint_filer   r<   r   r=   keys_to_resume_during_load  sF   


"z2DistributedCheckpointer.keys_to_resume_during_loadzcheckpoint loadingrT   	optimizerr   grad_scalerc                 C   s"  | j d ur| j | |  \}}t|}td| d|  d}|d ur| | |D ]}tdd}	tj	
||}
td|  | |
}tj  tjd|
 dd	 |d
krvtd t|}| }tj|||	d || q-|dkrtd t||}| }tj|||	d || q-|dkrtd | }tj|||	d || q-|dkrtd | |d}tj|||	d ||d  |d }q-td| d| j d ur| j j||d td| d|  ntd tj  | j d ur| j j|||d |S )NzResuming ckpt z with keys: r   T)rE   zStart loading checkpoint from z	starting F
rank0_onlyrT   z- Loading the model...)storage_readerplannerr   z- Loading the optimizer...r   z- Loading the scheduler...r   z- Loading the trainer...r   r   r   r   zInvalid key: z. not support to resume.r(   zLoaded checkpoint from z in iteration zTraining from scratch.)r   r   )r   on_load_checkpoint_startr   sortedr   r4   _check_checkpoint_existsr    r   r   r   get_storage_readerr   r   barrierr   rS   r(   dcpr   r   r   r5   on_load_checkpointr   empty_cacheon_load_checkpoint_end)rF   rT   r   r   r   r   r   r   r   Zload_plannercur_key_ckpt_full_pathr   _model_wrapperr~   Z_optim_wrapperr<   r<   r=   r     s   













zDistributedCheckpointer.loadr   r(   c              
   C   s   z
ddl m}m} W n ty } ztd|d }~ww | jd u r4tdt d ||ddd| _tdt d t	j
| j ||| jdd	| _d| _|| _W d    n1 s`w   Y  |   d S )
Nr   )_copy_state_dict_create_cpu_state_dictzXPlease install the latest PyTorch nightly to use async checkpointing with pinned memory.z+Preparing the CPU memory, time.monotonic()=z.:.2fT)
pin_memoryshare_memoryz)Staging the state_dict, time.monotonic()=)non_blocking)#torch.distributed._state_dict_utilsr  r	  ImportErrorr   r   r   r   r   r   r   streamr   r   r   maybe_wait_for_staging)rF   r   r(   r  r	  r   r<   r<   r=   _async_with_pinned_memoryB  s0   
	z1DistributedCheckpointer._async_with_pinned_memoryc                    sJ    j tjkr! jr# j s j   fdd}|  d _d S d S d S )Nc                      s    j  j jf d S r   )r   
put_nowaitr   r   r<   r   r<   r=   	sync_func^  s   zADistributedCheckpointer.maybe_wait_for_staging.<locals>.sync_funcF)r   r   r   r   r   querysynchronize)rF   r  r<   r   r=   r  Y  s   


z.DistributedCheckpointer.maybe_wait_for_stagingr   wait_forc              	   C   s  | j tjkrzit }| j r|dkrmz4| jjdd}t|t	r*t
d W W dS |}t
jd|  | jdurG|jrG| jj|j|jd W n tjy`   t | }||kr^Y W dS Y nw | j r|dksW dS W dS  ttfy   t
d Y dS w dS )	zfGet the results of previously submitted checkpoints and pass them to callbacks if checkpoint succeededr      )timeoutz=Received termination event from checkpoint background processz!Received checkpoint save result: Nr   r   z1Queue was closed by checkpoint background process)r   r   r   r   r   r   emptyr   r2   r   r   r   loggerr   r   on_save_checkpoint_successr   r   queueEmptyEOFErrorBrokenPipeError)rF   r  r   retZ	save_doner   r<   r<   r=   get_previous_checkpoint_resultsd  s6   

z7DistributedCheckpointer.get_previous_checkpoint_resultsr   c                 C   s"   | j rt| jj j|dS t|dS )Ncredential_pathr   )r   )save_to_object_storer   r   credentialsr   rF   r   r<   r<   r=   get_storage_writer|  s   
z*DistributedCheckpointer.get_storage_writerc                 C   s    | j rt| jj j|dS t|S )Nr#  )r   r   r   r&  r   r'  r<   r<   r=   r     s   z*DistributedCheckpointer.get_storage_readerto_save_dictc                 C   sz   |  D ]\}\}}| |}tj||tddd qt r+td|  | | t	j
dtj| j| dd d S )NT)dedup_save_to_lowest_rank)storage_writerr   zSaving last checkpoint file zSaved checkpoint to r   )r.   r(  r  saver   r   is_rank0print_write_latest_checkpoint_filer   r4   r   r   r   save_dirname)rF   r)  r   rb   rc   Zfull_checkpoint_pathr+  r<   r<   r=   r     s   

$z.DistributedCheckpointer.save_state_dict_workerr   c              
   C   s^  | j tjkr| jdd | jdur| j|| d|d}t| t|| | | |dd}|	 D ]}t
j| jd|dd| }	|| |	f||< q9| jdur`| jj||d	 | j tjkrm| || n1t }
z| || W | jdur| jj|t |
 d
 n| jdur| jj|t |
 d
 w w | jdur| jjd|d dS dS )a  Save network weights, optimizer parameters, scheduler parameters to a checkpoint.

        Args:
            model (ImaginaireModel): The PyTorch model.
            optimizer (torch.optim.Optimizer): The model optimizer.
            scheduler (torch.optim.lr_scheduler.LRScheduler): The optimization scheduler.
            grad_scaler (torch.amp.GradScaler): The gradient scaler (for mixed precision training).
            iteration (int): Current iteration number.
        r   r  Niter_09r   r   r   r   r  )rT   r   )r   r   r   r"  r   on_save_checkpoint_startrS   r(   r   rA   r   r   r   r0  on_save_checkpointr  r   r   r   r  on_save_checkpoint_end)rF   rT   r   r   r   r   r   r)  rb   Zoutput_dirnamer   r<   r<   r=   r,    sF   

	



zDistributedCheckpointer.savec                    sZ   t    | jtjkr'| jr)| j r+| jt	  | j
dd | j  d S d S d S d S )N<   r1  )r   finalizer   r   r   r   is_aliver   r   r   r"  r   r   r   r<   r=   r8    s   
z DistributedCheckpointer.finalize)NF)NNN)r+   N)r   )+rN   rO   rP   r   r   r   r   r   CallBackGroupr   r\   r	   r   r
   r   r   r   timerr   r   r   r   lr_schedulerLRScheduleramp
GradScalerr   r   r   r   r  r  r"  r   r   r(  r   r   r   r   r,  r8  __classcell__r<   r<   r   r=   r     sb    %,

&P
&
9r   )T)Qr   enumr   multiprocessingr   r  r   r   collectionsr   r   typingr   r   r   r   r   r	   r
   r   torch.distributedtorch.distributed.checkpointr   
checkpointr  r   r   r   ,torch.distributed.checkpoint.default_plannerr   'torch.distributed.checkpoint.state_dictr   r   r   r   r   %torch.distributed.checkpoint.statefulr   Z1cosmos_predict2._src.imaginaire.checkpointer.baser   :cosmos_predict2._src.imaginaire.checkpointer.s3_filesystemr   r   &cosmos_predict2._src.imaginaire.configr   r   %cosmos_predict2._src.imaginaire.modelr   %cosmos_predict2._src.imaginaire.utilsr   r   r   -cosmos_predict2._src.imaginaire.utils.easy_ior   r    Z_DefaultLoadPlannerr!   r"   r#   r$   r%   %torch.distributed.checkpoint.metadatar&   r'   dictr   r   r>   r4   r  r   rQ   rS   r   Enumr   r   r   r   r   r   r<   r<   r<   r=   <module>   sn   #$$.(\

R