import functools
import inspect
import os
import signal

import torch
import torch.distributed as dist
import torch.utils.data
from cosmos_predict2._src.imaginaire.flags import INTERNAL
from cosmos_predict2._src.imaginaire.utils.context_managers import distributed_init
from cosmos_predict2._src.imaginaire.utils.profiling import maybe_enable_memory_snapshot, maybe_enable_profiling

try:
    from megatron.core import parallel_state

    USE_MEGATRON = True
except ImportError:
    USE_MEGATRON = False

from cosmos_predict2._src.imaginaire.lazy_config import LazyConfig, instantiate
from cosmos_predict2._src.imaginaire.model import ImaginaireModel
from cosmos_predict2._src.imaginaire.utils import callback, distributed, ema, log, misc
from cosmos_predict2._src.imaginaire.utils.checkpointer import Checkpointer
from cosmos_predict2._src.imaginaire.utils.misc import StragglerDetectorV2


class ImaginaireTrainer:
    """The base trainer class of Imaginaire.

    All trainers in Imaginaire should inherit ImaginaireTrainer. It contains the basic functionality for model training
    (particularly suited for large-scale training), including data parallel (DDP/FSDP), model weight average (EMA),
    mixed-precision training (fp16/bf16).
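
    A minimal usage sketch (illustrative only; how the config object and the
    dataloaders are constructed is outside this class):

        trainer = ImaginaireTrainer(config)
        model = instantiate(config.model)
        trainer.train(model, dataloader_train, dataloader_val)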

    Attributes:
        checkpointer (Checkpointer): checkpointer object to save/load model weights and optimizer states.
        training_timer (misc.Timer): Timer object to time code blocks and functions.
    """

    def __init__(self, config):
        """Constructor of the trainer.

        Args:
            config (Config): The config object for the Imaginaire codebase.
        """
        super().__init__()
        self.config = config
        # Set up the distributed computing environment (and Megatron model parallelism, if available).
        with distributed_init(config):
            distributed.init()
            if hasattr(config.model, "context_parallel_size"):
                if config.model.context_parallel_size > 1:
                    if config.model_parallel.context_parallel_size > 1:
                        raise ValueError(
                            "Both config.model.context_parallel_size and config.model_parallel.context_parallel_size are set. "
                            "config.model.context_parallel_size is deprecated. Please only set config.model_parallel.context_parallel_size."
                        )
                    log.critical(
                        "Using deprecated config.model.context_parallel_size. Please use config.model_parallel.context_parallel_size instead."
                    )
                    config.model_parallel.context_parallel_size = config.model.context_parallel_size
            if USE_MEGATRON:
                mp_kwargs = dict(
                    pipeline_model_parallel_size=config.model_parallel.pipeline_model_parallel_size,
                    tensor_model_parallel_size=config.model_parallel.tensor_model_parallel_size,
                    context_parallel_size=config.model_parallel.context_parallel_size,
                )
                # Older megatron-core releases do not accept create_gloo_process_groups.
                if "create_gloo_process_groups" in inspect.signature(parallel_state.initialize_model_parallel).parameters:
                    mp_kwargs["create_gloo_process_groups"] = False
                parallel_state.initialize_model_parallel(**mp_kwargs)
                parallel_state.sequence_parallel = config.model_parallel.sequence_parallel
                if parallel_state.sequence_parallel:
                    os.environ["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
        # Create the local job directory and save the config for bookkeeping.
        if distributed.is_rank0():
            os.makedirs(config.job.path_local, exist_ok=True)
            LazyConfig.save_pkl(config, config.job.path_local + "/config.pkl")
            LazyConfig.save_yaml(config, config.job.path_local + "/config.yaml")
        dist.barrier()
        if INTERNAL:
            log.init_loguru_file(config.job.path_local + "/stdout.log")
        if distributed.is_rank0():
            log.info("Config:\n" + config.pretty_print(use_color=False))
        if INTERNAL:
            misc.print_environ_variables(["TORCH_HOME", "IMAGINAIRE_OUTPUT_ROOT", "ENABLE_ONELOGGER", "HF_HOME"])
        else:
            misc.print_environ_variables(["TORCH_HOME", "HF_HOME"])
        # Set the random seed (varied per rank).
        misc.set_random_seed(seed=config.trainer.seed, by_rank=True)
        # Initialize cuDNN and TF32 settings.
        torch.backends.cudnn.deterministic = config.trainer.cudnn.deterministic
        torch.backends.cudnn.benchmark = config.trainer.cudnn.benchmark
        torch.backends.cudnn.allow_tf32 = True
        torch.backends.cuda.matmul.allow_tf32 = True
        # Set up the callback functions.
        self.callbacks = callback.CallBackGroup(config=config, trainer=self)
        # Set up the model checkpointer (the default class, or a custom one given by config.checkpoint.type).
        if config.checkpoint.type is None:
            self.checkpointer = Checkpointer(config.checkpoint, config.job, callbacks=self.callbacks)
        else:
            self.checkpointer: Checkpointer = instantiate(
                config.checkpoint.type, config.checkpoint, config.job, callbacks=self.callbacks
            )
        # Set up the training timer and the straggler detector.
        self.training_timer = misc.TrainingTimer()
        self.straggler_detector = StragglerDetectorV2(
            enabled=self.config.trainer.straggler_detection.enabled,
            report_freq=self.config.trainer.straggler_detection.report_freq,
            profile_freq=self.config.trainer.straggler_detection.profile_freq,
            max_diff=self.config.trainer.straggler_detection.max_diff,
            raise_error=self.config.trainer.straggler_detection.raise_error,
        )
        misc.set_torch_compile_options(
            self.config.trainer.compile_config.recompile_limit, self.config.trainer.compile_config.use_duck_shape
        )
        self.straggler_detector.initialize()
        # Arm a watchdog so a hung job raises instead of stalling silently.
        signal.signal(signal.SIGALRM, functools.partial(misc.timeout_handler, config.trainer.timeout_period))

    def _fetch_and_broadcast_data(self, model: ImaginaireModel, dataloader_iter, iteration: int):
        """
        Fetches data from the dataloader on the batch owner rank and broadcasts it to all other ranks in the Context Parallel group if CP is enabled.
        When CP is disabled, data is fetched from the dataloader on the current rank and no broadcasting is needed.

        Args:
            model (ImaginaireModel): The model containing parallel dimensions info.
            dataloader_iter: Iterator for the dataloader.
            iteration (int): Current iteration number to determine the batch owner.

        Returns:
            tuple: (data_batch, stop_signal)
                - data_batch: The fetched data batch (or None if stopped/not owner).
                - stop_signal (bool): True if StopIteration was encountered.
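
        Example (illustrative): with a CP group of size 2, iteration 0 is read by
        cp_rank 0 and broadcast to cp_rank 1, iteration 1 is read by cp_rank 1, and
        so on, so the dataloading cost rotates across the group instead of being duplicated.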
        """
        parallel_dims = getattr(model, "parallel_dims", None)
        if parallel_dims is None or not parallel_dims.cp_enabled:
            # No context parallelism: every rank reads its own batch.
            try:
                return next(dataloader_iter), False
            except StopIteration:
                return None, True
        # With CP, rotate the batch owner across the CP group so only one rank reads per iteration.
        batch_owner_rank = iteration % parallel_dims.cp_mesh.size()
        stop_signal = False
        data_batch = None
        if parallel_dims.cp_rank == batch_owner_rank:
            try:
                data_batch = next(dataloader_iter)
            except StopIteration:
                data_batch = None
                stop_signal = True
        objs = [data_batch, stop_signal]
        global_src_rank = distributed.get_global_rank(parallel_dims.cp_mesh.get_group(), batch_owner_rank)
        dist.broadcast_object_list(objs, src=global_src_rank, group=parallel_dims.cp_mesh.get_group())
        return objs[0], objs[1]

    def train(
        self,
        model: ImaginaireModel,
        dataloader_train: torch.utils.data.DataLoader,
        dataloader_val: torch.utils.data.DataLoader,
    ) -> None:
        """The training function.

        Args:
            model (ImaginaireModel): The PyTorch model.
            dataloader_train (torch.utils.data.DataLoader): The training data loader.
            dataloader_val (torch.utils.data.DataLoader): The validation data loader.
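
        Note: the loop below runs until config.trainer.max_iter, checkpointing every
        config.checkpoint.save_iter iterations and (when run_validation is enabled)
        validating every config.trainer.validation_iter iterations.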
        """
        # Move the model to the GPU (with the configured memory format).
        model = model.to("cuda", memory_format=self.config.trainer.memory_format)
        # Initialize the optimizer, scheduler, and gradient scaler.
        self.callbacks.on_optimizer_init_start()
        optimizer, scheduler = model.init_optimizer_scheduler(self.config.optimizer, self.config.scheduler)
        grad_scaler = torch.amp.GradScaler(**self.config.trainer.grad_scaler_args)
        self.callbacks.on_optimizer_init_end()
        # Load the model checkpoint and get the starting iteration number.
        iteration = self.checkpointer.load(model, optimizer, scheduler, grad_scaler)
        grad_accum_iter = 0
        log.info(f"Distributed parallelism mode: {self.config.trainer.distributed_parallelism}")
        if self.config.trainer.distributed_parallelism == "ddp":
            # Create a DDP model wrapper.
            model_ddp = distributed.parallel_model_wrapper(self.config.trainer.ddp, model)
        elif self.config.trainer.distributed_parallelism == "fsdp":
            model_ddp = model
        else:
            raise ValueError(f"Unknown distributed parallelism mode: {self.config.trainer.distributed_parallelism}")
        log.info("Starting training...")
        self.callbacks.on_train_start(model, iteration=iteration)
        # Initial validation, if configured.
        if self.config.trainer.run_validation and self.config.trainer.run_validation_on_start and iteration == 0:
            self.validate(model, dataloader_val, iteration=iteration)
        _end_training = False
        with (
            maybe_enable_profiling(self.config, global_step=iteration) as torch_profiler,
            maybe_enable_memory_snapshot(self.config, global_step=iteration) as memory_profiler,
        ):
            while True:
                dataloader_train_iter = iter(dataloader_train)
                while True:
                    self.callbacks.on_before_dataloading(iteration)
                    try:
                        with self.training_timer("dataloading"), self.straggler_detector.profile_section(
                            "dataloading", self.config.trainer.straggler_detection.analyze_dataloading, profile_cuda=False
                        ):
                            data_batch, stop_signal = self._fetch_and_broadcast_data(model, dataloader_train_iter, iteration)
                            if stop_signal:
                                raise StopIteration
                    except StopIteration:
                        # The dataloader is exhausted: restart from a fresh iterator.
                        break
                    finally:
                        self.callbacks.on_after_dataloading(iteration)
                    # If max_iter is reached, exit the training loop.
                    if iteration >= self.config.trainer.max_iter:
                        _end_training = True
                        break
                    data_batch = misc.to(data_batch, device="cuda")
                    self.callbacks.on_training_step_start(model, data_batch, iteration=iteration)
                    self.callbacks.on_training_step_batch_start(model, data_batch, iteration=iteration)
                    if not model.training:
                        model_ddp.train()
                    assert model_ddp.training, "model_ddp is not in training mode."
                    assert model.training, "model is not in training mode."
                    # The training step: forward, backward, and (at accumulation boundaries) the optimizer update.
                    output_batch, loss, grad_accum_iter = self.training_step(
                        model_ddp, optimizer, scheduler, grad_scaler, data_batch,
                        iteration=iteration, grad_accum_iter=grad_accum_iter,
                    )
                    self.callbacks.on_training_step_batch_end(model, data_batch, output_batch, loss, iteration=iteration)
                    # Only advance the iteration counter once a full gradient-accumulation cycle has finished.
                    if grad_accum_iter != 0:
                        continue
                    iteration += 1
                    # Save the current model checkpoint at the configured cadence.
                    if iteration % self.config.checkpoint.save_iter == 0:
                        self.checkpointer.save(model, optimizer, scheduler, grad_scaler, iteration=iteration)
                    self.callbacks.on_training_step_end(model, data_batch, output_batch, loss, iteration=iteration)
                    # Validate on the validation set at the configured cadence.
                    if self.config.trainer.run_validation and iteration % self.config.trainer.validation_iter == 0:
                        self.validate(model, dataloader_val, iteration=iteration)
                    # Re-arm the watchdog timer and update the straggler report.
                    signal.alarm(self.config.trainer.timeout_period)
                    self.straggler_detector.generate_report(iteration)
                    if torch_profiler:
                        torch_profiler.step()
                    if memory_profiler:
                        memory_profiler.step()
                if _end_training:
                    break
        log.success("Done with training.")
        # Write the final checkpoint if the last iteration has not been saved yet.
        if iteration % self.config.checkpoint.save_iter != 0:
            self.checkpointer.save(model, optimizer, scheduler, grad_scaler, iteration=iteration)
        self.callbacks.on_train_end(model, iteration=iteration)
        self.checkpointer.finalize()
        dist.barrier()
        self.callbacks.on_app_end()
        if dist.is_available() and dist.is_initialized():
            dist.destroy_process_group()

    def training_step(
        self,
        model_ddp: torch.nn.Module | distributed.DistributedDataParallel,
        optimizer: torch.optim.Optimizer,
        scheduler: torch.optim.lr_scheduler.LRScheduler,
        grad_scaler: torch.amp.GradScaler,
        data: dict[str, torch.Tensor],
        iteration: int = 0,
        grad_accum_iter: int = 0,
    ) -> tuple[dict[str, torch.Tensor], torch.Tensor, int]:
        """The training step.

        Args:
            model_ddp (torch.nn.Module | distributed.DistributedDataParallel): The model with a DDP wrapper or the
              bare module, depending on whether distributed training is enabled.
            optimizer (torch.optim.Optimizer): The model optimizer.
            scheduler (torch.optim.lr_scheduler.LRScheduler): The optimization scheduler.
            grad_scaler (torch.amp.GradScaler): The gradient scaler (for mixed precision training).
            data (dict[str, torch.Tensor]): Data batch (dictionary of tensors).
            iteration (int): Current iteration number.
            grad_accum_iter (int): Number of gradient accumulation iterations.

        Returns:
            output (dict[str, torch.Tensor]): The model output from the training data batch (dictionary of tensors).
            loss (torch.Tensor): The total loss of the training data batch.
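            grad_accum_iter (int): Updated accumulation counter (reset to 0 after an optimizer step).

        With gradient accumulation over N micro-batches, each call scales the loss by
        1/N and accumulates gradients; the optimizer only steps on the N-th call, so
        e.g. N=4 with a per-call batch of 8 samples emulates a batch of 32 per update.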
        """
        # Only sync gradients across DDP workers on the last micro-batch of the accumulation cycle.
        with distributed.ddp_sync_grad(model_ddp, grad_accum_iter == self.config.trainer.grad_accum_iter - 1):
            self.callbacks.on_before_forward(iteration=iteration)
            with self.training_timer("forward"), self.straggler_detector.profile_section(
                "forward", self.config.trainer.straggler_detection.analyze_forward
            ):
                output_batch, loss = model_ddp.training_step(data, iteration)
            self.callbacks.on_after_forward(iteration=iteration)
            self.callbacks.on_before_backward(model_ddp, loss, iteration=iteration)
            with self.training_timer("backward"), self.straggler_detector.profile_section(
                "backward", self.config.trainer.straggler_detection.analyze_backward
            ):
                # Scale the loss for gradient accumulation and mixed precision.
                loss_scaled = grad_scaler.scale(loss / self.config.trainer.grad_accum_iter)
                loss_scaled.backward()
                if self.config.trainer.distributed_parallelism == "ddp":
                    model_ddp.module.on_after_backward()
                else:
                    model_ddp.on_after_backward()
            self.callbacks.on_after_backward(model_ddp, iteration=iteration)
        grad_accum_iter += 1
        if grad_accum_iter == self.config.trainer.grad_accum_iter:
            # A full accumulation cycle is done: step the optimizer and scheduler, then reset.
            with self.training_timer("optimizer_step"), self.straggler_detector.profile_section(
                "optimizer_step", self.config.trainer.straggler_detection.analyze_optimizer
            ):
                self.callbacks.on_before_optimizer_step(model_ddp, optimizer, scheduler, grad_scaler, iteration=iteration)
                grad_scaler.step(optimizer)
                grad_scaler.update()
                scheduler.step()
                self.callbacks.on_before_zero_grad(model_ddp, optimizer, scheduler, iteration=iteration)
                if self.config.trainer.distributed_parallelism == "ddp":
                    model_ddp.module.on_before_zero_grad(optimizer, scheduler, iteration=iteration)
                else:
                    model_ddp.on_before_zero_grad(optimizer, scheduler, iteration=iteration)
                optimizer.zero_grad(set_to_none=True)
            grad_accum_iter = 0
        return output_batch, loss, grad_accum_iter

    @torch.no_grad()
    def validate(self, model: ImaginaireModel, dataloader_val: torch.utils.data.DataLoader, iteration: int = 0) -> None:
        """Validate on the full validation dataset.

        Args:
            model (ImaginaireModel): The PyTorch model.
            dataloader_val (torch.utils.data.DataLoader): The validation data loader.
            iteration (int): Current iteration number.
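
        Runs with gradients disabled and, when EMA is enabled, under ema.ema_scope()
        so the averaged weights are evaluated; config.trainer.max_val_iter, if set,
        caps the number of validation batches.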
        """
        self.callbacks.on_validation_start(model, dataloader_val, iteration=iteration)
        model.eval()
        # Evaluate with the EMA weights when EMA is enabled.
        with ema.ema_scope(model, enabled=model.config.ema.enabled):
            for val_iter, data_batch in enumerate(dataloader_val):
                if self.config.trainer.max_val_iter is not None and val_iter >= self.config.trainer.max_val_iter:
                    break
                data_batch = misc.to(data_batch, device="cuda")
                self.callbacks.on_validation_step_start(model, data_batch, iteration=iteration)
                output_batch, loss = model.validation_step(data_batch, iteration)
                self.callbacks.on_validation_step_end(model, data_batch, output_batch, loss, iteration=iteration)
        self.callbacks.on_validation_end(model, iteration=iteration)