o
     ݱi                     @   s   d dl Z d dlZd dlmZ d dlmZmZ d dlmZm	Z	 d dl
Z
d dlZd dlZ	ddeedf deeejf d	ed
eedf fddZG dd dZdS )    N)Lock)
expanduser
expandvars)UnionOptional    >Ashape.dtypetarget_chunk_bytesreturnc                 C   s   t |j}t| ddd }t| d }tt| d D ]*}|t |d|  }|t |d|d   }||  krA|k rGn q|} nq|d| }	|t |	 }
t|| t	||
 }|	
| |	dgt| t|	   t|	ddd }|S )a  
    Calculates the optimal chunk sizes for an array given its shape, data type, and a target chunk size in bytes.

    Args:
        shape: The shape of the array.
        dtype: The data type of the array.
        target_chunk_bytes: The target size for each chunk in bytes. Defaults to 2e6 (2 MB).

    Returns:
        The optimal chunk dimensions for the given array shape and data type, aiming to not exceed
            the target chunk size in bytes.
    N   )npr	   itemsizelistlenrangeZprodminmathZceilappendextendtuple)r   r	   r
   r   ZrshapeZ	split_idxiZthis_chunk_bytesZnext_chunk_bytesZrchunksZitem_chunk_bytesZnext_chunk_lengthchunks r   A/data/cameron/vidgen/unified-world-model/datasets/utils/buffer.pyget_optimal_chunks   s&   
r   c                
   @   s   e Zd ZdZ		ddedeeeeef f dee dee	 fddZ
ed	ejfd
dZed	efddZed	efddZdeeejf fddZd	efddZdd Zdd Zdd Zdd Zdd ZdS ) CompressedTrajectoryBufferzI
    A class that stores trajectory data in a compressed zarr array.
    Nstorage_pathmetadatacapacitylockc              
      s  t t|}tj| _t| _|du rt	 n| _
tj jd _ jd _ jd _ jr{td|  d jv sDJ t fdd|D sQJ t fd	d| D s`J  fd
d jD }t|dkstJ d|  _dS  j
q td|  |dusJ d| _ jjddtjdd | D ]F\}}|ft|d  }|d }	|	tjkrd|dd  }
tjddtjjd}nt||	}
tjddtjjd} jj|||
|	|t d qW d   dS 1 sw   Y  dS )a  
        Initialize the trajectory buffer. If there is an existing buffer at the given path, it will be restored.

        Args:
            storage_path: Path to the buffer storage.
            metadata: Dictionary containing metadata for each data key. Each key should
                map to a dictionary containing the following keys:
                - shape: shape of the data
                - dtype: dtype of the data
            capacity: Maximum number of transition steps that can be stored in the buffer.
                Only used when creating a new buffer.
            lock: Multiprocessing lock to synchronize access to the buffer. If None, a new lock will be created.
        N)ZstoredatametazRestoring buffer from episode_endsc                 3   s    | ]}| j v V  qd S Nr"   .0keyselfr   r   	<genexpr>^   s    z6CompressedTrajectoryBuffer.__init__.<locals>.<genexpr>c                 3   s0    | ]\}} j | jd d |d kV  qdS )r   Nr   r"   r   )r(   r)   valuer*   r   r   r,   _   s
    
c                    s   h | ]
} j | jd  qS r   r-   r'   r*   r   r   	<setcomp>e   s    z6CompressedTrajectoryBuffer.__init__.<locals>.<setcomp>r   z'Inconsistent data lengths in the bufferzCreating new buffer at z)Capacity must be specified for new bufferr/   )namer   r	   
compressorr   r	   )r   Zlz4   )ZcnameZclevelZshuffler   )r1   r   r   r	   r2   Zobject_codec) r   r   ospathexistsZrestoredzarrZDirectoryStoreZstorager   r!   grouprootZrequire_groupr"   r#   printallitemsr   popr    Zzerosr   Zint64r   Zuint8	numcodecsZBloscZ	NOSHUFFLEr   ZPickle)r+   r   r   r    r!   Zlengthsr)   r.   r   r	   r   r2   r   r*   r   __init__:   sb   



"z#CompressedTrajectoryBuffer.__init__r   c                 C   s
   | j d S )Nr$   )r#   r*   r   r   r   r$         
z'CompressedTrajectoryBuffer.episode_endsc                 C   s
   t | jS r%   r   r$   r*   r   r   r   num_episodes   r@   z'CompressedTrajectoryBuffer.num_episodesc                 C   s   t | jdkr| jd S dS )Nr   r   rA   r*   r   r   r   	num_steps   s   z$CompressedTrajectoryBuffer.num_stepsr"   c                 C   sJ  | j  tdd | D }t||d ksJ |d }| j}|| }|| jkr/td| D ]\}}| j	| }||||< q3| j
t| j
d  || j
d< | j
jd | j
jd k r| j
jd d }	|	f| j
jdd   }
| jdd	 tj| jd	 | jd|
d d
 | jd	= W d    d S W d    d S 1 sw   Y  d S )Nc                 S   s   g | ]}|j d  qS r/   )r   )r(   vr   r   r   
<listcomp>   s    z:CompressedTrajectoryBuffer.add_episode.<locals>.<listcomp>r   zBuffer capacity exceededr   r   g      ?r$   Z_temp)sourceZdestr1   r   r2   )r!   r   Zarrayvaluesr;   rC   r    RuntimeErrorr<   r"   r$   Zresizer   r   r   r#   Zmover7   copy)r+   r"   Zepisode_lensZepisode_lenZ	start_indZend_indr)   r.   ZarrZnew_chunk_lenZ
new_chunksr   r   r   add_episode   s8   



"z&CompressedTrajectoryBuffer.add_episodec                 C   s   t | j S r%   )strr9   Ztreer*   r   r   r   __repr__   s   z#CompressedTrajectoryBuffer.__repr__c                 C   
   | j  S r%   )r"   keysr*   r   r   r   rN         
zCompressedTrajectoryBuffer.keysc                 C   rM   r%   )r"   rG   r*   r   r   r   rG      rO   z!CompressedTrajectoryBuffer.valuesc                 C   rM   r%   )r"   r<   r*   r   r   r   r<      rO   z CompressedTrajectoryBuffer.itemsc                 C   s
   | j | S r%   r&   r+   r)   r   r   r   __getitem__   rO   z&CompressedTrajectoryBuffer.__getitem__c                 C   s
   || j v S r%   r&   rP   r   r   r   __contains__   rO   z'CompressedTrajectoryBuffer.__contains__)NN)__name__
__module____qualname____doc__rK   dictanyr   intr   r?   propertyr   Zndarrayr$   rB   rC   rJ   rL   rN   rG   r<   rQ   rR   r   r   r   r   r   5   s4    
V$r   )r   )r   r4   Zmultiprocessingr   Zos.pathr   r   typingr   r   r>   Znumpyr   r7   r   rY   rK   r	   floatr   r   r   r   r   r   <module>   s&    


)