o
    ?߱ie                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZmZmZmZmZmZ d dlZd dlmZ zd dlZW n eyW   dZY nw d dlZd dlZd dlmZ d dlm Z  d d	l!m"Z"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m)Z) d_d`ddZ*ddej+fdaddZ,dbd d!Z-dcd$d%Z.ddded*d+Z/	dfdgd1d2Z0dhd4d5Z1d_did8d9Z2ed:ed;ef d<Z3G d=d> d>e)Z4G d?d@ d@e	Z5G dAdB dBZ6djdGdHZ7G dIdJ dJZ8G dKdL dLZ9dkdOdPZ:dldTdUZ;dmdndYdZZ<G d[d\ d\Z=G d]d^ d^Z>dS )o    )annotationsN)ContextDecoratornullcontextfields)AnyCallableListTupleTypeVarUnion)logger)AsyncCollectiveTensor)DTensor)distributedlog)all_gather_tensor)easy_io)TimerTmodeltorch.nn.ModulevalueboolreturnNonec                 C  s   |   D ]}||_qdS )zSet a model to require gradients or not.

    Args:
        model (torch.nn.Module): Neural network model.
        value (bool): Whether the network requires gradients or not.
    N)
parametersrequires_grad)r   r   p r   T/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/utils/misc.pyr   /   s   r   datar   devicestr | torch.device | Nonedtypetorch.dtype | Nonememory_formattorch.memory_formatc                   s
  dusdusdusJ dt  tjrOtjkr!  dks,tjkr/  dkr/tjt tr8dkpBt tjoBj	dk} j
| d  S t  tjjrft	  fdd D S t  tjjrt  ttfst	 fd	d
 D S  S )aJ  Recursively cast data into the specified device, dtype, and/or memory_format.

    The input data can be a tensor, a list of tensors, a dict of tensors.
    See the documentation for torch.Tensor.to() for details.

    Args:
        data (Any): Input data.
        device (str | torch.device): GPU device (default: None).
        dtype (torch.dtype): data type (default: None).
        memory_format (torch.memory_format): memory organization format (default: torch.preserve_format).

    Returns:
        data (Any): Data cast to the specified device, dtype, and/or memory_format.
    Nz@at least one of device, dtype, memory_format should be specified      cpu)r!   r#   r%   non_blockingc              	     s"   i | ]}|t  | d qS r!   r#   r%   to.0keyr    r!   r#   r%   r   r   
<dictcomp>e   s   " zto.<locals>.<dictcomp>c                   s   g | ]
}t | d qS r+   r-   r0   elemr,   r   r   
<listcomp>g       zto.<locals>.<listcomp>)
isinstancetorchTensorchannels_lastdimchannels_last_3dpreserve_formatstrr!   typer.   collectionsabcMappingSequencebytes)r    r!   r#   r%   is_cpur   r2   r   r.   :   s0   

 r.   c                   s   t  tjjrt  fdd D S t  tjjr-t  ttfs-t dd  D S zt	  W  S  t
yC   t  Y  S w )zSerialize data by hierarchically traversing through iterables.

    Args:
        data (Any): Input data.

    Returns:
        data (Any): Serialized data.
    c                   s   i | ]	}|t  | qS r   	serializer/   r    r   r   r3   v   s    zserialize.<locals>.<dictcomp>c                 S  s   g | ]}t |qS r   rG   r4   r   r   r   r6   x       zserialize.<locals>.<listcomp>)r8   rA   rB   rC   r@   rD   r?   rE   jsondumps	TypeErrorrI   r   rI   r   rH   l   s   	
rH   env_vars	list[str]c              
   C  sZ   | D ](}|t jv rtdt| dtt j|   qtdt| d qdS )zPrint a specific list of environment variables.

    Args:
        env_vars (list[str]): List of specified environment variables.
    zEnvironment variable : z	 not set!N)osenvironr   infoColorgreenyellowwarning)rN   env_varr   r   r   print_environ_variables   s
   
*rY   Fseedintby_rankc                 C  sF   |r| t  7 } td|  d t|  tj|  t|  dS )zSet random seed. This includes random, numpy, Pytorch.

    Args:
        seed (int): Random seed.
        by_rank (bool): if true, each GPU will use a different random seed.
    zUsing random seed .N)	r   get_rankr   rS   randomrZ   npr9   manual_seed)rZ   r\   r   r   r   set_random_seed   s   
rb   shapeList[int] | Tuple[int]torch.dtypestr | torch.device
int | Nonec                 C  s2   t j|}|| t j}t|j||dS )am  Produce a GPU-architecture-invariant randomized Torch tensor.

    Args:
        shape (list or tuple of ints): Output tensor shape.
        dtype (torch.dtype): Output tensor type.
        device (torch.device): Device holding the output.
        seed (int): Optional randomization seed.

    Returns:
        tensor (torch.tensor): Randomly-generated tensor.
    )r#   r!   )	r`   r_   RandomStatestandard_normalastypefloat32r9   
