o
    ?߱i1                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlmZ d dlmZ d d	lmZ G d
d de jZG dd deZG dd deZG dd deZdS )    N)contextmanager)	GeneratorUnion)urlparse)ClientError)FileSystemReaderFileSystemWriter)FileSystemBase)log)easy_ioc                       s(   e Zd ZdZdd Z fddZ  ZS )S3StreamaK  
    Workaround for PyTorch manually closing the stream before we can upload it to S3. We override the close() as noop
    and instead call our own _true_close() method to close the stream after we are done using it.
    The commit at fault is https://github.com/pytorch/pytorch/commit/9c909bf3bb122db2cce95e2eb7459bbe50dfa15a
    c                 C   s   |    d S N)flushself r   d/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/checkpointer/s3_filesystem.pyclose&   s   zS3Stream.closec                    s   t    d S r   )superr   r   	__class__r   r   _true_close*   s   zS3Stream._true_close)__name__
__module____qualname____doc__r   r   __classcell__r   r   r   r   r      s    r   c                   @   s  e Zd ZdZ					d-deded	ed
edededdfddZdd Z	e
deeejf dedeejddf fddZdeeejf dedeeejf fddZdeeejf deeejf fddZdeeejf deeejf ddfddZdeeejf ddfdd Zdeeejf dee fd!d"Zed#eeejf defd$d%Zdeeejf defd&d'Zdeeejf ddfd(d)Zd*edeeef fd+d,ZdS ).S3FileSystemz4Implementation of FileSystemBase for AWS S3 storage.         ?      >@       @Fcredential_pathmax_attemptsinitial_backoffmax_backoffbackoff_factorenable_gcs_patch_in_boto3returnNc                 C   sJ   t jd|ddd| _|| _|| _|| _|| _|| _|r#t	d dS dS )a  
        Initialize S3FileSystem with retry configuration.

        Args:
            credential_path: Path to AWS credentials JSON file
            max_attempts: Maximum number of retry attempts
            initial_backoff: Initial backoff time in seconds
            max_backoff: Maximum backoff time in seconds
            backoff_factor: Multiplicative factor for backoff time
            enable_gcs_patch_in_boto3: Whether to enable GCS patch in boto3
        s3N)backends3_credential_pathpath_mapping)backend_argszenable_gcs_patch_in_boto3: True)
