o
    ?߱iF                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ e r`d dlmZ d d	lmZmZ d d
lm Z  ernd dl!m"Z" dMddZ#dNdOddZ$dNdOddZ%dPddZ&dPddZ'dQddZ(dRd!d"Z)dQd#d$Z*dSd*d+Z+G d,d- d-ej,j-j.Z.ed.d/ Z/dTd3d4Z0e1 dUd8d9Z2dVd=d>Z3dWd@dAZ4dXdCdDZ5		 		EdYdZdKdLZ6dS )[    )annotationsN)contextmanager)	timedelta)TYPE_CHECKINGAnyCallable	ContainerOptional)get_process_group_ranks)Device)_get_default_group)_sync_module_states$_verify_param_shape_across_processes)log)	DDPConfigreturn
int | Nonec               
   C  sR  t  r	tj S t  tt	dd} zt
| }td|  W n tjy> } ztd|  W Y d}~nd}~ww dtjd< dtjd< t  rutj|  t	d	d
}tt|d}t jdd|d tjd|  d| dd td}ttjd  ttj}|tdtd ||td tdt  d dS )z Initialize distributed training.
LOCAL_RANKr   zFailed to set device affinity: N0TORCH_NCCL_BLOCKING_WAIT1TORCH_NCCL_ASYNC_ERROR_HANDLINGZ TORCH_NCCL_HEARTBEAT_TIMEOUT_SECi  )secondsncclzenv://)backendinit_methodtimeoutz1Initialized distributed training with local rank z with timeout F)
rank0_onlyzlibcudart.so         zTraining with z GPUs.)distis_initializedtorchcudacurrent_devicepynvmlnvmlInitintosgetenvr   sched_setaffinityZget_cpu_affinity	NVMLErrorr   warningenvironis_available
set_devicer   init_process_groupcriticalctypesCDLLcastc_intPOINTERZcudaDeviceSetLimitZcudaDeviceGetLimitinfoget_world_size)
local_rankdeviceeZtimeout_secondsZtimeout_timedeltaZ
_libcudartp_value r>   [/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/utils/distributed.pyinit,   s6   



r@   groupOptional[dist.ProcessGroup]r(   c                 C  "   d}t  rt  rt | }|S )zgGet the rank (GPU device) of the worker.

    Returns:
        rank (int): The rank of the worker.
    r   )r!   r/   r"   get_rank)rA   rankr>   r>   r?   rD   P      
rD   c                 C  rC   )zGet world size. How many GPUs are available in this job.

    Returns:
        world_size (int): The total number of GPUs available in this job.
    r   )r!   r/   r"   r9   )rA   
world_sizer>   r>   r?   r9   \   rF   r9   boolc                   C  s
   t  dkS )zCheck if current process is the master GPU.

    Returns:
        (bool): True if this function is called from the master GPU, else False.
    r   )rD   r>   r>   r>   r?   is_rank0h   s   
rI   c                   C  s   t j dkS )zCheck if current process is the local master GPU in the current node.

    Returns:
        (bool): True if this function is called from the local master GPU, else False.
    r   )r#   r$   r%   r>   r>   r>   r?   is_local_rank0q   s   rJ   funcr   c                      t   fdd}|S )a  Apply this function only to the master GPU.

    Example usage:
        @rank0_only
        def func(x):
            return x + 3

    Args:
        func (Callable): a function.

    Returns:
        (Callable): A function wrapper executing the function only on the master GPU.
    c                    s   t  r
 | i |S d S N)rI   )argskwargsrK   r>   r?   wrapper   s   zrank0_only.<locals>.wrapper	functoolswrapsrK   rQ   r>   rP   r?   r   z   s   r   Nonec                   C  s$   t  rt  rt   dS dS dS )zBarrier for all GPUs.N)r!   r/   r"   barrierr>   r>   r>   r?   rW      s   rW   c                   rL   )z6run the function on rank 0 first, then on other ranks.c                    s2   t  r
 | i |}t  t  s | i |}|S rM   )rI   rW   )rN   rO   resultrP   r>   r?   rQ      s   zrank0_first.<locals>.wrapperrR   rU   r>   rP   r?   rank0_first   s   rY   
config_ddpr   modeltorch.nn.Module)torch.nn.Module | DistributedDataParallelc              
   C  s   t  rLt  rLttdd}zddlm} |jdd}W n t	y< } zt
| t
d d}W Y d}~nd}~ww t||g|| j| j| j|d}|S )	a  Wraps the model to enable data parallalism for training across multiple GPU devices.

    Args:
        config_ddp (DDPConfig): The data parallel config.
        model (torch.nn.Module): The PyTorch module.

    Returns:
        model (torch.nn.Module | DistributedDataParallel): The data parallel model wrapper
            if distributed environment is available, otherwise return the original model.
    r   r   )parallel_stateT)Zwith_context_parallelzAparallel_state not initialized, treating all GPUs equally for DDPN)
device_idsoutput_devicefind_unused_parametersstatic_graphbroadcast_buffersprocess_group)r!   r/   r"   r(   r)   r*   megatron.corer^   Zget_data_parallel_group	Exceptionr   r8   DistributedDataParallelra   rb   rc   )rZ   r[   r:   r^   Z	ddp_groupr<   r>   r>   r?   parallel_model_wrapper   s*   

	rh   c                      s,   e Zd ZdZd
 fddZddd	Z  ZS )rg   a  This extends torch.nn.parallel.DistributedDataParallel with .training_step().

    This borrows the concept of `forward-redirection` from Pytorch lightning. It wraps an ImaginaireModel such that
    model.training_step() would be executed when calling self.training_step(), while preserving the behavior of calling
    model() for Pytorch modules. Internally, this is a double rerouting mechanism (training_step -> forward ->
    training_step), allowing us to preserve the function names and signatures.
    r[   r\   c                   s$   t  j|g|R i | d| _d S )NT)super__init__#show_sync_grad_static_graph_warning)selfr[   rN   rO   	__class__r>   r?   rj      s   
z DistributedDataParallel.__init__r   r   c                   s,   j j  fdd}|j _|i |S )Nc                    s    j _j j| i |S rM   )moduleforwardtraining_step)_args_kwargsoriginal_forwardrl   r>   r?   wrapped_training_step   s   zDDistributedDataParallel.training_step.<locals>.wrapped_training_step)ro   rp   )rl   rN   rO   rv   r>   rt   r?   rq      s   z%DistributedDataParallel.training_step)r[   r\   )r   r   )__name__
__module____qualname____doc__rj   rq   __classcell__r>   r>   rm   r?   rg      s    rg   c              	   c  s    t | tjjs
J t | tr)| j}| jr&| j|kr&| jr%t	d d| _n|| _zdV  W t | tr8|| _dS dS t | trB|| _w )a  
    Context manager to enable/disable gradient synchronizations across DDP processes for DDP model.
    Modified from:
    https://pytorch.org/docs/stable/_modules/torch/nn/parallel/distributed.html#DistributedDataParallel.no_sync
    Note that this is incompatible with static_graph=True and will be an no-op if static_graph=True.

    Within this context, gradients will be accumulated on module
    variables, which will later be synchronized in the first
    forward-backward pass exiting the context.

    .. warning::
        The forward pass should be included inside the context manager, or
        else gradients will still be synchronized.
    zTDDP static_graph=True is incompatible with sync_grad(). Performance will be reduced.FN)

isinstancer#   nnModulerg   require_backward_grad_syncrb   rk   r   r-   )r[   enabledold_require_backward_grad_syncr>   r>   r?   ddp_sync_grad   s    



r   data_batcheslist[dict[str, torch.Tensor]]&torch.Tensor | dict[str, torch.Tensor]c                   s@  t | d tjrztj| dd}tjt|dd}tj|tjj	d t||k rKt|d |ks1J t
|dd }tj||gdd}tjddd}ntjddd}tj|tjjd t| }tj|ddjddd}|dkrx|d|  }|S t | d tjjrt }| d  D ] t fd	d
| D | < q|S t)a6  Aggregate the list of data batches from all devices and process the results.

    This is used for gathering validation data batches with cosmos_predict2._src.imaginaire.utils.dataloader.DistributedEvalSampler.
    It will return the data/output of the entire validation set in its original index order. The sizes of data_batches
    in different ranks may differ by 1 (if dataset size is not evenly divisible), in which case a dummy sample will be
    created before calling dis.all_gather().

    Args:
        data_batches (list[dict[str, torch.Tensor]]): List of tensors or (hierarchical) dictionary where
            leaf entries are tensors.

    Returns:
        data_gather (torch.Tensor | dict[str, torch.Tensor]): tensors or (hierarchical) dictionary where
            leaf entries are concatenated tensors.
    r   )dimr$   )r;   )opr   N)	start_dimend_dimc                   s   g | ]}|  qS r>   r>   ).0datakeyr>   r?   
<listcomp>/      z#collate_batches.<locals>.<listcomp>)r|   r#   Tensorcattensorlenr!   
all_reduceReduceOpMAX
empty_likeSUMall_gather_tensor
contiguousstackflattencollectionsabcMappingdictkeyscollate_batches	TypeError)r   Zdata_concatZmax_num_local_samplesdummyZdummy_countZdata_collater>   r   r?   r     s,   r   r   torch.Tensorlist[torch.Tensor]c                   s(    fddt t D }t|  |S )zGather the corresponding tensor from all GPU devices to a list.

    Args:
        tensor (torch.Tensor): Pytorch tensor.

    Returns:
        tensor_list (list[torch.Tensor]): A list of Pytorch tensors gathered from all GPU devices.
    c                   s   g | ]}t  qS r>   )r#   
zeros_like)r   _r   r>   r?   r   ?  s    z%all_gather_tensor.<locals>.<listcomp>)ranger9   r!   
all_gather)r   tensor_listr>   r   r?   r   5  s   
r   payloadr   list[Any] | Nonec                 C  s8   t  t }}|dkrdg| nd}tj| |dd |S )a#  Gather the corresponding object from all GPU devices to a rank 0 hosted list.

    Args:
        payload: Any pickle-able object.

    Returns:
        payload_list (list[Any]) | None:
            Rank 0: A list of Pytorch tensors gathered from all RANK process.
            Rest : None
    r   N)object_gather_listdst)rD   r9   r!   gather_object)r   rE   rG   Zpayload_gatheredr>   r>   r?   r   D  s   r   Fc                 C  s(   t  }|dk r	| S tj| |||d d S )N   )srcrA   async_op)r9   r!   	broadcast)r   r   rA   r   rG   r>   r>   r?   r   U  s   r   meanc                 C  s   t  }|dk r	| S t 5 tj| |d t |kr*|dkr#| | } n|dkr(ntW d   | S W d   | S W d   | S 1 sEw   Y  | S )zReduce to rank 0r   )r   r   sumN)r9   r#   no_gradr!   reducerD   NotImplementedError)r   rE   r   rG   r>   r>   r?   dist_reduce_tensor\  s,   



	
	
		r   Trd   r   params_and_buffers_to_ignoreOptional[Container[str]]rc   c                   s   t  rt  s
dS |du rt }st td| dt| d fdd|  D }t   fdd|D }dd |D }t	|d	krJdS t
|| t| |td
||d dS )aD  
    Modify based on DDP source code
    Synchronizes the parameters and buffers of a model across different processes in a distributed setting.

    This function ensures that all processes in the specified process group have the same initial parameters and
    buffers from the source rank, typically rank 0. It is useful when different processes start with different model
    states and a synchronization is required to ensure consistency across all ranks.

    Args:
        model (nn.Module): The model whose parameters and buffers are to be synchronized.
        process_group (dist.ProcessGroup, optional): The process group for communication. If None,
            the default group is used. Defaults to None.
        src (int, optional): The source rank from which parameters and buffers will be broadcasted.
            Defaults to 0.
        params_and_buffers_to_ignore (Optional[Container[str]], optional): A container of parameter and buffer
            names to exclude from synchronization. Defaults to None, which means all parameters and buffers are
            included.
        broadcast_buffers (bool, optional): Whether to broadcast buffers or not. Defaults to True.

    Side Effects:
        This function modifies the state of the model in-place to synchronize it with the source rank's model state.

    Raises:
        RuntimeError: If the shapes of parameters across processes do not match, a runtime error will be raised.

    Examples:
        >>> # downloading duplicated model weights from s3 in each rank and save network bandwidth
        >>> # useful and save our time when model weights are huge
        >>> if dist.get_rank == 0:
        >>>     model.load_state_dict(network_bound_weights_download_fn(s3_weights_path))
        >>> dist.barrir()
        >>> sync_model_states(model) # sync rank0 weights to other ranks
    Nz%Synchronizing model states from rank z to all ranks in process group .c                   s:   g | ]\ } fd d|j ddD D ]}||fqqS )c                   s&   g | ]\}}  d | vr|qS )r   r>   )r   
param_nameparam)module_namer   r>   r?   r     s
    z0sync_model_states.<locals>.<listcomp>.<listcomp>F)recurse)named_parameters)r   ro   	parameter)r   )r   r?   r     s    
z%sync_model_states.<locals>.<listcomp>c                   s*   g | ]\}}| vr  |s||fqS r>   )add)r   mp)memor>   r?   r     s
    c                 S  s   g | ]\}}|qS r>   r>   )r   r   r   r>   r>   r?   r     r   r   i  )ro   rd   broadcast_bucket_sizer   r   rc   )r!   r/   r"   r   setr   r8   r
   named_modulesr   r   r   r(   )r[   rd   r   r   rc   modules_and_parameters
parametersr>   )r   r   r?   sync_model_statesm  s8   (

	

r   )r   r   rM   )rA   rB   r   r(   )r   rH   )rK   r   r   r   )r   rV   )rZ   r   r[   r\   r   r]   )r   r   r   r   )r   r   r   r   )r   r   r   r   )NF)r   r   )Nr   NT)
r[   r\   rd   rB   r   r(   r   r   rc   rH   )7
__future__r   r   collections.abcr3   rS   r)   
contextlibr   datetimer   typingr   r   r   r   r	   r&   r#   torch.distributeddistributedr!   r
   Z,cosmos_predict2._src.imaginaire.utils.devicer   r/   "torch.distributed.distributed_c10dr   torch.distributed.utilsr   r   %cosmos_predict2._src.imaginaire.utilsr   Z&cosmos_predict2._src.imaginaire.configr   r@   rD   r9   rI   rJ   r   rW   rY   rh   r}   parallelrg   r   r   r   r   r   r   r   r   r>   r>   r>   r?   <module>   sV   
$

	
	


"

-