from_numpyr.   )rc   r#   r!   rZ   rngrandom_arrayr   r   r   arch_invariant_rand   s   ro   &dict[str, torch.Tensor] | torch.Tensorc                   s8   d
 fdd  | }t |tstd| d|  |S )zGet the batch size from a data batch, a (possibly hierarchical) dictionary of tensors.

    Args:
        data (dict[str, torch.Tensor]): Data batch (dictionary of tensors).

    Returns:
        batch_size (int): Data batch size.
    
input_datar   r   Union[int, None]c                   s   t | tjr
t| S t | tjjr(|  D ]\}} |}|dur%|  S qdS t | tt	frGt| dkrGt | d tjrAt| S  | d S dS )z
        Helper function that recursively finds a tensor in the input data
        (could be a nested dictionary or list of tensors) and returns its batch size.
        Nr   )
r8   r9   r:   lenrA   rB   rC   itemslisttuple)rq   r1   r   
batch_size_get_batch_sizer   r   ry      s   
z,get_data_batch_size.<locals>._get_batch_sizezBatch size (z) obtained from invalid data: N)rq   r   r   rr   )r8   r[   
ValueError)r    rw   r   rx   r   get_data_batch_size   s
   

r{   module
persistentc           	      C  s~   t  }|  D ]\}}|||< q| D ](\}}|d}d|dd }| |}|d }t|| |j|||d qdS )a  Convert parameters in a module to buffers.
    Buffers do not have its own gradients and thus not updated by backpropagation.

    Args:
        module (torch.nn.Module): a module to convert parameters
        persistent (bool): If True, buffers are included in state_dict.
    r]   N)r}   )dictnamed_parametersrt   splitjoinget_submoduledelattrregister_buffer)	r|   r}   named_paramsnameparamZmodule_hierarchysubmodule_name	submodulesubnamer   r   r   parameters_to_buffer   s   



r   T.)boundc                      s$   e Zd ZdZd	d
 fddZ  ZS )timeraw  Simple CPU timer for timing the execution of code.

    It can be used as either a context manager or a function decorator. The timing result will be logged upon exit.

    Example:
        def func_a():
            time.sleep(1)
        with timer("func_a"):
            func_a()

        @timer("func_b)
        def func_b():
            time.sleep(1)
        func_b()
    Fcontextr?   debugr   c                   s   t  j|ddd|d d S )NTFs)tagmeasure_cpumeasure_cudaunitr   )super__init__selfr   r   	__class__r   r   r     s   
ztimer.__init__Fr   r?   r   r   )__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r      s    r   c                   @  s:   e Zd ZdZddddZdddZdddZdddZdS )memory_checkera  Simple memory checker for a given block of code.

    It can be used as either a context manager or a function decorator. The memory usage will be logged upon exit.
    Example:
        def func_a():
            torch.rand([int(1024**2)]).float().cuda()
        with memory_checker("func_a"):
            func_a()
        >>> 0.004GB memory used

        @memory_checker("func_b")
        def func_b():
            random_var = torch.rand([int(1024**2)]).cuda()
        func_b()
    Fr   r?   r   r   c                 C  s   || _ || _d S N)r   r   r   r   r   r   r   "  s   
