o
    ?߱iy.                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
 d dlmZ d dlZd dlZd dlmZ d dlmZmZmZmZmZ d dlmZ d d	lmZmZmZ d d
lmZ d dl m!Z! d dl"m#Z# d dl$m%Z% dedefddZ&G dd dZ'dS )    N)Iterable)ThreadPoolExecutoras_completed)partial)Callable)reraise_exception)AugmentorConfigDatasetConfigDatasetInfo	TarSampleWdinfo)
WebDataset)remove_extensions_from_keys	skip_keys
update_url)instantiate)log)get_world_size)ObjectStorefuncdatac                 c   s(    |D ]}| |}|d u rq|V  qd S )N )r   r   	data_dictZdata_dict_outr   r   h/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/datasets/webdataset/webdataset.py wrap_augmentor_func_as_generator0   s   r   c                       sv   e Zd Zefdedef fddZddee de	fdd	Z
ed
d Zdeeef defddZdefddZ  ZS )Datasetconfighandlerc                    sl   t    || _t | _|j}|j| _t | _t | _	|j
| _tg dd| _| j|dd || _t | _dS )z{Webdataloader class

        Args:
            config: Dataset config
            world_size: Total number of GPUs
        r   T)dataset_infouse_multithreadN)super__init__r   r   
world_sizer   streaming_downloaddict	s3_clientbucketkeys	data_keysr   wdinfoparse_dataset_infor   
augmentors)selfr   r   r   	__class__r   r   r!   :   s   
zDataset.__init__Tr   r   c                    s  t dt| d|  t }dtdtf fdd}g }|rwt }t|dE}g }t	|D ](\}	}
t|
j
dkrEt d	|	 d
 q0t d|
j
  ||||	|
 q0t|D ]	}||  q]W d   n1 sqw   Y  nt	|D ]\}	}
t d|
j
  |||	|
 q{|D ]O}|d } j
j|d   j
 j|d 7  _tt|d dkrtd| d|d  d |d d  j
_|d r|d  j|< |d r|d  j|< qt }t dt| d j
j dt j
j d| d|| dd dS )a  Parse metadata about the list of tar files.

        Args:
            dataset_info (list): List of dictionaries containing paths to metadata files.
            use_multithread (bool): Whether to use multi-threaded parsing across datasets. Default: True.
        z Start parsing dataset info with z entries, use multithread = dset_num	dset_infoc              	      sR  j j}|_d| |rtj d}j j}|j}nd }d }d }g }d}g }jD ]t}	|rC||	s;t	|	 d|j
|	dd}
nt|	d}t|}
W d    n1 sXw   Y  |
d   d	r} d
d  dd}t|dkr{|d  nd |
d } fdd|D }|| ||
d 7 }||
d  q+|||||dS )Nzdset: {})config_object_storager   z
 not foundjson)keytyperrootzs3://   /    	data_listc              
      s.   g | ]}t | jrjnjd dqS )N)pathr6   r'   metadset_idsample_keys_full_list)r   per_dataset_keysr(   ).0tar_file)	data_rootr>   r0   r,   r   r   
<listcomp>   s    zNDataset.parse_dataset_info.<locals>.process_single_dataset.<locals>.<listcomp>total_key_count
chunk_size)r>   tar_samplesrE   chunk_sizesr%   r&   )object_store_configenableduse_object_storeformatr   r&   clientr)   object_existsFileNotFoundErrorload_objectopenr2   load
startswithsplitlenextendappend)r/   r0   rK   Zobject_store_readerZbucket_dsetZs3_client_dsetrG   rE   rH   Zwdinfo_pathZcur_dset_infofppartsZtar_files_listZlocal_tar_samplesr,   )rC   r>   r0   r   process_single_datasetb   sR   





z:Dataset.parse_dataset_info.<locals>.process_single_dataset)max_workersr   zNo wdinfo found for dataset z, skipping...zAdding: Nr>   rG   rE   rH   r9   z$Multiple chunk_size values found in z: z. Using the first one.r%   r&   zParsed dataset info with z wdinfos (num_keys = z, num_tars = z) and multithread = z, took z.2fz seconds)r   inforU   timeintr
   os	cpu_countr   	enumerater)   warningrW   submitr   result	tar_filesrV   rE   setwarningswarnrF   r%   r&   )r,   r   r   ticr[   Zdataset_resultsnum_workersexecutorfuturesir0   futurere   r>   Ztocr   rZ   r   r*   X   sP   C8zDataset.parse_dataset_infoc                 c   s:    |D ]}t |ddr|| } qt|| } q| E d H  d S )Nis_generatorF)getattrr   )r   augmentationsZaug_fnr   r   r   augmentor_fn   s   
zDataset.augmentor_fnaugmentor_cfgreturnc                 C   s2   g }|  D ]}|t||  qttj|dS )z<Function for building data augmentors from augmentor config.)rr   )r'   rW   r   r   r   rs   )r,   rt   rr   augr   r   r   build_data_augmentor   s   zDataset.build_data_augmentorc                 K   sp  | j j}t|}|dksJ dt| jd| j j}| jj}|| || j j t	|| j
| j| j| j| jd}|dkrD|t| t| jdg }g }|D ]}	t|	ts^t|	s^J d||	 qO|tj|  | jjru|t |t t| jdd }
t|
ttjjfsJ dt|
 | |
}|| |t | j j |_!t"#d	|  t"#d
|j!  |S )Nr   zDid not find any data.buffer_size)load_from_object_storer%   s3_bucket_namer#   r   decodersz*Decoder should either be callable or a straugmentationzgetting type: z#Total number of training shards: %dzTotal training key count: %d)$r)   rf   rU   rq   r   rF   distributorset_urlsset_chunk_sizer   rK   r%   r&   r#   r   rW   wdsshuffle
isinstancestrcallabledecoderemove_extension_from_keysr   r   r$   	omegaconf
dictconfig
DictConfigr4   rw   r   rE   total_imagesr   r]   )r,   kwargstar_listnum_tarsshuffle_buffer_sizedistributor_fndatasetdecoder_listdecoder_functionsdecoderrt   augmentation_fnr   r   r   build_dataset   sJ   







zDataset.build_dataset)T)__name__
__module____qualname__r   r	   r   r!   listr
   boolr*   staticmethodrs   r$   r   r   rw   r   r   __classcell__r   r   r-   r   r   9   s    s
	r   )(r2   r`   r^   rh   collections.abcr   concurrent.futuresr   r   	functoolsr   typingr   r   
webdatasetr   webdataset.handlersr   Acosmos_predict2._src.imaginaire.datasets.webdataset.config.schemar   r	   r
   r   r   Ccosmos_predict2._src.imaginaire.datasets.webdataset.utils.iteratorsr   >cosmos_predict2._src.imaginaire.datasets.webdataset.utils.miscr   r   r   +cosmos_predict2._src.imaginaire.lazy_configr   %cosmos_predict2._src.imaginaire.utilsr   Z1cosmos_predict2._src.imaginaire.utils.distributedr   Z2cosmos_predict2._src.imaginaire.utils.object_storer   r   r   r   r   r   r   <module>   s(   	