r   get_file_backendeasy_io_backendr#   r$   r%   r&   r'   r
   info)r   r"   r#   r$   r%   r&   r'   r   r   r   __init__1   s   zS3FileSystem.__init__c           
      O   s`  d}| j }t| jD ]}z
||i |W   S  tyf } zD|jdi dd}tjd| dd |dv ra|}|| jd	 k rat|| j	}	tjd
|	 ddd t
|	 || j9 }W Y d}~q
 d}~w ty } z<tjdt| dd |}|| jd	 k rt|| j	}	tjd
|	 ddd t
|	 || j9 }W Y d}~q
W Y d}~q
d}~ww |)a  
        Execute an operation with exponential backoff retry logic.

        Args:
            operation_func: Function to execute
            *args: Positional arguments for the function
            **kwargs: Keyword arguments for the function

        Returns:
            Result of the operation function

        Raises:
            Exception: If all retry attempts fail
        NErrorCode z%S3 Filesystem: Received ClientError: F
rank0_only)SlowDownThrottlingExceptionRequestLimitExceededInternalError   zS3 Filesystem: Retrying in z secondsz#S3 Filesystem: Received Exception: )r$   ranger#   r   responsegetr
   r0   minr%   timesleepr&   	Exceptionstr)
r   Zoperation_funcargskwargslast_exceptionbackoffattempte
error_codeZcurrent_backoffr   r   r   _retry_with_backoffT   s>   



z S3FileSystem._retry_with_backoffpathmodec                 #   s"   t |  \}}tjd| d| dd |dkrRt z+ fdd}tjd| d	| dd | tjd
dd V  W   dS   w |dkrt z+V   fdd}tjd| d| dd | tjddd W 	  dS 	  w t
d| )zCCreate a stream for reading from or writing to S3 with retry logic.z#S3 Filesystem: Creating stream for z in bucket Fr5   rbc                      s"    jj d d d S )Nfilepathr   )writer/   r>   seekr   path_strr   streamr   r   download_operation   s   z6S3FileSystem.create_stream.<locals>.download_operationzS3 Filesystem: Downloading z from bucket z S3 Filesystem: Download completewbc                      s    d jj d d S )Nr   )objrP   )rR   r/   putr   rS   r   r   upload_operation   s   
z4S3FileSystem.create_stream.<locals>.upload_operationzS3 Filesystem: Uploading z to bucket zS3 Filesystem: Upload completezUnsupported mode: N)rC   _parse_s3_urir
   r0   ioBytesIOrK   r   r   r   
ValueError)r   rL   rM   bucketkeyrV   rZ   r   rS   r   create_stream   s.   

zS3FileSystem.create_streamsuffixc                 C   s,   t |}|dr| | S | d| S )z Concatenate S3 path with suffix./)rC   endswith)r   rL   rb   rT   r   r   r   concat_path   s   
zS3FileSystem.concat_pathc                 C   s&   t |}|dstd| d|S )z Initialize and validate S3 path.s3://Invalid S3 URI: . Must start with 's3://')rC   
startswithr^   )r   rL   rT   r   r   r   	init_path   s   
zS3FileSystem.init_pathnew_pathc                    sF   t |t |  fdd}| fdd}| dS )z/Rename (move) an object in S3 with retry logic.c                      s   j j d d S )N)srcdst)r/   copyfiler   dst_pathr   src_pathr   r   copy_operation   s   z+S3FileSystem.rename.<locals>.copy_operationc                      s    j jd d S NrO   )r/   remover   )r   rq   r   r   delete_operation      z-S3FileSystem.rename.<locals>.delete_operationN)rC   rK   )r   rL   rk   rr   ru   r   ro   r   rename   s   
zS3FileSystem.renamec                 C   s   dS )z
        Create a "directory" in S3.

        Note: S3 doesn't have real directories, but we can create an empty object
        with a trailing slash to simulate a directory.
        Nr   r   rL   r   r   r   mkdir   s   	zS3FileSystem.mkdirc                    s(   t |  fdd| jj dddD S )zDList objects under the given S3 path (prefix) and return s3:// URIs.c                    s    g | ]}  d  d | qS )rc   )removesuffix).0
obj_suffixrT   r   r   
<listcomp>   s    z#S3FileSystem.ls.<locals>.<listcomp>FT)dir_pathlist_dir	list_file)rC   r/   list_dir_or_filerx   r   r}   r   ls   s   
zS3FileSystem.lscheckpoint_idc                 C   sH   t |}z|dsW dS t|}t|jo|jW S  ty#   Y dS w )z0Validate if the checkpoint_id is a valid S3 URI.rf   F)rC   ri   r   boolnetlocrL   rB   )clsr   Zcheckpoint_id_strparsedr   r   r   validate_checkpoint_id   s   
z#S3FileSystem.validate_checkpoint_idc              
      sd   zdt f fdd}|W S  ty1 } z|jdi dddkr,W Y d}~d	S  d}~ww )
z1Check if an object exists in S3 with retry logic.r(   c                      s   j jt dS rs   )r/   existsrC   r   rL   r   r   r   head_operation   rv   z+S3FileSystem.exists.<locals>.head_operationr2   r3   r4   404NF)r   rK   r   r=   r>   )r   rL   r   rI   r   r   r   r      s   zS3FileSystem.existsc                    s    fdd} | dS )z'Remove a file from S3 with retry logic.c                      s   j jt d d S rs   )r/   rt   rC   r   r   r   r   ru      s   z.S3FileSystem.rm_file.<locals>.delete_operationN)rK   )r   rL   ru   r   r   r   rm_file   s   zS3FileSystem.rm_fileuric                 C   sf   t |tr|nt|}|dstd| dt|}|j}|jd}|s/td| d||fS )z
        Parse an S3 URI into bucket and key.

        Args:
            uri: S3 URI in the format s3://bucket-name/key

        Returns:
            Tuple of (bucket_name, key)

        Raises:
            ValueError: If the URI is invalid
        rf   rg   rh   rc   z. No bucket specified)
isinstancerC   ri   r^   r   r   rL   lstrip)r   r   r   r_   r`   r   r   r   r[      s   
zS3FileSystem._parse_s3_uri)r   r   r    r!   F)r   r   r   r   rC   intfloatr   r1   rK   r   r   osPathLiker   r\   IOBasera   re   rj   rw   ry   listr   classmethodr   r   r   tupler[   r   r   r   r   r   .   sF    
#1.*%&* r   c                	       sR   e Zd Z	ddedededdf fddZed	eeej	f defd
dZ
  ZS )S3StorageWriterFr"   rL   r'   r(   Nc                    s8   t  jd|dd| t||d| _| j|| _dS )au  
        Initialize an S3 writer for distributed checkpointing.

        Args:
            region (str): The AWS region for S3.
            path (str): The S3 URI to write checkpoints to.
            kwargs (dict): Keyword arguments to pass to the parent :class:`FileSystemWriter`.
            enable_gcs_patch_in_boto3 (bool): Whether to enable GCS patch in boto3
        F)rL   
sync_filesr'   Nr   )r   r1   r   fsrj   rL   )r   r"   rL   r'   rE   r   r   r   r1     s   zS3StorageWriter.__init__r   c                 C   
   t |S r   r   r   r   r   r   r   r   r   2     
z&S3StorageWriter.validate_checkpoint_idF)r   r   r   rC   r   r1   r   r   r   r   r   r   r   r   r   r   r     s    &r   c                	       s\   e Zd Z	ddedeeejf deddf fddZe	d	eeejf defd
dZ
  ZS )S3StorageReaderFr"   rL   r'   r(   Nc                    s2   t  | t||d| _| j|| _d| _dS )a-  
        Initialize an S3 reader for distributed checkpointing.

        Args:
            region (str): The AWS region for S3.
            path (Union[str, os.PathLike]): The S3 path to read checkpoints from.
            enable_gcs_patch_in_boto3 (bool): Whether to enable GCS patch in boto3
        r   FN)r   r1   r   r   rj   rL   r   )r   r"   rL   r'   r   r   r   r1   8  s   
zS3StorageReader.__init__r   c                 C   r   r   r   r   r   r   r   r   H  r   z&S3StorageReader.validate_checkpoint_idr   )r   r   r   rC   r   r   r   r   r1   r   r   r   r   r   r   r   r   7  s    &r   )r\   r   r@   
contextlibr   typingr   r   urllib.parser   botocore.exceptionsr   torch.distributed.checkpointr   r   'torch.distributed.checkpoint.filesystemr	   %cosmos_predict2._src.imaginaire.utilsr
   -cosmos_predict2._src.imaginaire.utils.easy_ior   r]   r   r   r   r   r   r   r   r   <module>   s     l