from collections import defaultdict

import torch
import torch.nn as nn
from torch.distributed._composable.fsdp import fully_shard
from torch.distributed._composable.replicate import replicate
from torch.distributed._tensor import Replicate, Shard
from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (
    checkpoint_wrapper as ptd_checkpoint_wrapper,
)
from torch.distributed.device_mesh import DeviceMesh
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    PrepareModuleInput,
    RowwiseParallel,
    SequenceParallel,
    parallelize_module,
)

from cosmos_predict2._src.imaginaire.utils import log as logger
from cosmos_predict2._src.reason1.configs.default.model_config import (
    ActivationCheckpointConfig,
    FSDP2ModelConfig as JobConfig,
)
from cosmos_predict2._src.reason1.parallelisms.parallel_dims import ParallelDims

# Resolves dtype strings from the config into torch dtypes.
TORCH_DTYPE_MAP = {
    "float16": torch.float16,
    "float32": torch.float32,
    "bfloat16": torch.bfloat16,
}

def parallelize_qwen(
    model: nn.Module,
    world_mesh: DeviceMesh,
    parallel_dims: ParallelDims,
    job_config: JobConfig,
):
    """
    Apply tensor parallelism, activation checkpointing, torch.compile, and data
    parallelism to the model.

    NOTE: The passed-in model preferably should be on meta device. Otherwise,
    the model must fit on GPU or CPU memory.
    """
    if parallel_dims.tp_enabled:
        if job_config.experimental.enable_async_tensor_parallel and not job_config.training.compile:
            raise RuntimeError("Async TP requires --training.compile")
        apply_tp(
            model,
            world_mesh["tp"],
            loss_parallel=parallel_dims.loss_parallel_enabled,
            enable_float8=job_config.float8.enable_float8_linear,
            enable_async_tp=job_config.experimental.enable_async_tensor_parallel,
        )

    if job_config.activation_checkpoint.mode != "none":
        apply_ac(model, job_config.activation_checkpoint)

    # Compile each (possibly AC-wrapped) block before data parallelism is applied.
    if job_config.training.compile:
        apply_compile(model)

    if parallel_dims.dp_shard_enabled or parallel_dims.cp_enabled:
        # Apply FSDP or HSDP, potentially with Context Parallel.
        if parallel_dims.dp_replicate_enabled:
            dp_mesh_dim_names = ("dp_replicate", "dp_shard_cp")
        else:
            dp_mesh_dim_names = ("dp_shard_cp",)

        apply_fsdp(model, world_mesh[tuple(dp_mesh_dim_names)])

        if parallel_dims.dp_replicate_enabled:
            logger.info("Applied HSDP to the model")
        else:
            logger.info("Applied FSDP to the model")

        if parallel_dims.cp_enabled:
            logger.info("Applied Context Parallel to the model")

        if job_config.training.enable_cpu_offload:
            logger.info("Applied CPU Offloading to the model")
    elif parallel_dims.dp_replicate_enabled:
        if world_mesh.ndim > 1:
            raise RuntimeError("DDP has not supported > 1D parallelism")
        apply_ddp(
            model,
            world_mesh,
            enable_compile=job_config.training.compile,
            enable_compiled_autograd=job_config.experimental.enable_compiled_autograd,
        )

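# Typical call site (illustrative sketch only; the builder and materialization
# steps below are assumptions about the surrounding trainer, not part of this
# module):
#
#   with torch.device("meta"):
#       model = build_qwen_model(model_config)  # hypothetical builder
#   parallelize_qwen(model, world_mesh, parallel_dims, job_config)
#   model.to_empty(device="cuda")  # materialize the sharded parameters
#
# The order inside parallelize_qwen matters: TP rewrites submodules in place,
# AC wraps blocks, torch.compile wraps the (possibly AC-wrapped) blocks, and
# FSDP/DDP must come last so data parallelism sees the final module tree.
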
def apply_tp(
    model: nn.Module,
    tp_mesh: DeviceMesh,
    loss_parallel: bool,
    enable_float8: bool,
    enable_async_tp: bool,
):
    """Apply tensor parallelism."""
    # 1. Parallelize the embedding and shard its outputs along the sequence dim.
    # 2. Parallelize the root norm layer over the sequence dim.
    # 3. Parallelize the final linear output layer.
    parallelize_module(
        model,
        tp_mesh,
        {
            "model.embed_tokens": RowwiseParallel(
                input_layouts=Replicate(),
                output_layouts=Shard(1),
            ),
            "model.norm": SequenceParallel(),
            "lm_head": ColwiseParallel(
                input_layouts=Shard(1),
                output_layouts=Shard(-1) if loss_parallel else Replicate(),
                use_local_output=not loss_parallel,
            ),
        },
    )

    # When float8 training is enabled, swap in the float8 variants of the
    # parallel styles used for the transformer block linears.
    if enable_float8:
        from torchao.float8.float8_tensor_parallel import (
            Float8ColwiseParallel,
            Float8RowwiseParallel,
            PrepareFloat8ModuleInput,
        )

        rowwise_parallel, colwise_parallel, prepare_module_input = (
            Float8RowwiseParallel,
            Float8ColwiseParallel,
            PrepareFloat8ModuleInput,
        )
    else:
        rowwise_parallel, colwise_parallel, prepare_module_input = (
            RowwiseParallel,
            ColwiseParallel,
            PrepareModuleInput,
        )

    # Apply tensor + sequence parallelism to every transformer block.
    for transformer_block in model.model.layers:
        layer_plan = {
            "attention_norm": SequenceParallel(),
            "attention": prepare_module_input(
                input_layouts=(Shard(1), None, None, None, None, None, None),
                desired_input_layouts=(Replicate(), None, None, None, None, None, None),
            ),
            "attention.wq": colwise_parallel(),
            "attention.wk": colwise_parallel(),
            "attention.wv": colwise_parallel(),
            "attention.wo": rowwise_parallel(output_layouts=Shard(1)),
            "ffn_norm": SequenceParallel(),
            "feed_forward": prepare_module_input(
                input_layouts=(Shard(1),),
                desired_input_layouts=(Replicate(),),
            ),
            "feed_forward.w1": colwise_parallel(),
            "feed_forward.w2": rowwise_parallel(output_layouts=Shard(1)),
            "feed_forward.w3": colwise_parallel(),
        }

        # Translate the generic names above into the Qwen (HF-style) module names.
        names_mapping = {
            "attention_norm": "input_layernorm",
            "attention": "self_attn",
            "attention.wq": "self_attn.q_proj",
            "attention.wk": "self_attn.k_proj",
            "attention.wv": "self_attn.v_proj",
            "attention.wo": "self_attn.o_proj",
            "ffn_norm": "post_attention_layernorm",
            "feed_forward": "mlp",
            "feed_forward.w1": "mlp.gate_proj",
            "feed_forward.w2": "mlp.down_proj",
            "feed_forward.w3": "mlp.up_proj",
        }
        new_layer_plan = {}
        for key, value in layer_plan.items():
            new_layer_plan[names_mapping[key]] = value
        layer_plan = new_layer_plan

        parallelize_module(
            module=transformer_block,
            device_mesh=tp_mesh,
            parallelize_plan=layer_plan,
        )

    if enable_async_tp:
        from torch.distributed._symmetric_memory import enable_symm_mem_for_group

        torch._inductor.config._micro_pipeline_tp = True
        enable_symm_mem_for_group(tp_mesh.get_group().group_name)

    logger.info(
        f"Applied {'Float8 ' if enable_float8 else ''}{'Async ' if enable_async_tp else ''}"
        "Tensor Parallelism to the model"
    )

_save_list = {
    torch.ops.aten.mm.default,
    torch.ops.aten._scaled_dot_product_efficient_attention.default,
    torch.ops.aten._scaled_dot_product_flash_attention.default,
    torch.ops._c10d_functional.reduce_scatter_tensor.default,
    torch.ops.aten.max.default,
}


def _apply_ac_to_transformer_block(module: nn.Module, ac_config: ActivationCheckpointConfig):
    valid_ac_modes = ("full", "selective")
    if ac_config.mode not in valid_ac_modes:
        raise ValueError(f"Invalid AC mode: {ac_config.mode}. Valid modes: {valid_ac_modes}")

    if ac_config.mode == "full":
        return ptd_checkpoint_wrapper(module, preserve_rng_state=False)

    assert ac_config.mode == "selective", f"{ac_config.mode}"
    use_op_sac = ac_config.selective_ac_option == "op"
    use_layer_sac = ac_config.selective_ac_option.isdigit()
    if not use_op_sac and not use_layer_sac:
        raise ValueError(
            f"Invalid selective AC option: {ac_config.selective_ac_option}. "
            f"Valid options: 'op' or a positive int representing layer frequency"
        )
    if use_op_sac:
        from torch.utils.checkpoint import CheckpointPolicy, create_selective_checkpoint_contexts

        def _get_custom_policy(meta):
            def _custom_policy(ctx, func, *args, **kwargs):
                mode = "recompute" if ctx.is_recompute else "forward"
                mm_count_key = f"{mode}_mm_count"
                if func == torch.ops.aten.mm.default:
                    meta[mm_count_key] += 1
                # Save the output of ops in _save_list, except every second mm.
                to_save = func in _save_list and not (
                    func == torch.ops.aten.mm.default and meta[mm_count_key] % 2 == 0
                )
                return CheckpointPolicy.MUST_SAVE if to_save else CheckpointPolicy.PREFER_RECOMPUTE

            return _custom_policy

        def selective_checkpointing_context_fn():
            meta = defaultdict(int)
            return create_selective_checkpoint_contexts(_get_custom_policy(meta))

        return ptd_checkpoint_wrapper(
            module,
            context_fn=selective_checkpointing_context_fn,
            preserve_rng_state=False,
        )
    elif use_layer_sac:
        # Checkpoint every `ac_freq` of the number of layers.
        ac_freq = int(ac_config.selective_ac_option)
        ptd_checkpoint_wrapper.__dict__.setdefault("_count", 0)
        ptd_checkpoint_wrapper._count += 1
        if not ac_freq or ptd_checkpoint_wrapper._count % ac_freq == 0:
            return ptd_checkpoint_wrapper(module, preserve_rng_state=False)
        else:
            return module


def apply_ac(model: nn.Module, ac_config: ActivationCheckpointConfig):
    """Apply activation checkpointing to the model."""
    if ("vision" in ac_config.models or "vlm" in ac_config.models) and hasattr(model, "visual") and model.visual is not None:
        for layer_id, block in model.visual.blocks.named_children():
            block = ptd_checkpoint_wrapper(block, preserve_rng_state=False)
            model.visual.blocks.register_module(layer_id, block)
    if "llm" in ac_config.models or "vlm" in ac_config.models:
        for layer_id, transformer_block in model.model.layers.named_children():
            transformer_block = _apply_ac_to_transformer_block(transformer_block, ac_config)
            model.model.layers.register_module(layer_id, transformer_block)
    logger.info(f"Applied {ac_config.mode} activation checkpointing to the model")

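# Example (config shape assumed from ActivationCheckpointConfig): with
# mode="full", every selected block is wrapped and fully recomputed in
# backward; with mode="selective" and selective_ac_option="2", every 2nd
# block is fully checkpointed (layer-frequency SAC); with
# selective_ac_option="op", the op-level policy above decides per-op what
# to save.
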
def apply_compile(model: nn.Module):
    """
    Apply torch.compile to each TransformerBlock, which makes compilation efficient due to
    repeated structure. Alternatively one can compile the whole model (after applying DP).
    """
    for layer_id, transformer_block in model.model.layers.named_children():
        transformer_block = torch.compile(transformer_block, fullgraph=True)
        model.model.layers.register_module(layer_id, transformer_block)

    logger.info("Compiling each TransformerBlock with torch.compile")


def apply_fsdp(model: nn.Module, dp_mesh: DeviceMesh):
    """
    Apply data parallelism (via FSDP2) to the model.

    Args:
        model (nn.Module): The model to apply data parallelism to.
        dp_mesh (DeviceMesh): The device mesh to use for data parallelism.
    """
    if model.visual is not None:
        # Shard each vision block individually so parameters are gathered
        # only for the block currently running.
        for layer_id, block in enumerate(model.visual.blocks):
            fully_shard(block, mesh=dp_mesh)
    for layer_id, block in enumerate(model.model.layers):
        fully_shard(block, mesh=dp_mesh)
    # Shard the remaining root-level parameters last.
    fully_shard(model, mesh=dp_mesh)


def apply_ddp(
    model: nn.Module,
    dp_mesh: DeviceMesh,
    enable_compile: bool,
    enable_compiled_autograd: bool,
):
    if enable_compile:
        if enable_compiled_autograd:
            torch._dynamo.config.optimize_ddp = "python_reducer_without_compiled_forward"
        else:
            torch._dynamo.config.optimize_ddp = "ddp_optimizer"

    replicate(model, device_mesh=dp_mesh, bucket_cap_mb=100)

    logger.info("Applied DDP to the model")
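
# Minimal smoke sketch of the FSDP-only path (assumptions: single host, one
# mesh dim named "dp_shard_cp", NCCL available, and `model` is a Qwen-style
# module exposing .visual and .model.layers):
#
#   import torch.distributed as dist
#   from torch.distributed.device_mesh import init_device_mesh
#
#   dist.init_process_group("nccl")
#   world_mesh = init_device_mesh(
#       "cuda", (dist.get_world_size(),), mesh_dim_names=("dp_shard_cp",)
#   )
#   apply_fsdp(model, world_mesh["dp_shard_cp"])  # shards each block, then the root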