zmemory_checker.__init__r   r   c                 C  s$   t j  t j  t j | _d S r   )r9   cudasynchronizereset_peak_memory_statsmax_memory_allocatedinitial_memoryr   r   r   r   	__enter__&  s   

zmemory_checker.__enter__c                 C  sV   t j  t j }d| j d|| j d dd}| jr$t| d S t| d S NzMemory used within rP   i   @z.4fz GB)	r9   r   r   r   r   r   r   r   rS   )r   exc_type	exc_value	tracebackfinal_memorymessager   r   r   __exit__+  s   

 zmemory_checker.__exit__funcr   c                   s   t   fdd}|S )Nc                    s   t j  t j  t j } | i |}t j  t j }dj d|| d dd}jr9t| |S t| |S r   )	r9   r   r   r   r   r   r   r   rS   )argskwargsr   resultr   r   r   r   r   r   wrapper5  s   






z(memory_checker.__call__.<locals>.wrapper)	functoolswraps)r   r   r   r   r   r   __call__4  s   zmemory_checker.__call__Nr   r   r   r   )r   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r     s    

	r   c                   @  s`   e Zd ZdZdddZdddZddd	Zdd
dZdddZdddZ	dddZ
dddZdS )TrainingTimera_  Timer for timing the execution of code, aggregating over multiple training iterations.

    It is used as a context manager to measure the execution time of code and store the timing results
    for each function. The context managers can be nested.

    Attributes:
        results (dict): A dictionary to store timing results for various code.

    Example:
        timer = Timer()
        for i in range(100):
            with timer("func_a"):
                func_a()
        avg_time = sum(timer.results["func_a"]) / len(timer.results["func_a"])
        print(f"func_a() took {avg_time} seconds.")
    r   r   c                 C  s(   t  | _t  | _g | _g | _|   d S r   )r   resultsZaverage_resultstimers
func_stackresetr   r   r   r   r   Y  s
   zTrainingTimer.__init__c                 C  s   dd | j D | _ d S )Nc                 S  s   i | ]}|g qS r   r   r/   r   r   r   r3   a  s    z'TrainingTimer.reset.<locals>.<dictcomp>)r   r   r   r   r   r   `  s   zTrainingTimer.resetc                 C  s(   t ddddd}| j| |  | S )NTFr   )r   r   r   r   )r   r   appendstart)r   r   r   r   r   r   c  s   zTrainingTimer.__enter__c                 C  sF   | j  }|  | }| j }| j|g  | j| | d S r   )r   popendZget_cpu_timer   r   
setdefaultr   )r   r   r   r   r   r   r1   r   r   r   r   i  s   

zTrainingTimer.__exit__	func_namer?   c                 C  s   | j | | S r   )r   r   r   r   r   r   r   r   q  s   zTrainingTimer.__call__c                 C  
   |  |S r   r   r   r   r   r   __getattr__u     
zTrainingTimer.__getattr__c                 C  r   r   r   r   r   r   r   nestedx  r   zTrainingTimer.nesteddict[str, float]c                 C  s2   t  }| j D ]\}}t|t| ||< q|S r   )r   r   rt   sumrs   )r   r   r1   Z
value_listr   r   r   compute_average_results{  s   z%TrainingTimer.compute_average_resultsNr   )r   r   )r   r?   r   r   )r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   G  s    






r   timeout_periodfloatsignumframec                 C  s8   d|  d}t  rdd l}|jd||jjd t|)NzTimeout error: more than z) seconds passed since the last iteration.r   zTimeout error!)titletextlevel)r   is_rank0wandbZalertZ
AlertLevelERRORTimeoutError)r   r   r   error_messager   r   r   r   timeout_handler  s
   r   c                   @  sr   e Zd ZdZedddZedddZedd	d
