"""
Distributed checkpoint (DCP) directory structure and storage backends.

The checkpointer saves model state in a sharded format across multiple processes:

self.save_dirname/
├── iter_000000005/                    # Checkpoint at iteration 5
│   ├── model/                         # Model state shards
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   ├── optim/                        # Optimizer state shards
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   ├── scheduler/                    # Learning rate scheduler state
│   │   ├── __0_0.distcp              # Shard 0 from rank 0
│   │   └── __1_0.distcp              # Shard 1 from rank 1
│   └── trainer/                      # Additional training state
│       ├── __0_0.distcp              # Shard 0 from rank 0
│       └── __1_0.distcp              # Shard 1 from rank 1
└── latest_checkpoint.txt             # Points to most recent checkpoint folder, e.g. iter_000000005

Storage backends:
- Local filesystem:
  self.save_dirname = "{config_job.path_local}/checkpoints"

- S3 object store:
  self.save_dirname = "s3://{bucket}/{config_job.path}/checkpoints"
  where bucket = self.config_checkpoint.save_to_object_store.bucket

The sharded format enables efficient distributed saving/loading by:
1. Parallelizing I/O across processes
2. Reducing memory usage per process
3. Supporting both local and cloud storage backends
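
Typical use (a minimal sketch; the trainer owns the real call sites, and the
names below follow this module's public API):

    checkpointer = DistributedCheckpointer(config_checkpoint, config_job, callbacks)
    iteration = checkpointer.load(model, optimizer, scheduler, grad_scaler)
    ...                                   # training loop
    checkpointer.save(model, optimizer, scheduler, grad_scaler, iteration)
    checkpointer.finalize()               # drains the async save queue, if any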
"""

import enum
import functools
import multiprocessing
import os
import queue
import re
import time
from collections import namedtuple
from multiprocessing import get_context
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import torch
import torch.distributed
import torch.distributed.checkpoint as dcp
from torch import nn
from torch.distributed.checkpoint import FileSystemReader, FileSystemWriter
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.state_dict import (
    StateDictOptions,
    get_model_state_dict,
    get_optimizer_state_dict,
    set_model_state_dict,
    set_optimizer_state_dict,
)
from torch.distributed.checkpoint.stateful import Stateful

from cosmos_policy._src.imaginaire.checkpointer.base import AbstractCheckpointer
from cosmos_policy._src.imaginaire.checkpointer.s3_filesystem import S3StorageReader, S3StorageWriter
from cosmos_policy._src.imaginaire.config import CheckpointConfig, JobConfig
from cosmos_policy._src.imaginaire.model import ImaginaireModel
from cosmos_policy._src.imaginaire.utils import callback, distributed, log, misc
from cosmos_policy._src.imaginaire.utils.easy_io import easy_io

# The patched load planner below is built against PyTorch internals. The import
# paths are best-effort reconstructions from the compiled artifact; if any of
# them moves in a future PyTorch release, the except branch falls back to the
# stock planner.
try:
    from torch.distributed.checkpoint import _version
    from torch.distributed.checkpoint._nested_dict import flatten_state_dict
    from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner as _DefaultLoadPlanner
    from torch.distributed.checkpoint.metadata import Metadata, TensorStorageMetadata
    from torch.distributed.checkpoint.planner import LoadPlan
    from torch.distributed.checkpoint.planner_helpers import _create_read_items
    from torch.distributed.tensor import DTensor

    def create_default_local_load_plan(
        state_dict: Dict[str, Any],
        metadata: Metadata,
        strict: bool = True,
        dcp_allow_mismatched_size: bool = False,
    ) -> LoadPlan:
        """Build the per-rank read plan, optionally tolerating size mismatches.

        Mirrors torch's default implementation, extended so that when
        ``dcp_allow_mismatched_size`` is set, entries whose saved size differs
        from the current tensor are skipped instead of raising.
        """
        requests = []
        for fqn, obj in state_dict.items():
            if fqn.endswith("._extra_state"):
                continue
            if fqn not in metadata.state_dict_metadata:
                if strict:
                    raise RuntimeError(f"Missing key in checkpoint state_dict: {fqn}.")
                continue
            md = metadata.state_dict_metadata[fqn]
            if (
                not strict
                and isinstance(md, TensorStorageMetadata)
                and getattr(obj, "size", None) is not None
                and md.size != obj.size()
            ):
                if dcp_allow_mismatched_size:
                    log.critical(f"Size mismatch between saved {md.size} and current: {obj.size()} for {fqn}")
                    continue
                raise ValueError(f"Size mismatch between saved {md.size} and current: {obj.size()} for {fqn}")
            if isinstance(obj, DTensor):
                # Only ranks that participate in the tensor's device mesh read it.
                if obj.device_mesh.get_coordinate() is not None:
                    requests += _create_read_items(fqn, md, obj)
            else:
                requests += _create_read_items(fqn, md, obj)
        return LoadPlan(requests)

    class DefaultLoadPlanner(_DefaultLoadPlanner):
        def set_partial_channel_weight(self, dcp_allow_mismatched_size: bool) -> None:
            self.dcp_allow_mismatched_size = dcp_allow_mismatched_size

        def create_local_plan(self) -> LoadPlan:
            assert self.metadata is not None
            if self.flatten_state_dict:
                current_keys = set(self.state_dict.keys())
                load_keys = set(self.metadata.state_dict_metadata.keys())
                missing_keys = load_keys - current_keys
                if missing_keys:
                    # Re-flatten with the 2.3-era scheme so checkpoints written
                    # by older PyTorch releases still resolve their keys.
                    _version._derived_version = "2_3"
                    old_state_dict, old_mappings = flatten_state_dict(self.original_state_dict)
                    old_keys = set(old_state_dict.keys())
                    if missing_keys & old_keys:
                        self.state_dict, self.mappings = old_state_dict, old_mappings
                    _version._derived_version = None
            return create_default_local_load_plan(
                self.state_dict,
                self.metadata,
                not self.allow_partial_load,
                getattr(self, "dcp_allow_mismatched_size", False),
            )

    log.info("For backward-compatible PyTorch, a new DefaultLoadPlanner class is created.")
except Exception as e:
    from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner

    log.info(f"{e}, using default planner")

StateDictItemPath = namedtuple("StateDictItemPath", ["state_dict", "save_path"])


def dcp_load_state_dict(_state_dict: Dict[str, Any], storage_reader, load_planner=None) -> None:
    dcp.load(_state_dict, storage_reader=storage_reader, planner=load_planner)
    # After a (possibly partial) load, report key mismatches. Keys inside
    # transformer blocks other than block 0 are dropped to keep the log short,
    # and keys are collapsed to their parent module names.
    if hasattr(load_planner, "metadata") and load_planner.metadata is not None:
        checkpoint_keys = set(load_planner.metadata.state_dict_metadata.keys())
        model_keys = set(_state_dict.keys())
        missing_keys = list(model_keys - checkpoint_keys)
        unexpected_keys = list(checkpoint_keys - model_keys)
        if missing_keys:
            missing_keys = [key for key in missing_keys if "blocks.0" in key or "blocks." not in key]
            missing_keys_str = "\n".join(sorted(set(".".join(k.split(".")[:-1]) for k in missing_keys)))
            log.critical(f"Missing keys in pretrained model: {missing_keys_str}")
        if unexpected_keys:
            unexpected_keys = [key for key in unexpected_keys if "_extra_state" not in key]
            unexpected_keys = [key for key in unexpected_keys if "blocks.0" in key or "blocks." not in key]
            unexpected_keys_str = "\n".join(sorted(set(".".join(k.split(".")[:-1]) for k in unexpected_keys)))
            log.critical(f"Unexpected keys in pretrained model: {unexpected_keys_str}")


class ModelWrapper(Stateful):
    """Wrapper for model state dict handling."""

    def __init__(self, model: Union[nn.Module, List[nn.Module]], load_ema_to_reg: bool = False):
        self.model = [model] if isinstance(model, nn.Module) else model
        self.load_ema_to_reg = load_ema_to_reg
        if self.load_ema_to_reg:
            supported_model_types = []
            from cosmos_policy._src.predict2.models.text2world_model import DiffusionModel

            supported_model_types.append(DiffusionModel)
            from cosmos_policy._src.predict2.models.text2world_model_rectified_flow import Text2WorldModelRectifiedFlow

            supported_model_types.append(Text2WorldModelRectifiedFlow)
            from cosmos_policy._src.predict2.models.text2world_wan2pt1_model import WANDiffusionModel

            supported_model_types.append(WANDiffusionModel)
            assert any(
                isinstance(model, cls) for cls in supported_model_types
            ), f"ModelWrapper only supports DiffusionModel when load_ema_to_reg is True, but got {type(model)}"

    def state_dict(self, mapping_keys: Dict[str, str] = {}) -> Dict[str, Any]:
        _state_dict = {k: v for sd in map(get_model_state_dict, self.model) for k, v in sd.items()}
        if self.load_ema_to_reg:
            assert not self.model[0].config.ema.enabled, "EMA is enabled, can not load EMA weights to regular model weights"
            all_keys = list(_state_dict.keys())
            assert all(k.startswith("net.") for k in all_keys), "All keys must start with net."
            for k in all_keys:
                # Request the EMA copy of every regular weight from the checkpoint.
                _state_dict[k.replace("net.", "net_ema.", 1)] = _state_dict.pop(k)
        if hasattr(self.model[0].config, "use_lora") and self.model[0].config.use_lora:
            # LoRA renames modules (inserting markers such as "base_layer.");
            # remember the checkpoint-key -> model-key mapping so that
            # load_state_dict can undo the rename. NOTE: the replacement table
            # below is a best-effort reconstruction from the compiled artifact.
            self.checkpoint_to_model_key = {}
            mapping_keys.update({"base_layer.": "", "base_model.model.": ""})
            keys_to_update = []
            for key in _state_dict.keys():
                new_key = key
                for from_key, to_key in mapping_keys.items():
                    new_key = new_key.replace(from_key, to_key)
                if new_key != key:
                    keys_to_update.append((key, new_key))
                self.checkpoint_to_model_key[new_key] = key
            for key, new_key in keys_to_update:
                _state_dict[new_key] = _state_dict.pop(key)
        return _state_dict

    def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
        if hasattr(self.model[0].config, "use_lora") and self.model[0].config.use_lora:
            if hasattr(self, "checkpoint_to_model_key"):
                for checkpoint_key, model_key in self.checkpoint_to_model_key.items():
                    state_dict[model_key] = state_dict.pop(checkpoint_key)
            else:
                raise ValueError("checkpoint_to_model_key is not set by `state_dict`")
        if self.load_ema_to_reg:
            assert not self.model[0].config.ema.enabled, "EMA is enabled, can not load EMA weights to regular model weights"
            all_keys = list(state_dict.keys())
            assert all(k.startswith("net_ema.") for k in all_keys), "All keys must start with net_ema."
            for k in all_keys:
                # Hand the loaded EMA weights to the regular parameters.
                state_dict[k.replace("net_ema.", "net.", 1)] = state_dict.pop(k)
        func = functools.partial(set_model_state_dict, model_state_dict=state_dict, options=StateDictOptions(strict=False))
        list(map(func, self.model))
    $&'rx   c                   @   sv   e Zd Zdeejeej f deejj	eejj	 f ddfddZ
deeef fddZd	eeef ddfd
dZdS )OptimizerWrapperry   optimr,   Nc                 C   s<   t |tjr	|gn|| _t |tjjr|g| _d S || _d S r@   )r3   r   r   ry   torchr   	Optimizer)rA   ry   r   r=   r=   r>   r   >  s   $zOptimizerWrapper.__init__c                 C   s.   t jttddd}dd t|| j| jD S )NTZflatten_optimizer_state_dict)r   c                 S   r   r=   r   r   r=   r=   r>   r   K  r   z/OptimizerWrapper.state_dict.<locals>.<dictcomp>)r   r   r   r   r   ry   r   )rA   r   r=   r=   r>   r(   F  s
   zOptimizerWrapper.state_dictr(   c                 C   s.   t jt|tddd}tt|| j| j d S )NTr   )optim_state_dictr   )r   r   r   r   rp   r   ry   r   )rA   r(   r   r=   r=   r>   r   M  s   z OptimizerWrapper.load_state_dict)rQ   rR   rS   r
   r   r   r   r   r   r   r   r   r   r   r(   r   r=   r=   r=   r>   r   =  s    
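
# Illustrative pairing of the wrappers with DCP (a sketch only; the real call
# sites are in DistributedCheckpointer.load/save below):
#
#     wrapper = ModelWrapper(model)
#     sd = wrapper.state_dict()                        # keys as the checkpoint expects
#     dcp_load_state_dict(sd, storage_reader, planner) # fill sd from the shards
#     wrapper.load_state_dict(sd)                      # map keys back and apply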


class AsyncMode(str, enum.Enum):
    DISABLED = "disabled"
    ASYNC_WITH_PINNED_MEM = "async_with_pinned_mem"


class Terminate:
    """Sentinel sent over the queues to request a shutdown."""


class SaveDone:
    """Completion signal reported by the checkpoint background process."""

    def __init__(self, iteration: int, elapsed_time: float, succeeded: bool):
        self.iteration = iteration
        self.elapsed_time = elapsed_time
        self.succeeded = succeeded

    def __str__(self):
        return f"SaveDone(iteration={self.iteration}, elapsed_time={self.elapsed_time}, succeeded={self.succeeded})"


def save_checkpoint_in_background(
    receiver_queue: multiprocessing.Queue,
    sender_queue: multiprocessing.Queue,
    checkpoint_config: CheckpointConfig,
    job_config: JobConfig,
) -> None:
    """
    Handles model checkpoint saving in a separate background process using PyTorch's distributed functionality.
    This function runs in a dedicated process to avoid blocking the main training loop.

    Args:
        receiver_queue: Queue to receive state dictionaries and commands from the main process
        sender_queue: Queue to send completion signals back to the main process
        checkpoint_config: Configuration settings for checkpoint saving behavior
        job_config: Configuration settings for the training job

    Flow:
        1. Initializes distributed processing environment
        2. Continuously waits for state dictionaries to save
        3. Saves checkpoints asynchronously
        4. Signals completion back to main process
        5. Terminates when receiving a Terminate signal

    Raises:
        AssertionError: If received object is neither Terminate signal nor valid state dict tuple

    Note:
        - Uses a different port than the main process to avoid conflicts
        - Disables TorchElastic agent store for checkpoint operations
        - Automatically cleans up distributed process group on exit
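
    Message protocol (sketch, matching the tuple layout produced by
    DistributedCheckpointer.save):
        receiver_queue.get() -> Terminate | (to_save_dict, checkpoint_file)
        sender_queue.put(SaveDone(iteration, elapsed_time, succeeded))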
    """
    # Run this helper's process group on its own port so it does not collide
    # with the trainer's store. NOTE: the exact port offset could not be
    # recovered from the compiled artifact; any unused offset works.
    os.environ["MASTER_PORT"] = str(int(os.environ["MASTER_PORT"]) + 2)
    os.environ["TORCHELASTIC_USE_AGENT_STORE"] = "False"
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    distributed.init()
    checkpoint_handler = DistributedCheckpointer(checkpoint_config, job_config, None, disable_async=True)
    try:
        while True:
            log.debug("Checkpoint background process is ready for next task, waiting for new state_dict")
            received_data = receiver_queue.get()
            log.debug("Received new state_dict")
            if isinstance(received_data, Terminate):
                log.info("Received termination signal in checkpoint background process, closing sender queue")
                sender_queue.put(Terminate())
                sender_queue.close()
                return
            assert isinstance(received_data, tuple), "Received data must be a tuple of (state_dict, checkpoint_path)"
            state_dict, checkpoint_path = received_data
            start_time = time.monotonic()
            # Each entry of state_dict is a (state_dict, save_path) pair; the
            # trainer entry carries the iteration number.
            iteration = state_dict["trainer"][0]["iteration"]
            elapsed_time = 0.0
            succeeded = False
            try:
                checkpoint_handler.save_state_dict_worker(state_dict, checkpoint_path)
                elapsed_time = time.monotonic() - start_time
                succeeded = True
                log.info(
                    f"Checkpoint saved successfully in background process. "
                    f"Time taken: {elapsed_time:.2f} seconds, iteration: {iteration}"
                )
            except Exception as e:
                log.error(f"Error saving checkpoint to {checkpoint_path}: {e}")
            finally:
                if elapsed_time == 0:
                    elapsed_time = time.monotonic() - start_time
                sender_queue.put(SaveDone(iteration, elapsed_time, succeeded))
    finally:
        log.info("Cleaning up: destroying distributed process group")
        torch.distributed.destroy_process_group()


class DistributedCheckpointer(AbstractCheckpointer):
    """Distributed checkpointer built on torch.distributed.checkpoint (DCP)."""

    KEYS_TO_SAVE = ["model", "optim", "scheduler", "trainer"]

    def __init__(
        self,
        config_checkpoint: CheckpointConfig,
        config_job: JobConfig,
        callbacks: Optional[callback.CallBackGroup] = None,
        disable_async: bool = False,
    ):
        super().__init__(config_checkpoint, config_job, callbacks)
        self.config_checkpoint = config_checkpoint
        if config_checkpoint.dcp_async_mode_enabled:
            self.async_mode = AsyncMode.ASYNC_WITH_PINNED_MEM
        else:
            self.async_mode = AsyncMode.DISABLED
        if disable_async:
            self.async_mode = AsyncMode.DISABLED
        if self.async_mode == AsyncMode.ASYNC_WITH_PINNED_MEM:
            ctx = get_context("spawn")
            self.mp_queue_send = ctx.Queue()
            self.mp_queue_recv = ctx.Queue()
            self.mp = ctx.Process(
                target=save_checkpoint_in_background,
                args=(self.mp_queue_send, self.mp_queue_recv, config_checkpoint, config_job),
                daemon=True,
            )
            self.mp.start()
            self.cpu_offload_state_dict = None
            self.staging = False
            self.staging_ckpt_file = None
            self.staging_stream = torch.cuda.Stream()

    def keys_to_resume_during_load(self) -> Tuple[List[str], Optional[str]]:
        # Attributes such as load_dirname, save_dirname, load_path,
        # load_from_object_store, keys_not_to_resume, and the
        # _read/_write_latest_checkpoint_file helpers come from
        # AbstractCheckpointer.
        latest_checkpoint_file = self._read_latest_checkpoint_file()
        resume_keys = []
        checkpoint_path = None
        if latest_checkpoint_file is not None:
            # Self-resume: continue from this run's own save directory.
            checkpoint_path = os.path.join(self.load_dirname, latest_checkpoint_file)
            resume_keys.extend(self.KEYS_TO_SAVE)
        elif self.config_checkpoint.load_path:
            assert not self.config_checkpoint.load_path.endswith(
                ".pt"
            ), "DCP checkpoints are directories of .distcp shards; a .pt file cannot be loaded here."
            checkpoint_path = self.config_checkpoint.load_path
            if self.load_from_object_store:
                old_ckpt_path = f"s3://{self.config_checkpoint.load_from_object_store.bucket}/{checkpoint_path}"
            else:
                old_ckpt_path = checkpoint_path
            # If the load path does not already point at an iter_XXXXXXXXX
            # folder, resolve it through that run's latest_checkpoint.txt.
            if not re.search(r"/checkpoints/iter_\d{9}/?$", old_ckpt_path):
                latest_ckpt_path = os.path.join(checkpoint_path, "checkpoints/latest_checkpoint.txt")
                if easy_io.exists(latest_ckpt_path, backend_key=self.load_s3_backend_key):
                    checkpoint_file = easy_io.load(latest_ckpt_path, backend_key=self.load_s3_backend_key).strip()
                    checkpoint_path = f"{checkpoint_path}/checkpoints/{checkpoint_file}"
                else:
                    log.warning(f"Latest checkpoint file {latest_ckpt_path} not found, load from {checkpoint_path}")
            if self.config_checkpoint.load_training_state:
                resume_keys.extend(self.KEYS_TO_SAVE)
            else:
                resume_keys.append("model")
                if self.config_checkpoint.only_load_scheduler_state:
                    resume_keys.append("scheduler")
        if len(self.keys_not_to_resume) > 0:
            for key in self.keys_not_to_resume:
                assert key in self.KEYS_TO_SAVE, f"Invalid key to resume: {key} not in {self.KEYS_TO_SAVE}"
            resume_keys = [key for key in resume_keys if key not in self.keys_not_to_resume]
        return resume_keys, checkpoint_path

    @misc.timer("checkpoint loading")
    def load(
        self,
        model: ImaginaireModel,
        optimizer: Optional[torch.optim.Optimizer] = None,
        scheduler: Optional[torch.optim.lr_scheduler.LRScheduler] = None,
        grad_scaler: Optional[torch.amp.GradScaler] = None,
    ) -> int:
        if self.callbacks is not None:
            self.callbacks.on_load_checkpoint_start(model)
        resume_keys, checkpoint_path = self.keys_to_resume_during_load()
        resume_keys = sorted(resume_keys)
        log.critical(f"Resuming ckpt {checkpoint_path} with keys: {resume_keys}")
        iteration = 0
        if checkpoint_path is not None:
            self._check_checkpoint_exists(checkpoint_path)
            for key in resume_keys:
                load_planner = DefaultLoadPlanner(allow_partial_load=True)
                # Only the patched planner (see the try block above) exposes
                # set_partial_channel_weight; the stock fallback does not.
                if hasattr(load_planner, "set_partial_channel_weight"):
                    log.critical(f"set_partial_channel_weight: {self.config_checkpoint.dcp_allow_mismatched_size}")
                    load_planner.set_partial_channel_weight(self.config_checkpoint.dcp_allow_mismatched_size)
                cur_key_ckpt_full_path = os.path.join(checkpoint_path, key)
                log.info(f"Start loading checkpoint from {cur_key_ckpt_full_path}")
                storage_reader = self.get_storage_reader(cur_key_ckpt_full_path)
                distributed.barrier()
                log.info(f"starting {cur_key_ckpt_full_path}", rank0_only=False)
                if key == "model":
                    log.info("- Loading the model...")
                    _model_wrapper = ModelWrapper(model)
                    _state_dict = _model_wrapper.state_dict()
                    dcp_load_state_dict(_state_dict, storage_reader, load_planner)
                    _model_wrapper.load_state_dict(_state_dict)
                elif key == "optim":
                    log.info("- Loading the optimizer...")
                    _optim_wrapper = OptimizerWrapper(model, optimizer)
                    _state_dict = _optim_wrapper.state_dict()
                    dcp.load(_state_dict, storage_reader=storage_reader, planner=load_planner)
                    _optim_wrapper.load_state_dict(_state_dict)
                elif key == "scheduler":
                    log.info("- Loading the scheduler...")
                    _state_dict = scheduler.state_dict()
                    dcp.load(_state_dict, storage_reader=storage_reader, planner=load_planner)
                    scheduler.load_state_dict(_state_dict)
                elif key == "trainer":
                    log.info("- Loading the trainer...")
                    _state_dict = {"grad_scaler": grad_scaler.state_dict(), "iteration": iteration}
                    dcp.load(_state_dict, storage_reader=storage_reader, planner=load_planner)
                    grad_scaler.load_state_dict(_state_dict["grad_scaler"])
                    iteration = _state_dict["iteration"]
                else:
                    raise ValueError(f"Invalid key: {key}. Not supported for resume.")
            if self.callbacks is not None:
                self.callbacks.on_load_checkpoint(model, state_dict=_state_dict)
            log.critical(f"Loaded checkpoint from {checkpoint_path} in iteration {iteration}")
        else:
            log.info("Training from scratch.")
        torch.cuda.empty_cache()
        if self.callbacks is not None:
            self.callbacks.on_load_checkpoint_end(model, iteration=iteration, checkpoint_path=checkpoint_path)
        return iteration

    def _async_with_pinned_memory(self, checkpoint_file: str, state_dict: Dict[str, Tuple[Dict[str, Any], str]]) -> None:
        try:
            from torch.distributed._state_dict_utils import _copy_state_dict, _create_cpu_state_dict
        except ImportError as e:
            raise ImportError(
                "Please install the latest PyTorch nightly to use async checkpointing with pinned memory."
            ) from e
        if self.cpu_offload_state_dict is None:
            log.debug(f"Preparing the CPU memory, time.monotonic()={time.monotonic():.2f}")
            self.cpu_offload_state_dict = _create_cpu_state_dict(state_dict, pin_memory=True, share_memory=True)
        log.debug(f"Staging the state_dict, time.monotonic()={time.monotonic():.2f}")
        with torch.cuda.stream(self.staging_stream):
            self.cpu_offload_state_dict = _copy_state_dict(state_dict, self.cpu_offload_state_dict, non_blocking=True)
            self.staging = True
            self.staging_ckpt_file = checkpoint_file
        self.maybe_wait_for_staging()

    def maybe_wait_for_staging(self) -> None:
        if self.async_mode == AsyncMode.ASYNC_WITH_PINNED_MEM and self.staging:
            if not self.staging_stream.query():
                self.staging_stream.synchronize()

            def sync_func():
                self.mp_queue_send.put_nowait((self.cpu_offload_state_dict, self.staging_ckpt_file))

            sync_func()
            self.staging = False

    def get_previous_checkpoint_results(self, wait_for: int = 0) -> None:
        """Get the results of previously submitted checkpoints and pass them to callbacks if checkpoint succeeded"""
        if self.async_mode != AsyncMode.ASYNC_WITH_PINNED_MEM:
            return
        try:
            start_time = time.monotonic()
            while not self.mp_queue_recv.empty() or wait_for > 0:
                try:
                    ret = self.mp_queue_recv.get(timeout=1)
                    if isinstance(ret, Terminate):
                        log.info("Received termination event from checkpoint background process")
                        return
                    save_done = ret
                    log.info(f"Received checkpoint save result: {save_done}")
                    if self.callbacks is not None and save_done.succeeded:
                        self.callbacks.on_save_checkpoint_success(
                            iteration=save_done.iteration, elapsed_time=save_done.elapsed_time
                        )
                except queue.Empty:
                    elapsed_time = time.monotonic() - start_time
                    if elapsed_time > wait_for:
                        return
        except (EOFError, BrokenPipeError):
            log.warning("Queue was closed by checkpoint background process")

    def get_storage_writer(self, checkpoint_path: str) -> Union[S3StorageWriter, FileSystemWriter]:
        if self.save_to_object_store:
            return S3StorageWriter(
                credential_path=self.config_checkpoint.save_to_object_store.credentials, path=checkpoint_path
            )
        return FileSystemWriter(path=checkpoint_path)

    def get_storage_reader(self, checkpoint_path: str) -> Union[S3StorageReader, FileSystemReader]:
        if self.load_from_object_store:
            return S3StorageReader(
                credential_path=self.config_checkpoint.load_from_object_store.credentials, path=checkpoint_path
            )
        return FileSystemReader(checkpoint_path)

    def save_state_dict_worker(self, to_save_dict: Dict[str, Tuple[Dict[str, Any], str]], checkpoint_file: str) -> None:
        for key, (state_dict, full_checkpoint_path) in to_save_dict.items():
            storage_writer = self.get_storage_writer(full_checkpoint_path)
            dcp.save(
                state_dict,
                storage_writer=storage_writer,
                planner=DefaultSavePlanner(dedup_save_to_lowest_rank=True),
            )
        if distributed.is_rank0():
            log.info(f"Saving last checkpoint file {checkpoint_file}")
            self._write_latest_checkpoint_file(checkpoint_file)
        log.critical(f"Saved checkpoint to {os.path.join(self.save_dirname, checkpoint_file)}", rank0_only=False)

    def save(
        self,
        model: ImaginaireModel,
        optimizer: torch.optim.Optimizer,
        scheduler: torch.optim.lr_scheduler.LRScheduler,
        grad_scaler: torch.amp.GradScaler,
        iteration: int,
    ) -> None:
        """Save network weights, optimizer parameters, scheduler parameters to a checkpoint.

        Args:
            model (ImaginaireModel): The PyTorch model.
            optimizer (torch.optim.Optimizer): The model optimizer.
            scheduler (torch.optim.lr_scheduler.LRScheduler): The optimization scheduler.
            grad_scaler (torch.amp.GradScaler): The gradient scaler (for mixed precision training).
            iteration (int): Current iteration number.
        """
        if self.async_mode == AsyncMode.ASYNC_WITH_PINNED_MEM:
            self.get_previous_checkpoint_results(wait_for=0)
        if self.callbacks is not None:
            self.callbacks.on_save_checkpoint_start(model, iteration)
        checkpoint_file = f"iter_{iteration:09}"
        to_save_dict = {
            "model": ModelWrapper(model).state_dict(),
            "optim": OptimizerWrapper(model, optimizer).state_dict(),
            "scheduler": scheduler.state_dict(),
            "trainer": {"grad_scaler": grad_scaler.state_dict(), "iteration": iteration},
        }
        for key in to_save_dict.keys():
            output_dirname = os.path.join(self.save_dirname, checkpoint_file, key)
            to_save_dict[key] = (to_save_dict[key], output_dirname)
        if self.callbacks is not None:
            self.callbacks.on_save_checkpoint(model, state_dict=to_save_dict)
        if self.async_mode == AsyncMode.ASYNC_WITH_PINNED_MEM:
            self._async_with_pinned_memory(checkpoint_file, to_save_dict)
        else:
            start_time = time.monotonic()
            try:
                self.save_state_dict_worker(to_save_dict, checkpoint_file)
            finally:
                if self.callbacks is not None:
                    self.callbacks.on_save_checkpoint_success(
                        iteration=iteration, elapsed_time=time.monotonic() - start_time
                    )
        if self.callbacks is not None:
            self.callbacks.on_save_checkpoint_end(model=None, iteration=iteration)

    def finalize(self) -> None:
        super().finalize()
        if self.async_mode == AsyncMode.ASYNC_WITH_PINNED_MEM and self.mp and self.mp.is_alive():
            self.mp_queue_send.put(Terminate())
            # Allow in-flight saves to report back before joining. NOTE: the
            # 60-second grace period is a best-effort reconstruction.
            self.get_previous_checkpoint_results(wait_for=60)
            self.mp.join()