o
    viJ                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dl	m
Z
 d dlmZ d dlmZmZ z
d dlmZ dZW n eyK   dZed Y nw d d	lmZmZ d d
lmZ d dlmZmZmZmZmZ d dlm Z  d dl!m"Z" G dd dZ#dS )    N)INTERNAL)distributed_init)maybe_enable_memory_snapshotmaybe_enable_profiling)parallel_stateTFzMegatron-core is not installed.)
LazyConfiginstantiate)ImaginaireModel)callbackdistributedemalogmisc)Checkpointer)StragglerDetectorV2c                       s   e Zd ZdZ fddZdedejjj	dejjj	ddfd	d
Z
		ddejjejB dejjdejjjdejjdeeejf dededeeeejf ejef fddZe ddedejjj	deddfddZ  ZS )ImaginaireTrainera  The base trainer class of Imaginaire.

    All trainers in Imaginaire should inherit ImaginaireTrainer. It contains the basic functionality for model training
    (particularly suited for large-scale training), including data parallel (DDP/FSDP), model weight average (EMA),
    mixed-precision training (fp16/bf16).

    Attributes:
        checkpointer (Checkpointer): checkpointer object to save/load model weights and optimizer states.
        training_timer (misc.Timer): Timer object to time code blocks and functions.
    c                    s  t    || _t ] t  t|jdr+|jj	dkr t
dtd |jj	|j_	tradttjjv rFtj|jj|jj|jj	dd ntj|jj|jj|jj	d |jjt_tjrad	tjd
< W d   n1 skw   Y  t rtj|jjdd t||jj d t||jj d t   t!rt"|jj d t rt#d|j$dd  t%&g d nt%&ddg t%j'|j(j)dd |j(j*j+t,j-j*_+|j(j*j.t,j-j*_.d t,j-j*_/t,j-j0j1_/t2j3|| d| _4|j5j6du rt7|j5|j| j4d| _8nt9|j5j6|j5|j| j4d| _8t%: | _;t<| jj(j=j>| jj(j=j?| jj(j=j@| jj(j=jA| jj(j=jBd| _C| jCD  tEEtEjFtGHt%jI|j(jJ dS )zConstructor of the trainer.

        Args:
            config (Config): The config object for the Imaginaire codebase.
        context_parallel_size   zBoth config.model.context_parallel_size and config.model_parallel.context_parallel_size are set. config.model.context_parallel_size is deprecated. Please only set config.model_parallel.context_parallel_size.ztUsing deprecated config.model.context_parallel_size. Please use config.model_parallel.context_parallel_size instead.create_gloo_process_groupsF)pipeline_model_parallel_sizetensor_model_parallel_sizer   r   )r   r   r   1CUDA_DEVICE_MAX_CONNECTIONSNT)exist_okz/config.pklz/config.yamlz/stdout.logzConfig:
)	use_color)
TORCH_HOMEIMAGINAIRE_OUTPUT_ROOTZENABLE_ONELOGGERHF_HOMEr   )seedby_rank)configtrainer)	callbacks)enabledreport_freqprofile_freqmax_diffraise_error)Ksuper__init__r    r   r   inithasattrmodelmodel_parallelr   
ValueErrorr   criticalUSE_MEGATRONinspect	signaturer   initialize_model_parallel
parametersr   r   sequence_parallelosenvironis_rank0makedirsjob
path_localr   save_pkl	save_yamldistbarrierr   init_loguru_fileinfopretty_printr   print_environ_variablesset_random_seedr!   r   cudnndeterministictorchbackends	benchmark
allow_tf32cudamatmulr
   CallBackGroupr"   
checkpointtyper   checkpointerr   TrainingTimertraining_timerr   straggler_detectionr#   r$   r%   r&   r'   straggler_detector
initializesignalSIGALRM	functoolspartialtimeout_handlertimeout_period)selfr    	__class__ K/data/cameron/vidgen/cosmos-policy/cosmos_policy/_src/imaginaire/trainer.pyr)   9   s   


&






 zImaginaireTrainer.__init__r,   dataloader_traindataloader_valreturnNc                 C   s.  |j d| jjjd}|| jjj | j  || jj| jj	\}}t
jjdi | jjj}| j  | j||||}d}td| jjj  | jjjdkr[t| jjj|}	n| jjjdkre|}	n
td| jjj td | jj||d	 | jjjr|dkr| jjjr| j|||d	 d
}
t| j|d@}t| j|d'}	 t|}	 | j| zQz8|  d) | j!j"d| jjj#j$d
d t%|}W d   n1 sw   Y  W d   n1 sw   Y  W n t&y   Y W | j'| nw W | j'| n| j'| w || jjj(krd}
nt)j |dd}| jj*|||d	 | jj+|||d	 |j,s7|	-  |	j,s?J d|j,sGJ d| j.|	||||||d\}}}| jj/|||||d	 |dkrgq|d7 }|| jj0j1 dkr| jj2|||||d	 | jj3|||||d	 | jjjr|| jjj4 dkr| j|||d	 t56| jjj7 | j!8| |r|9  |r|9  q|
rnqW d   n	1 sw   Y  W d   n	1 sw   Y  t:d || jj0j1 dkr| jj2|||||d	 | jj;||d	 | j<  t=  | j>  dS )a  The training function.

        Args:
            model (ImaginaireModel): The PyTorch model.
            dataloader_train (torch.utils.data.DataLoader): The training data loader.
            dataloader_val (torch.utils.data.DataLoader): The validation data loader.
        rK   )memory_formatr   zDistributed parallelism mode: ddpfsdpz&Unknown distributed parallelism mode: zStarting training...	iterationF)global_stepTra   dataloading)profile_cudaNdevicez"model_ddp is not in training mode.zmodel is not in training mode.)rh   grad_accum_iterr   zDone with training.)rK   )?tor    r!   rd   on_train_startr"   on_optimizer_init_startZinit_optimizer_scheduler	optimizer	schedulerrG   amp