ZedddZedddZ	edddZ
edddZdS )rT   zA convenience class to colorize strings in the console.

    Example:
        import
        print("This is {Color.red('important')}.")
    xr?   r   c                 C     t jt| ddS )Nredcolor	termcolorcoloredr?   r   r   r   r   r        z	Color.redc                 C  r   )NrU   r   r   r   r   r   r   rU     r   zColor.greenc                 C  r   )Nbluer   r   r   r   r   r   r     r   z
Color.bluec                 C  r   )Ncyanr   r   r   r   r   r   r     r   z
Color.cyanc                 C  r   )NrV   r   r   r   r   r   r   rV     r   zColor.yellowc                 C  r   )Nmagentar   r   r   r   r   r   r     r   zColor.magentac                 C  r   )Ngreyr   r   r   r   r   r   r     r   z
Color.greyN)r   r?   r   r?   )r   r   r   r   staticmethodr   rU   r   r   rV   r   r   r   r   r   r   rT     s     rT   c                   @  s8   e Zd ZdZdddZdddZed	d
 Zdd ZdS )	BufferCnta  
    Buffer counter which keeps track of the condition when called and returns True when the condition in met "thres"
    amount of times, otherwise returns False.

    Example usage:
        buf = BufferCnt(thres=3)
        for _ in range(5):
            if buf(random.random() > 0.5):
                print("We got lucky 3 times out of 5.")

    Args:
        thres (int): The amount of times the expression needs to be True before returning True.
        reset_over_thres (bool): Whether to reset the buffer after returning True.
    
   Fc                 C  s   d| _ || _|| _d S Nr   )_cntthresreset_over_thres)r   r   r   r   r   r   r        
zBufferCnt.__init__Nc                 C  sL   |du r|  j d7  _ nd| _ |d u r| j}| j |kr$| jr"|   dS dS )NT   r   F)r   r   r   r   )r   Zexprer   r   r   r   r     s   
zBufferCnt.__call__c                 C  s   | j S r   r   r   r   r   r   cnt  s   zBufferCnt.cntc                 C  s
   d| _ d S r   r   r   r   r   r   r     r   zBufferCnt.reset)r   Fr   )	r   r   r   r   r   r   propertyr   r   r   r   r   r   r     s    


r   	dataclassr   c                   s    fddt  D S )zConvert a dataclass to a dictionary.

    Args:
        dataclass (Any): Dataclass object.

    Returns:
        dict: Dictionary representation of the dataclass.
    c                   s   i | ]
}|j t |j qS r   )r   getattr)r0   fr   r   r   r3     r7   z.dataclass_instance_to_dict.<locals>.<dictcomp>r   r   r   r   r   dataclass_instance_to_dict  s   	r   tensortorch.Tensor | DTensortorch.tensorc                 C  s,   t | tr|  }t |tr| S |S | S r   )r8   r   Zto_localr   wait)r   localr   r   r   get_local_tensor_if_DTensor  s   

r     recompile_limituse_duck_shapec                 C  sr   z| t jj_|t jjj_W dS  ty8   z| t jj_	|t jjj_W Y dS  ty7 } zt
d |d}~ww w )a  
    Set some of the torch compile config options. The default values of arguments are default config values in PyTorch as of 2.10 version.
    The value recompile_limit=32 is useful for Wan Tokenizer encoding compilation, as the standard value of 8 can easily overflow.
    The value of use_duck_shape=False is useful for Cosmos3 MoT training to reduce recompilations.

    Args:
        recompile_limit (int): Controls the maximum number of cache entries with a guard on same ID_MATCH'd object.
        use_duck_shape (bool): This flag changes whether we should use the same symbolic variable to represent input sizes that are the same
    z=torch.compile is not available due to missing config options.N)r9   _dynamoconfigr  fxexperimental_configr  AttributeErrorZcache_size_limitr   rW   )r  r  er   r   r   set_torch_compile_options  s   



