o
    {i7,                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
mZ d dlZd dlmZ e dZdadadadefddZd8defd	d
Zd8defddZd8defddZd9ddZd9ddZd:dedefddZdefddZdede
e fddZ G dd deZ!G dd dZ"dd d ddd!d"ed#ed$ed%ed&edB f
d'd(Z#d)d* Z$d9d+d,Z%d-eee  fd.d/Z&d0d1 Z'defd2d3Z(defd4d5Z)defd6d7Z*dS );    N)	timedelta)Enum)ListSequencedinov3returnc                   C   s   t  ot  S )zC
    Returns:
        True if distributed training is enabled.
    )distis_availableis_initialized r   r   X/data/cameron/keygrip/volume_dino_tracks/dinov3/distributed/torch_distributed_wrapper.pyis_distributed_enabled      r   c                 C      t  sdS tj| dS )z^
    Returns:
        The rank of the current process within the specified process group.
    r   group)r   r   get_rankr   r   r   r   r   !      r   c                 C   r   )zR
    Returns:
        The number of processes in the specified process group.
       r   )r   r   get_world_sizer   r   r   r   r   +   r   r   c                 C   s   t | dkS )zf
    Returns:
        True if the current process is the main one in the specified process group.
    r   )r   r   r   r   r   is_main_process5      r   c                  O   s,   | dd}t|sdS tj| i | dS )z4Utility function to save only from the main process.r   N)popr   torchsave)argskwargsr   r   r   r   save_in_main_process=   s   r   c                  C   s    ddl } | jadd }|| _dS )z=This function disables printing when not in the main process.r   Nc                  _   s,   | dd}t s|rt| i | d S d S )NforceF)r   r   _BUILTIN_PRINT)r   r   r   r   r   r   printL   s   
z._restrict_print_to_main_process.<locals>.print)builtinsr    r   )__builtin__r    r   r   r   _restrict_print_to_main_processE   s   
r#   seedc                 C   s:   d\}}t jd}|d u rt| }|||S t|S )N)i N  i`  MASTER_PORT)osenvirongetrandomRandomrandintint)r$   ZMIN_MASTER_PORTZMAX_MASTER_PORTZmaster_port_strrngr   r   r   _get_master_portT   s   
r.   c                  C   sN   t  t jt j} | d |  d }|W  d    S 1 s w   Y  d S )N) r   r   )socketAF_INETSOCK_STREAMbindgetsockname)sportr   r   r   _get_available_port_   s
   
$r7   r5   c                 C   s   t jddd| gdd S )NZscontrolshowZ	hostnamesT)text)
subprocesscheck_output
splitlines)r5   r   r   r   _parse_slurm_node_listh   s   r=   c                   @   s   e Zd ZdZdZdZdS )JobTypeZTorchElasticZSlurmmanualN)__name__
__module____qualname__TORCHELASTICSLURMMANUALr   r   r   r   r>   l   s    r>   c                   @   sV   e Zd ZdZdd Zdddededd fd	d
ZedefddZdd Z	dd Z
dS )TorchDistributedEnvironmentz
    Helper class to get (and set) distributed job information from the
    environment. Identifies and supports (in this order):
    - TorchElastic,
    - Slurm,
    - Manual launch (single-node).
    c                 C   sx  dt jv r>t jd | _tj| _t jd | _tt jd | _tt jd | _	tt jd | _
tt jd | _tt jd | _nldt jv rtt jd | _tj| _tt jd	 }tt jd
 }t||kseJ |d | _t| jd| _tt jd | _	tt jd | _
tt jd | _| j
| | _nd | _tj| _d| _t | _d| _	d| _
d| _d| _| j	| j
k sJ | j| jk sJ d S )NTORCHELASTIC_RUN_IDMASTER_ADDRr%   RANK
WORLD_SIZE
LOCAL_RANKLOCAL_WORLD_SIZEZSLURM_JOB_IDZSLURM_JOB_NUM_NODESZSLURM_JOB_NODELISTr   )r$   ZSLURM_PROCIDZSLURM_NTASKSZSLURM_LOCALIDz	127.0.0.1r   )r&   r'   job_idr>   rC   job_typemaster_addrr,   master_portrank
world_size
local_ranklocal_world_sizerD   r=   lenr.   rE   r7   )selfZ
node_countnodesr   r   r   __init__{   s>   


z$TorchDistributedEnvironment.__init__F)nccl_async_error_handling	overwriterY   r   c                C   s   | j t| jt| jt| jt| jt| jd}|r"|ddi |sB| D ]\}}|t	j
vr2q(t	j
| |kr:q(td| dt	j
| | S )N)rH   r%   rI   rJ   rK   rL   ZTORCH_NCCL_ASYNC_ERROR_HANDLING1z'Cannot export environment variables as z is already set)rO   strrP   rQ   rR   rS   rT   updateitemsr&   r'   RuntimeError)rV   rZ   rY   env_varskvr   r   r   export   s*   

