o
    viq/                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
 d dlmZ d dlZd dlZd dlmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZmZ d d
lmZ d dl m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z' dedefddZ(G dd dZ)dS )    N)Iterable)ThreadPoolExecutoras_completed)partial)Callable)reraise_exception)AugmentorConfigDatasetConfigDatasetInfo	TarSampleWdinfo)
WebDataset)remove_extensions_from_keys	skip_keys
update_url)instantiate)log)get_world_size)BaseStorageBackend)ObjectStorefuncdatac                 c   s(    |D ]}| |}|d u rq|V  qd S )N )r   r   	data_dictZdata_dict_outr   r   b/data/cameron/vidgen/cosmos-policy/cosmos_policy/_src/imaginaire/datasets/webdataset/webdataset.py wrap_augmentor_func_as_generator1   s   r   c                       sv   e Zd Zefdedef fddZddee de	fdd	Z
ed
d Zdeeef defddZdefddZ  ZS )Datasetconfighandlerc                    sr   t    || _t | _|j}|j| _d| _t | _	t | _
|j| _tg dd| _| j|dd || _t | _dS )z{Webdataloader class

        Args:
            config: Dataset config
            world_size: Total number of GPUs
        Fr   T)dataset_infouse_multithreadN)super__init__r   r   
world_sizer   streaming_downloaduse_object_storedicteasy_io_backendbucketkeys	data_keysr   wdinfoparse_dataset_infor   
augmentors)selfr   r   r   	__class__r   r   r"   ;   s   
zDataset.__init__Tr   r    c                    s  t dt| d|  t }dtdtf fdd}g }|rwt }t|dE}g }t	|D ](\}	}
t|
j
dkrEt d	|	 d
 q0t d|
j
  ||||	|
 q0t|D ]	}||  q]W d   n1 sqw   Y  nt	|D ]\}	}
t d|
j
  |||	|
 q{|D ]O}|d } j
j|d   j
 j|d 7  _tt|d dkrtd| d|d  d |d d  j
_|d r|d  j|< |d r|d  j|< qt }t dt| d j
j dt j
j d| d|| dd dS )a  Parse metadata about the list of tar files.

        Args:
            dataset_info (list): List of dictionaries containing paths to metadata files.
            use_multithread (bool): Whether to use multi-threaded parsing across datasets. Default: True.
        z Start parsing dataset info with z entries, use multithread = dset_num	dset_infoc              	      sR  j j}|_d| |rtj d}|j}j j}nd }d }d }g }d}g }jD ]t}	|rC||	s;t	|	 d|j
|	dd}
nt|	d}t|}
W d    n1 sXw   Y  |
d   d	r} d
d  dd}t|dkr{|d  nd |
d } fdd|D }|| ||
d 7 }||
d  q+|||||dS )Nzdset: {})config_object_storager   z
 not foundjson)keytyperrootzs3://   /    	data_listc              
      s.   g | ]}t | jrjnjd dqS )N)pathr8   r)   metadset_idsample_keys_full_list)r   per_dataset_keysr*   ).0tar_file)	data_rootr@   r2   r.   r   r   
<listcomp>   s    zNDataset.parse_dataset_info.<locals>.process_single_dataset.<locals>.<listcomp>total_key_count
chunk_size)r@   tar_samplesrG   chunk_sizesr'   r(   )object_store_configenabledr%   formatr   r'   r(   r+   object_existsFileNotFoundErrorload_objectopenr4   load
startswithsplitlenextendappend)r1   r2   r%   Zobject_store_readerZeasy_io_backend_dsetZbucket_dsetrI   rG   rJ   Zwdinfo_pathZcur_dset_infofppartsZtar_files_listZlocal_tar_samplesr.   )rE   r@   r2   r   process_single_datasetd   sR   






z:Dataset.parse_dataset_info.<locals>.process_single_dataset)max_workersr   zNo wdinfo found for dataset z, skipping...zAdding: Nr@   rI   rG   rJ   r;   z$Multiple chunk_size values found in z: z. Using the first one.r'   r(   zParsed dataset info with z wdinfos (num_keys = z, num_tars = z) and multithread = z, took z.2fz seconds)r   inforU   timeintr
   os	cpu_countr   	enumerater+   warningrW   submitr   result	tar_filesrV   rG   setwarningswarnrH   r'   r(   )r.   r   r    ticr[   Zdataset_resultsnum_workersexecutorfuturesir2   futurere   r@   Ztocr   rZ   r   r,   Z   sP   C8zDataset.parse_dataset_infoc                 c   s:    |D ]}t |ddr|| } qt|| } q| E d H  d S )Nis_generatorF)getattrr   )r   augmentationsZaug_fnr   r   r   augmentor_fn   s   
zDataset.augmentor_fnaugmentor_cfgreturnc                 C   s2   g }|  D ]}|t||  qttj|dS )z<Function for building data augmentors from augmentor config.)rr   )r)   rW   r   r   r   rs   )r.   rt   rr   augr   r   r   build_data_augmentor   s   zDataset.build_data_augmentorc                 K   sp  | j j}t|}|dksJ dt| jd| j j}| jj}|| || j j t	|| j
| j| j| j| jd}|dkrD|t| t| jdg }g }|D ]}	t|	ts^t|	s^J d||	 qO|tj|  | jjru|t |t t| jdd }
t|
ttjjfsJ dt|
 | |
}|| |t | j j |_!t"#d	|  t"#d
|j!  |S )Nr   zDid not find any data.buffer_size)load_from_object_storer'   s3_bucket_namer$   r   decodersz*Decoder should either be callable or a straugmentationzgetting type: z#Total number of training shards: %dzTotal training key count: %d)$r+   rf   rU   rq   r   rH   distributorset_urlsset_chunk_sizer   r%   r'   r(   r$   r   rW   wdsshuffle
isinstancestrcallabledecoderemove_extension_from_keysr   r   r&   	omegaconf
dictconfig
DictConfigr6   rw   r   rG   total_imagesr   r]   )r.   kwargstar_listnum_tarsshuffle_buffer_sizedistributor_fndatasetdecoder_listdecoder_functionsdecoderrt   augmentation_fnr   r   r   build_dataset   sJ   







zDataset.build_dataset)T)__name__
__module____qualname__r   r	   r   r"   listr
   boolr,   staticmethodrs   r&   r   r   rw   r   r   __classcell__r   r   r/   r   r   :   s    s
	r   )*r4   r`   r^   rh   collections.abcr   concurrent.futuresr   r   	functoolsr   typingr   r   
webdatasetr   webdataset.handlersr   ?cosmos_policy._src.imaginaire.datasets.webdataset.config.schemar   r	   r
   r   r   Acosmos_policy._src.imaginaire.datasets.webdataset.utils.iteratorsr   <cosmos_policy._src.imaginaire.datasets.webdataset.utils.miscr   r   r   )cosmos_policy._src.imaginaire.lazy_configr   #cosmos_policy._src.imaginaire.utilsr   Z/cosmos_policy._src.imaginaire.utils.distributedr   4cosmos_policy._src.imaginaire.utils.easy_io.backendsr   Z0cosmos_policy._src.imaginaire.utils.object_storer   r   r   r   r   r   r   <module>   s*   	