r  c                   @  s,   e Zd ZdZdddd	Zd
d Zdd ZdS )NVTXRangeContexta  
    Context manager which inserts NVTX range around the current context and optionally calls torch.cuda.synchronize
    at the start and the end of the context.

    Args:
        name (str): Name of the NVTX range.
        enabled (bool): Whether the context manager is enabled. When disabled, it does nothing. Default: True.
        synchronize (bool): Whether to call torch.cuda.synchronize() at the start and the end of the context. Default: True.
    Tr   r?   enabledr   r   c                 C  s   || _ || _|| _d S r   )r   r  r   )r   r   r  r   r   r   r   r     r   zNVTXRangeContext.__init__c                 C  s.   | j sd S | jrtj  tjj| j d S r   )r  r   r9   r   nvtx
range_pushr   r   r   r   r   r     s
   
zNVTXRangeContext.__enter__c                 C  s*   | j sd S | jrtj  tjj  d S r   )r  r   r9   r   r  	range_pop)r   r   exc_valexc_tbr   r   r   r   $  s
   
zNVTXRangeContext.__exit__N)TT)r   r?   r  r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r    s
    
r  c                   @  sJ   e Zd ZdZ					ddddZdd Zdd ddZdd Zdd ZdS )!StragglerDetectorV2a  StragglerDetectorV2 is a class that allows you to easily integrate "straggler" tool:
    https://gitlab-master.nvidia.com/dl/gwe/fault_tolerance_related/straggler/-/tree/cupti?ref_type=heads.

    This tool detects stragglers using low-level CUPTI tool, which can gather kernel execution time with very low overhead.
    The execution times are compared across different ranks, as well as to the execution time of the exact same kernels in the past.
    This tool can be easily integrated, as it's resilient to any synchronizations, since it captures kernels execution time.
    It means that we can wrap the entire  forward or backward passes and the stragglers will be identified regardless
    of synchronizations happening during the iteration.

    Args:
        enabled (bool): Whether the straggler detection is enabled. When disabled, it does nothing. Default: True.
        report_freq (int): Generate a report each report_freq iterations that analyzes the GPUs performance. Defaults to 100.
        profile_freq (int): Enable the CUPTI profiling each profile_freq iterations. Since the overhead is very low,
                            the default value is 1.
        max_diff (float): Defines the maximum relative difference between the fastest and the slowest rank to determine the slowdown. Defaults to 2.0
        raise_error (bool): Whether to raise error when stragglers are detected enough times. Defaults to True.Td   r          @r  r   report_freqr[   profile_freqmax_diffr   raise_errorc                 C  s:   || _ || _|| _| jj| _tddd| _|| _|| _	d S )Nr   T)r   r   )
r  r  r  r   r   r   r   slowdown_countr  r  )r   r  r  r  r  r  r   r   r   r   >  s   