z"TorchDistributedEnvironment.exportc                 C   s
   | j dkS Nr   )rQ   rV   r   r   r   r      s   
z+TorchDistributedEnvironment.is_main_processc              
   C   sL   | j j d| jrd| j dnd d| j d| j d| j d| j d		 S )
Nz job (z) r/   zusing :z (rank=z, world size=))rN   valuerM   rO   rP   rQ   rR   re   r   r   r   __str__   s   z#TorchDistributedEnvironment.__str__c                 C   s>   | j j d| j d| j d| j d| j d| j d| j dS )Nz(master_addr=z,master_port=z,rank=z,world_size=z,local_rank=z,local_world_size=rh   )	__class__r@   rO   rP   rQ   rR   rS   rT   re   r   r   r   __repr__   s   
z$TorchDistributedEnvironment.__repr__N)r@   rA   rB   __doc__rX   boolrc   propertyr   rj   rl   r   r   r   r   rF   r   s    .
$rF   TF)set_cuda_current_devicerZ   rY   restrict_print_to_main_processtimeoutrp   rZ   rY   rq   rr   c                 C   st   t durtdt }td|  |j||d | r#tj|j	 t
jd|d t
  |r3t  tjjja dS )a{  Enable distributed mode.

    Args:
        set_cuda_current_device: If True, call torch.cuda.set_device() to set the
            current PyTorch CUDA device to the one matching the local rank.
        overwrite: If True, overwrites already set variables. Else fails.
        nccl_async_error_handling: Enables NCCL asynchronous error handling. As a
            side effect, this enables timing out PyTorch distributed operations
            after a default 30 minutes delay).
        restrict_print_to_main_process: If True, the print function of non-main processes
            (ie rank>0) is disabled. Use print(..., force=True) to print anyway.
            If False, nothing is changed and all processes can print as usual.
        timeout: Timeout for operations executed against the process group.
            Default value is 10 minutes for NCCL and 30 minutes for other backends.
    Nz)Distributed mode has already been enabledz!PyTorch distributed environment: )rZ   rY   nccl)backendrr   )_DEFAULT_PROCESS_GROUPr_   rF   loggerinforc   r   cuda
set_devicerS   r   init_process_groupbarrierr#   distributedr   WORLD)rp   rZ   rY   rq   rr   Z	torch_envr   r   r   enable_distributed   s   r~   c                   C   s   t S N)ru   r   r   r   r   get_default_process_group  s   r   c                  C   sN   t d urdd l} t | _td urtjt d atd ur%tjt d ad S d S rd   )r   r!   r    _PROCESS_SUBGROUPr   r|   destroy_process_groupru   )r"   r   r   r   disable_distributed  s   r   all_subgroup_ranksc                 C   sn   t dd | D }t }t|tt|ksJ ||v sJ tdu s$J | D ]}tj|}||v r4|aq&dS )a  Create new process subgroups according to the provided specification.

    Args:
       all_subgroup_ranks: a sequence of rank sequences (first rank, ..., last rank),
           one for each process subgroup. Example: ((0, 1), (2, 3), (4, 5, 6, 7)).

    Note:
       This is similar to the (non-documented) new_subgroups_by_enumeration().
       This should be called once (and not sequentially) to create all subgroups.
    c                 s   s    | ]
}|D ]}|V  qqd S r   r   ).0subgroup_ranksrQ   r   r   r   	<genexpr>4  s    z new_subgroups.<locals>.<genexpr>N)tupler   rU   setr   r   r|   	new_group)r   Z	all_ranksrQ   r   subgroupr   r   r   new_subgroups)  s   r   c                   C   s   t ptS )zG
    Returns:
        The process subgroup of this rank (or None).
    )r   ru   r   r   r   r   get_process_subgroupB  s   r   c                   C      t t dS )zW
    Returns:
        The rank of the current process within its process subgroup.
    r   r   r   r   r   r   r   get_subgroup_rankJ  r   r   c                   C   r   )zJ
    Returns:
        The number of processes in the process subgroup
    r   )r   r   r   r   r   r   get_subgroup_sizeR  r   r   c                   C   s   t t ddkS )zc
    Returns:
        True if the current process is the main one within its process subgroup.
    r   r   r   r   r   r   r   is_subgroup_main_processZ  r   r   r   )r   N)r   )+loggingr&   r)   r0   r:   datetimer   enumr   typingr   r   r   torch.distributedr|   r   	getLoggerrv   ru   r   r   rn   r   r,   r   r   r   r   r#   r.   r7   r\   r=   r>   rF   r~   r   r   r   r   r   r   r   r   r   r   r   <module>   s`   




	r
0
