o
    ?߱i_(                     @   s  d dl Z d dlmZ zd dlm  mZ dZW n ey"   dZY nw d dlZd dlm	Z	 d dl
mZmZmZmZmZ d dlmZ d dlmZ d	e	d
edede	fddZejjd	e	d
edede	fddZd	e	d
edede	fddZejjd'dej	dedededej	f
ddZ	d(dej	eB dB dee dej	eB dB fddZ	d(dej	d
edee dej	fddZ	 d)d!ejd"ed#e eeef d$edejf
d%d&Z!dS )*    N)OptionalTF)Tensor)ProcessGroup
all_gatherbroadcast_object_listget_process_group_ranksget_world_size)$_verify_param_shape_across_processes)distributedxseq_dimcp_groupreturnc                 C   s   t |}t|}| j| | dksJ | j|  d| | jg | jd| || j| | | j|d d R  } tj| g| jd}| ||} | jg | jd| d| j|d d R  } | S )a  
    Split input tensor along the sequence dimension for checkpoint parallelism.

    This function divides the input tensor into equal parts along the specified
    sequence dimension, based on the number of ranks in the checkpoint parallelism group.
    It then selects the part corresponding to the current rank.

    Args:
        x: Input tensor to be split.
        seq_dim: The dimension along which to split the input (sequence dimension).
        cp_group: The process group for checkpoint parallelism.

    Returns:
        A slice of the input tensor corresponding to the current rank.

    Raises:
        AssertionError: If the sequence dimension is not divisible by the number of ranks.
    r   z cannot divide cp_size N   )device   )	r   lenshapeviewtorchtensorrankr   index_select)r   r   r   Zcp_rankscp_sizeseq_idx r   `/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/utils/context_parallel.pysplit_inputs_cp"   s   *>0r   c              
      sf   t |} fddt|D }z	t| |d W n ty+ } ztd| d}~ww tj||dS )aa  
    Concatenate outputs from different ranks in the checkpoint parallelism group.

    This function gathers tensors from all ranks in the checkpoint parallelism group
    and concatenates them along the specified sequence dimension.

    The function is decorated with @torch.compiler.disable because it contains distributed
    operations and dynamic tensor creation based on runtime rank information that seem to be
    incompatible with torch.compile's static graph compilation.

    Args:
        x: Input tensor to be concatenated.
        seq_dim: The dimension along which to concatenate the tensors (sequence dimension).
        cp_group: The process group for checkpoint parallelism.

    Returns:
        A tensor that is the concatenation of tensors from all ranks in the cp_group.

    Raises:
        RuntimeError: If the gather operation fails.
    c                       g | ]}t  qS r   r   
zeros_like.0_r   r   r   
<listcomp>\       z"cat_outputs_cp.<locals>.<listcomp>groupFailed to gather tensors: Ndim)r   ranger   RuntimeErrorr   cat)r   r   r   
world_sizegathered_tensorser   r%   r   cat_outputs_cpA   s   r3   c              
      s   |  }|dksJ d fddt|D }z	t| |d W n ty3 } ztd| d}~ww | } ||< tj||dS )	a  
    Concatenate outputs from different ranks in the context parallelism group.

    This function gathers tensors from all ranks in the checkpoint parallelism group
    and concatenates them along the specified sequence dimension.

    It retains computational graph locally for each rank by replacing the portion of the tensor with original output.

    Args:
        x: Input tensor to be concatenated.
        seq_dim: The dimension along which to concatenate the tensors (sequence dimension).
        cp_group: The process group for checkpoint parallelism.

    Returns:
        A tensor that is the concatenation of tensors from all ranks in the cp_group.

    Raises:
        RuntimeError: If the gather operation fails.
    r   z cp_size should be greater than 0c                    r   r   r    r"   r%   r   r   r&      r'   z,cat_outputs_cp_with_grad.<locals>.<listcomp>r(   r*   Nr+   )sizer-   r   r.   r   r   r/   )r   r   r   r   r1   r2   r   r   r%   r   cat_outputs_cp_with_gradh   s   r5   r   srcpgis_check_shapec                 C   s   t  |krtj| jtjd }ntj|  tjd }|r&t	||g tj j
|||d t  |kr?| | | } tj j
| ||d | S )a]  
    Perform a robust broadcast operation that works regardless of tensor shapes on different ranks.

    The function is decorated with @torch.compiler.disable because it contains distributed
    operations and dynamic tensor creation based on runtime rank information that seem to be
    incompatible with torch.compile's static graph compilation.

    Args:
        tensor (torch.Tensor): The tensor to broadcast (on src rank) or receive (on other ranks).
        src (int): The source rank for the broadcast. Defaults to 0.

    Returns:
        torch.Tensor: The broadcasted tensor on all ranks.
    )dtyper(   )r
   get_rankr   r   r   longcudaemptyr,   r	   	broadcast	new_emptytolisttype_as)r   r6   r7   r8   r   r   r   r   robust_broadcast   s   rB   itemprocess_groupc                 C   s\   |du r| S t t|}t| tjrt| ||} | S | dur,| g}t|||d |d } | S )zM
    Broadcast the item from the minimum rank in the specified group(s).
    Nr(   r   )minr   
isinstancer   r   rB   r   )rC   rD   min_rankZbroadcastable_listr   r   r   r>      s   r>   c                 C   s0   | du r| S t t|}t| ||} t| ||S )zO
    Broadcast the tensor from the minimum rank in the specified group(s).
    N)rE   r   rB   r   )r   r   rD   rG   r   r   r   broadcast_split_tensor   s
   rH   r   r   r   r   shape_tensorr   patch_valuesview_factorc                 C   s   t std| \}}}}}g }	|| dksJ || }d}
t|||gD ]1\}}|dkr8|dkr8td| d|| }t|| |}|| }|dkrN|}
|	||  q$|
t_t	
|	S )a  
    Find the shape of input tensor for post-CP split, taking into account both temporal and spatial split, as well as patching values.
    The split by width is not possible currently, due to memory stride issues, which break quality. This is checked
    by an assert.

    The spatial split is achieved by flattening the input video into a single dimension before CP split is performed,
    and rearranging it back into [T, H, W] format after the CP split, since the input passed to the model must still be in [T, H, W] format.

    Args:
        shape_tensor (torch.Size): The shape of the Tensor that we want to split. Needs to be in [B, C, T, H, W] format.
        cp_size (int): The Context Parallelism size that we want to use.
        patch_values (tuple[int, int, int], optional): The patch values that are applied inside the Diffusion model.
            First element of the tuple is temporal patch size. Two next elements are the spatial patch sizes.
            The default value is (1, 2, 2)
        view_factor (int, optional): The number of views that are present in the temporal dimension. Default value is 1.

    Returns:
        The torch.Size of how the post-split tensor should look like in [T, H, W] dimensions.

    zPNo megatron.core package found, which is required for Context Parallelism usage.r   r   r   zySplit by width dimension is not currently supported due to quality issues. Width dimension would be split by a factor of z0. Lower the CP size to avoid splitting by width.)USE_MEGATRONImportError	enumerate
ValueErrormathgcdappendparallel_state	cp_size_tr   Size)rJ   r   rK   rL   BCTHWretrU   ir4   
patch_sizerR   r   r   r   
find_split   s(   

r_   )F)N)rI   r   )"rQ   typingr   megatron.core.parallel_statecorerT   rM   rN   r   r   torch.distributedr   r   r   r   r   torch.distributed.utilsr	   %cosmos_predict2._src.imaginaire.utilsr
   intr   compilerdisabler3   r5   boolrB   strr>   rH   rV   tupler_   r   r   r   r   <module>   sb   &'&#