zStragglerDetectorV2.__init__c                 C  s2   | j rts	tdtjjddgd| jd d S d S )NzPlease install straggler package before using StragglerDetectionV2.Package can be installed from here: https://gitlab-master.nvidia.com/dl/osiris/stragglerZrelative_perf_scoresZindividual_perf_scoresF)Zscores_to_computeZgather_on_rank0Zprofiling_interval)r  	stragglerRuntimeErrorDetector
initializer  r   r   r   r   r!  N  s   
zStragglerDetectorV2.initializer   r?   section_enabledprofile_cudac                 C  s    |r| j rtjj||dS t S )N)r#  )r  r  r   Zdetection_sectionr   )r   r   r"  r#  r   r   r   profile_section\  s   
z#StragglerDetectorV2.profile_sectionc                 C  s<   g }|D ]}| || tjj d  qtt| S )Ni  )	r   r  Z	StatisticMAXr   r   r9   r   r   )r   local_section_summariesr    r1   r   r   r   _aggregate_section_resultsb  s   z.StragglerDetectorV2._aggregate_section_resultsc              	     s  j rF|j dkrHtj }|jt  }tt	
|g }|j}t dkrJ|jdj d}fddt|D }t|jD ]\ | fddt|D  qIt	
|}t	|}	|dj d|	 dj d	t	| i t|jD ]+\ t	
fd
d|D }|d  dt	| d  dt	| i qdd l}
|
jr|
j||d dd l}|jjjjj r|dj  dkrt!"|dj#j$ d|dd t!"|dj#j$ d|dd |d }|d }|p|}|r+t	%t&t'( j)d }t|}||	 * + , -d}t./d|  0|rLj1rNt2d|	 d|	 d|j|	  dd S d S d S d S d S )Nr   r   )Zgpu_rel_thresholdc                   s*   i | ]\}} j  d | |d  qS )z/relative_gpu_perf_r   r   item)r0   rankZperfr   r   r   r3   s  s    z7StragglerDetectorV2.generate_report.<locals>.<dictcomp>c                   s2   i | ]\}}j  d   d|d|  qS )/_03dr(  )r0   r*  vr1   key_idr   r   r   r3   y  s   2 zslowest_rank/_rankZ_relative_perfc                   s   g | ]}|  qS r   r   )r0   r.  )r0  r   r   r6     rJ   z7StragglerDetectorV2.generate_report.<locals>.<listcomp>zslowest_rank/slowest__time)stepr(   zs3://rundir/z/iter_Z09dz.yamlz/report_iter_z.pklZstraggler_gpus_relativeZstraggler_gpus_individualzutf-8zSlowest rank hostname: zDetected GPU zD to be too slow compared to other GPUs. The relative performance of z
 rank was z. Terminating the training.)3r  r  r  r   generate_reportZgpu_relative_perf_scoresr   r^   r   r9   r   r   r'  r&  Zidentify_stragglersr  	enumerateupdateargminr   r)  minargmaxmaxr   runr   Z,cosmos_predict2._src.imaginaire.utils.launch_src
imaginaireutilsZlaunchZS3_READYr   dumpr   r   
ByteTensor	bytearrayrQ   unamenodenamer)   numpytobytesdecodeloggingcriticalr  r  r  )r   	iterationreportZgpu_relative_perf_scoreZ#gpu_relative_perf_score_gather_listZlocal_section_dataZ
stragglersZ
wandb_infoZdata_tensorZslowest_rank_idr   cosmos_predict2Zrelative_stragglersZindividual_stragglersZis_slowdownhostnameZwhole_hostnameZslowest_hostnamer   r/  r   r4  i  s|   



 
9z#StragglerDetectorV2.generate_reportN)Tr  r   r  T)
r  r   r  r[   r  r[   r  r   r  r   T)r   r?   r"  r   r#  r   )	r   r   r   r   r   r!  r$  r'  r4  r   r   r   r   r  ,  s    r  rM  )r   r   r   r   r   r   )
r    r   r!   r"   r#   r$   r%   r&   r   r   )r    r   r   r   )rN   rO   r   r   r   )rZ   r[   r\   r   r   r   r   )rc   rd   r#   re   r!   rf   rZ   rg   )r    rp   r   r[   )r|   r   r}   r   )r   r   r   r[   r   r[   r   r   )r   r   r   r   )r   r   r   r   )r  T)r  r[   r  r   )?
__future__r   rA   collections.abcr   rK   rQ   r_   
contextlibr   r   dataclassesr   typingr   r   r	   r
   r   r   rD  r`   logurur   rG  r  ImportErrorr   r9   )torch.distributed._functional_collectivesr   Ztorch.distributed._tensor.apir   %cosmos_predict2._src.imaginaire.utilsr   r   Z1cosmos_predict2._src.imaginaire.utils.distributedr   Z-cosmos_predict2._src.imaginaire.utils.easy_ior   Z+cosmos_predict2._src.imaginaire.utils.timerr   r   r>   r.   rH   rY   rb   ro   r{   r   r   r   r   r   r   rT   r   r   r  r  r  r  r   r   r   r   <module>   sb    
2

&6
;
%
-