GradScalergrad_scaler_argson_optimizer_init_endrP   loadr   r/   distributed_parallelismr   parallel_model_wrapperre   r.   rA   run_validationrun_validation_on_startvalidater   r   iteron_before_dataloadingrR   rT   profile_sectionrS   analyze_dataloadingnextStopIterationon_after_dataloadingmax_iterr   on_training_step_starton_training_step_batch_starttrainingtraintraining_stepon_training_step_batch_endrN   	save_itersaveon_training_step_endvalidation_iterrV   alarmr[   generate_reportstepsuccesson_train_endfinalizer?   
on_app_end)r\   r,   ra   rb   rr   rs   grad_scalerrh   rn   	model_ddpZ_end_trainingtorch_profilerZmemory_profilerZdataloader_train_iter
data_batchoutput_batchlossr_   r_   r`   r      s   




 		

 <? 
D
zImaginaireTrainer.trainr   r   rr   rs   r   datarh   rn   c              
   C   s  t ||| jjjd k | jj|d | d+ | j	d| jjj
j |||\}}	W d   n1 s9w   Y  W d   n1 sHw   Y  | jj|d | jj||	|d | dB | j	d| jjj
j' ||	| jjj }
|
  | jjjdkr|j  n|  W d   n1 sw   Y  W d   n1 sw   Y  | jj||d W d   n1 sw   Y  |d7 }|| jjjkrE| d	f | j	d
| jjj
jJ | jj|||||d || |  |  | jj||||d | jjjdkr|jj|||d n|j|||d |jdd W d   n	1 s.w   Y  W d   n	1 s>w   Y  d}||	|fS )a  The training step.

        Args:
            model_ddp (torch.nn.Module | distributed.DistributedDataParallel): The model with a DDP wrapper or, the bare
              module, depending on whether distributed training is enabled or not.
            optimizer (torch.optim.Optimizer): The model optimizer.
            scheduler (torch.optim.lr_scheduler.LRScheduler): The optimization scheduler.
            grad_scaler (torch.amp.GradScaler): The gradient scaler (for mixed precision training).
            data (dict[str, torch.Tensor]): Data batch (dictionary of tensors).
            iteration (int): Current iteration number.
            grad_accum_iter (int): Number of gradient accumulation iterations.

        Returns:
            output (dict[str, torch.Tensor]): The model output from the training data batch (dictionary of tensors).
            loss (torch.Tensor): The total loss of the training data batch.
        r   rg   forwardfwdNbackwardbwdre   optimizer_stepoptT)set_to_noner   )r   ddp_sync_gradr    r!   rn   r"   on_before_forwardrR   rT   r   rS   analyze_forwardr   on_after_forwardon_before_backwardanalyze_backwardscaler   ry   moduleon_after_backwardanalyze_optimizeron_before_optimizer_stepr   updateon_before_zero_grad	zero_grad)r\   r   rr   rs   r   r   rh   rn   r   r   Zloss_scaledr_   r_   r`   r   	  sb   



zImaginaireTrainer.training_stepc              	   C   s   | j j|||d |  tj||jjjdD t|D ]7\}}| jjj	dur0|| jjj	kr0 n$t
j|dd}| j j|||d |||\}}| j j|||||d qW d   n1 s^w   Y  | j j||d dS )a  Validate on the full validation dataset.

        Args:
            model (ImaginaireModel): The PyTorch model.
            dataloader_val (torch.utils.data.DataLoader): The validation data loader.
            iteration (int): Current iteration number.
        rg   )r#   NrK   rl   )r"   on_validation_startevalr   Z	ema_scoper    r#   	enumerater!   max_val_iterr   ro   on_validation_step_startZvalidation_stepon_validation_step_endon_validation_end)r\   r,   rb   rh   Zval_iterr   r   r   r_   r_   r`   r}   M  s   	zImaginaireTrainer.validate)r   r   )r   )__name__
__module____qualname____doc__r)   r	   rG   utilsr   
DataLoaderr   nnModuler   DistributedDataParalleloptim	Optimizerlr_schedulerLRSchedulerrt   ru   dictstrTensorinttupler   no_gradr}   __classcell__r_   r_   r]   r`   r   -   sB    \
{	
D,r   )$rX   r1   r6   rV   rG   torch.distributedr   r>   torch.utils.data#cosmos_policy._src.imaginaire.flagsr   Z4cosmos_policy._src.imaginaire.utils.context_managersr   Z-cosmos_policy._src.imaginaire.utils.profilingr   r   megatron.corer   r0   ImportErrorprint)cosmos_policy._src.imaginaire.lazy_configr   r   #cosmos_policy._src.imaginaire.modelr	   #cosmos_policy._src.imaginaire.utilsr
   r   r   r   Z0cosmos_policy._src.imaginaire.utils.checkpointerr   (cosmos_policy._src.imaginaire.utils.miscr   r   r_   r_   r_   r`   <module>   s.   