
    Ti"N                        d dl mZmZ d dlZd dlZd dlZd dlZd dlmZmZm	Z	m
Z
 d dlZd dlmZ d dlmc mc mZ d dlZd dlZd dlmc mZ d dlmc mZ d dlmZ d dlm Z!  e
dd          Z" G d d	ee"                   Z# G d
 dee"                   Z$ G d dee"                   Z% G d de#e"                   Z& G d de$e"                   Z' G d de#          Z(dej)        de*dej+        de#fdZ,dddej)        de*de*de-de#f
dZ.ddde#dej)        d e-de#fd!Z/ddd"de$dej)        d e-d#e-de$f
d$Z0ddddd%d&d'ej1        d(ej2        j3        dz  de-d)e*dz  d e-d*ed+         de%e4ej5        ej6        f                  fd,Z7ddddd d d%d-dej)        dej+        de*de*d(ej2        j3        dz  d e-de-d)e*dz  d.e*d/e*d*e8de%e4ej5        ej6        f                  fd0Z9ddddd%d1dej)        de*de*d(ej2        j3        dz  d e-de-d)e*dz  d*e8de%e4ej5        ej6        f                  fd2Z: G d3 d4          Z;d5 Z<d6e*ddfd7Z= G d8 d9          Z> G d: d;e%          Z?dS )<    )IteratorSequenceN)LiteralProtocolSupportsIndexTypeVar)DroidRldsDatasetT_coT)	covariantc                   .    e Zd ZdZdedefdZdefdZdS )Datasetz+Interface for a dataset with random access.indexreturnc                      t          d          )Nz3Subclasses of Dataset should implement __getitem__.NotImplementedErrorselfr   s     </home/robot-lab/Pi0.5_yam/src/openpi/training/data_loader.py__getitem__zDataset.__getitem__       !"WXXX    c                      t          d          Nz/Subclasses of Dataset should implement __len__.r   r   s    r   __len__zDataset.__len__       !"STTTr   N)	__name__
__module____qualname____doc__r   r
   r   intr    r   r   r   r      sd        55Y Y4 Y Y Y YU U U U U U Ur   r   c                   6    e Zd ZdZdee         fdZdefdZdS )IterableDatasetz"Interface for an iterable dataset.r   c                      t          d          )Nz8Subclasses of IterableDataset should implement __iter__.r   r   s    r   __iter__zIterableDataset.__iter__#   s    !"\]]]r   c                      t          d          r   r   r   s    r   r   zIterableDataset.__len__&   r   r   N)	r   r   r    r!   r   r
   r'   r"   r   r#   r   r   r%   r%       s`        ,,^(4. ^ ^ ^ ^U U U U U U Ur   r%   c                   @    e Zd ZdZdej        fdZdee         fdZ	dS )
DataLoaderzInterface for a data loader.r   c                      t          d          )z)Get the data config for this data loader.z6Subclasses of DataLoader should implement data_config.r   r   s    r   data_configzDataLoader.data_config-   s    !"Z[[[r   c                      t          d          )Nz3Subclasses of DataLoader should implement __iter__.r   r   s    r   r'   zDataLoader.__iter__1   r   r   N)
r   r   r    r!   _config
DataConfigr,   r   r
   r'   r#   r   r   r*   r*   *   sc        &&\W/ \ \ \ \Y(4. Y Y Y Y Y Yr   r*   c                   P    e Zd Zdedeej                 fdZdede	fdZ
defdZdS )	TransformedDatasetdataset
transformsc                 F    || _         t          j        |          | _        d S N)_dataset_transformscompose
_transform)r   r2   r3   s      r   __init__zTransformedDataset.__init__6   s    %-j99r   r   r   c                 B    |                      | j        |                   S r5   )r9   r6   r   s     r   r   zTransformedDataset.__getitem__:   s    t}U3444r   c                 *    t          | j                  S r5   lenr6   r   s    r   r   zTransformedDataset.__len__=       4=!!!r   N)r   r   r    r   r   r7   DataTransformFnr:   r   r
   r   r"   r   r#   r   r   r1   r1   5   s{        : :Xk>Y5Z : : : :5 54 5 5 5 5" " " " " " "r   r1   c                   P    e Zd Zdddedeej                 defdZd Z	de
fd	Zd
S )IterableTransformedDatasetF
is_batchedr2   r3   rD   c                T    || _         t          j        |          | _        || _        d S r5   )r6   r7   r8   r9   _is_batched)r   r2   r3   rD   s       r   r:   z#IterableTransformedDataset.__init__B   s+      %-j99%r   c              #   <   K    j         D ] j        rot          d                                 D                       }fdt	          |          D             } fd|D             }t          j        j        d g|R  V  x                               V  d S )Nc              3   0   K   | ]}|j         d          V  dS )r   N)shape).0vs     r   	<genexpr>z6IterableTransformedDataset.__iter__.<locals>.<genexpr>R   s(      !F!F!'!*!F!F!F!F!F!Fr   c                 X    g | ]%t           j                            fd           &S )c                     |          S r5   r#   )xis    r   <lambda>z@IterableTransformedDataset.__iter__.<locals>.<listcomp>.<lambda>U   s    QqT r   jaxtreemap)rJ   rP   samples    @r   
<listcomp>z7IterableTransformedDataset.__iter__.<locals>.<listcomp>U   s0    %f%f%fqchll>>>>6&J&J%f%f%fr   c                 :    g | ]}                     |          S r#   )r9   )rJ   sr   s     r   rW   z7IterableTransformedDataset.__iter__.<locals>.<listcomp>X   s%    NNNatq11NNNr   c                  .    t          j        | d          S )Nr   axisnpstack)rO   s    r   rQ   z5IterableTransformedDataset.__iter__.<locals>.<lambda>[   s    bhqq.A.A.A r   )	r6   rF   nextvaluesrangerS   rT   rU   r9   )r   
batch_sizeindividual_samplestransformedrV   s   `   @r   r'   z#IterableTransformedDataset.__iter__M   s      m 	. 	.F . "!F!Ffmmoo!F!F!FFF
 &g%f%f%fTYZdTeTe%f%f%f" ONNN;MNNN hl#A#APKPPPPPPPoof------	. 	.r   r   c                 *    t          | j                  S r5   r=   r   s    r   r   z"IterableTransformedDataset.__len___   r?   r   N)r   r   r    r%   r   r7   r@   boolr:   r'   r"   r   r#   r   r   rB   rB   A   s         !	& 	& 	& 	& [89	&
 	& 	& 	& 	&. . .$" " " " " " "r   rB   c                   D    e Zd Zdej        defdZdedefdZ	defdZ
dS )	FakeDatasetmodel_confignum_samplesc                 V    || _         |                                \  | _        | _        d S r5   )_num_samplesinputs_spec_observation_spec_action_spec)r   rj   rk   s      r   r:   zFakeDataset.__init__d   s,    '4@4L4L4N4N1 1 1 1r   r   r   c                 R   t           j                            |                                          dt           j        ffd}t           j                            || j                  }t           j                            || j                  }i |	                                d|iS )Nspecc                 |   t           j                                      \  }| j        dd          }| j        t
          j        k    r#t           j                            ||dd          S | j        t
          j        k    r#t           j        	                    ||dd          S t          j
        || j                  S )N   g      g      ?)rI   minvalmaxvalr   i   )rI   dtype)rS   randomsplitrI   rw   jnpfloat32uniformint32randintzeros)rr   data_rngrI   rngs      r   make_from_specz/FakeDataset.__getitem__.<locals>.make_from_speck   s    J,,S11MCJqrrNEzS[((z))(%UX)YYYzSY&&z))(%RV)WWW95
;;;;r   actions)
rS   rx   key	__index__ShapeDtypeStructrT   rU   ro   rp   to_dict)r   r   r   observationactionr   s        @r   r   zFakeDataset.__getitem__h   s    jnnU__..//		<!5 		< 		< 		< 		< 		< 		< hll>43IJJnd.?@@
!!##
v
 
 	
r   c                     | j         S r5   )rm   r   s    r   r   zFakeDataset.__len__~         r   N)r   r   r    _modelBaseModelConfigr"   r:   r   dictr   r   r#   r   r   ri   ri   c   s|        OV%; O# O O O O
 
4 
 
 
 
,! ! ! ! ! ! !r   ri   r,   action_horizonrj   r   c                 J   | j         }|t          d          |dk    rt          |d          S t          j        |          t          j        | j         fd| j        D             d          }| j        r(t          |t          j
        j                  g          }|S )	zCreate a dataset for training.Nz*Repo ID is not set. Cannot create dataset.fakei   )rk   c                 H    i | ]}|fd t                    D             S )c                 $    g | ]}|j         z  S r#   )fps)rJ   tdataset_metas     r   rW   z3create_torch_dataset.<locals>.<dictcomp>.<listcomp>   s!    FFF1!l&&FFFr   )rb   )rJ   r   r   r   s     r   
<dictcomp>z(create_torch_dataset.<locals>.<dictcomp>   sE     
 
 
KNCFFFFn0E0EFFF
 
 
r   g    .A)delta_timestampstolerance_s)repo_id
ValueErrorri   lerobot_datasetLeRobotDatasetMetadataLeRobotDatasetaction_sequence_keysprompt_from_taskr1   r7   PromptFromLeRobotTasktasks)r,   r   rj   r   r2   r   s    `   @r   create_torch_datasetr      s     !GEFFF&<T::::"9'BBL,
 
 
 
 
R]Rr
 
 
   G # g$W{/PQ]Qc/d/d.effNr   Fshufflerc   r   c                J    t          | j        |||| j        | j                  S )N)data_dirrc   r   action_chunk_sizeaction_spacedatasets)r	   rlds_data_dirr   r   )r,   r   rc   r   s       r   create_rlds_datasetr      s6     *( -%   r   skip_norm_statsr2   r   c                    i }|j         dk    r|s|j        t          d          |j        }t          | g |j        j        |j        j        t          j        ||j	                  |j
        j                  S )6Transform the dataset by applying the data transforms.r   NlNormalization stats not found. Make sure to run `scripts/compute_norm_stats.py --config-name=<your-config>`.use_quantiles)r   
norm_statsr   r1   repack_transformsinputsdata_transformsr7   	Normalizeuse_quantile_normmodel_transforms)r2   r,   r   r   s       r   transform_datasetr      s    Jf$$_$!)`   !+
	
*1	
(/	
 !*K<YZZZ	
 )0		
  r   r   rD   rD   c                    i }|j         dk    r|s|j        t          d          |j        }t          | g |j        j        |j        j        t          j        ||j	                  |j
        j        |          S )r   r   Nr   r   rC   )r   r   r   rB   r   r   r   r7   r   r   r   )r2   r,   r   rD   r   s        r   transform_iterable_datasetr      s     Jf$$_$!)`   !+
%	
*1	
(/	
 !*K<YZZZ	
 )0		
 	 	 	 	r   rS   )shardingr   num_batchesr   	frameworkconfigr   r   r   )rS   pytorchc                D   | j                             | j        | j                  }t	          j        d|            |j        &t          || j        j        | j	        |||||          S t          || j        | j        j        | j	        |||| j        | j        ||          S )a  Create a data loader for training.

    Args:
        config: The training configuration.
        sharding: The sharding to use for the data loader (JAX only).
        shuffle: Whether to shuffle the data.
        num_batches: Determines the number of batches to return.
        skip_norm_stats: Whether to skip data normalization.
        framework: The framework to use ("jax" or "pytorch").
    zdata_config: N)r   rc   r   r   r   r   r   )
rj   r   rc   r   r   r   num_workersseedr   r   )datacreateassets_dirsmodellogginginfor   create_rlds_data_loaderr   rc   create_torch_data_loaderr   r   )r   r   r   r   r   r   r,   s          r   create_data_loaderr      s    & +$$V%7FFKL.../// ,&!<6(#+	
 	
 	
 		
 $\|2$&['   r   )r   r   r   r   r   r   r   r   r   c                l   t          | ||          }t          || |          }d}|
dk    rt          j                                        rt          j        j        j                            |t          j                                        t          j        	                                |d          }|t          j                                        z  }n|}n|t          j                    z  }t          j        d|            t          |||
dk    rdn||du o|||||	|
	  	        }t          | |          S )aT  Create a data loader for training.

    Args:
        data_config: The data configuration.
        action_horizon: The action horizon.
        batch_size: The batch size.
        sharding: The sharding to use for the data loader. If None, the data loader will
            use a single device sharding.
        skip_norm_stats: Whether to skip data normalization.
        shuffle: Whether to shuffle the data.
        num_batches: Determines the number of batches to return. If the number exceeds the
            number of batches in the dataset, the data loader will loop over the dataset.
            If not provided, will iterate over the dataset indefinitely.
        num_workers: The number of worker processes to use. If zero, the data loader will
            execute in the main process.
        seed: The seed to use for shuffling the data.
    r   Nr   T)num_replicasrankr   	drop_lastzlocal_batch_size: )local_batch_sizer   r   samplerr   r   r   r   )r   r   torchdistributedis_initializedutilsr   DistributedSamplerget_world_sizeget_rankrS   process_countr   r   TorchDataLoaderDataLoaderImpl)r,   rj   r   rc   r   r   r   r   r   r   r   r2   r   r   data_loaders                  r   r   r     sP   > #;MMGoVVVG
 GI++-- 
	*k&2EE".==??&//11 F  G  *U->-M-M-O-OO)%):)<)<<L8&688999!)"i//XD,W
 
 
K +{333r   )r   r   r   r   r   c                    |dk    rt          d          t          | |||          }t          || |d          }t          |||          }	t	          | |	          S )a  Create an RLDS data loader for training.

    Note: This data loader requires some extra dependencies -- see examples/droid/README_train.md

    Args:
        data_config: The data configuration.
        action_horizon: The action horizon.
        batch_size: The batch size.
        sharding: The sharding to use for the data loader. If None, the data loader will
            use a single device sharding.
        skip_norm_stats: Whether to skip data normalization.
        shuffle: Whether to shuffle the data.
        num_batches: Determines the number of batches to return. If the number exceeds the
            number of batches in the dataset, the data loader will loop over the dataset.
            If not provided, will iterate over the dataset indefinitely.
    r   z-PyTorch RLDS data loader is not supported yetr   Tr   r   r   )r   r   r   RLDSDataLoaderr   )
r,   r   rc   r   r   r   r   r   r2   r   s
             r   r   r   U  s{    6 I!"QRRR!+~zSZ[[[G(+kopppG   K +{333r   c                       e Zd ZdZdddddddddedej        j        dz  d	ed
e	j
        j        j        dz  dedz  dededefdZede	j
        j        j        fd            Zd ZdS )r   z!Torch data loader implementation.NFr   rS   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   c                .   t          j                    dk    rt          d          t          |          |k     r#t	          d| dt          |           d          || _        |w|	dk    rqt           j                            t           j                            t          j	                    d          t           j        
                    d	                    | _        || _        d}
|d
k    rt          j        d          }
t          j                    }|                    |           t          j        j                            t)          j        t          j        j        j        |          ||du o||||
|d
k    t.          t0          d|          | _        dS )a  Create a PyTorch data loader.

        Args:
            dataset: The dataset to load.
            local_batch_size: The local batch size for each process.
            sharding: The sharding to use for the data loader.
            shuffle: Whether to shuffle the data.
            num_batches: If provided, determines the number of returned batches. If the
                number is larger than the number of batches in the dataset, the data loader
                will loop over the dataset. If not provided, will iterate over the dataset
                indefinitely.
            num_workers: The number of worker processes to use. If zero, the data loader will
                execute in the main process.
            seed: The seed to use for shuffling the data.
        rt   6Data loading with multiple processes is not supported.zLocal batch size (z#) is larger than the dataset size (z).NrS   Br   r   spawnT)
rc   r   r   r   multiprocessing_contextpersistent_workers
collate_fnworker_init_fnr   	generator)rS   r   r   r>   r   	_shardingr   NamedShardingMeshdevicesPartitionSpec_num_batchesmultiprocessingget_contextr   	Generatormanual_seedr   r   r*   typingcastr   _collate_fn_worker_init_fn_data_loader)r   r2   r   r   r   r   r   r   r   r   
mp_contextr   s               r   r:   zTorchDataLoader.__init__  sy   8 ""%&^___w<<***w2Bwwgjkrgsgswwwxxx "	U 2 2 \77!!#+--88**3// DN (
??(4W==JO%%	d###!K,77K(0'::'_0#$.*Q"* 8 
 
r   r   c                     | j         S r5   )r   r   s    r   torch_loaderzTorchDataLoader.torch_loader  s      r   c              #   b   K   d}	 t           j                  }	  j        | j        k    rd S 	 t          |          }n# t          $ r Y ncw xY w|dz  } j        &t          j                             fd|          V  n,t          j                            t          j
        |          V  )Nr   Trt   c                 8    t          j        j        |           S r5   rS   "make_array_from_process_local_datar   rO   r   s    r   rQ   z*TorchDataLoader.__iter__.<locals>.<lambda>  s    1WX\Xfhi1j1j r   )iterr   r   r`   StopIterationr   rS   rT   rU   r   	as_tensorr   	num_items	data_iterbatchs   `   r   r'   zTorchDataLoader.__iter__  s      		?T.//I?$0Y$BS5S5SF OOEE$   EQ	>-(,,'j'j'j'jlqrrrrrr(,,u>>>>>?	?   A 
AA)r   r   r    r!   r"   rS   r   Shardingrg   r   r   r   Samplerstrr:   propertyr*   r   r'   r#   r   r   r   r   ~  s        ++ 2637"&>
 >
 >
 >

 ,'$.>
 >
 !)D0>
 4Z>
 >
 >
 >
 >
 >
 >
@ !ek.9 ! ! ! X!? ? ? ? ?r   r   c                 2    t          j        j        d g| R  S )z5Collate the batch elements into batched numpy arrays.c                  B    t          j        d | D             d          S )Nc                 6    g | ]}t          j        |          S r#   )r^   asarray)rJ   rO   s     r   rW   z1_collate_fn.<locals>.<lambda>.<locals>.<listcomp>  s     -H-H-Hbjmm-H-H-Hr   r   r[   r]   )xss    r   rQ   z_collate_fn.<locals>.<lambda>  s$    BH-H-HR-H-H-Hq$Q$Q$Q r   rR   )itemss    r   r   r     s#     8<QQZTYZZZZr   	worker_idc                 B    dt           j        d<   dt           j        d<   dS )zETell JAX inside the worker process not to preallocate the GPU memory.falseXLA_PYTHON_CLIENT_PREALLOCATEplatformXLA_PYTHON_CLIENT_ALLOCATORN)osenviron)r  s    r   r   r     s$     3:BJ./0:BJ,---r   c                   T    e Zd ZdZddddedej        j        dz  dedz  fdZ	d Z
dS )	r   zShallow wrapper around the DROID data loader to make it compatible with openpi.

    All batching already happens in the DROID dataset, so we don't need to do anything here.
    Nr   r2   r   r   c                f   || _         || _        t          j                    dk    rt	          d          |lt          j                            t          j                            t          j                    d          t          j        	                    d                    }|| _
        || _        d S )Nrt   r   r   r   )r6   r   rS   r   r   r   r   r   r   r   r   )r   r2   r   r   s       r   r:   zRLDSDataLoader.__init__  s      '""%&^___|11!!#+--88**3// H
 "'r   c              #       K   d}	 t           j                  }	  j        | j        k    rd S 	 t          |          }n# t          $ r Y n/w xY w|dz  }t
          j                             fd|          V  `v)Nr   Trt   c                 8    t          j        j        |           S r5   r  r  s    r   rQ   z)RLDSDataLoader.__iter__.<locals>.<lambda>  s    S-STXTbde-f-f r   )r  r6   r   r`   r  rS   rT   rU   r	  s   `   r   r'   zRLDSDataLoader.__iter__  s      	
	oT]++Io$0Y$BS5S5SF OOEE$   EQ	hll#f#f#f#fhmnnnnno
	or  )r   r   r    r!   r	   rS   r   r  r"   r:   r'   r#   r   r   r   r     s          26"&( ( (!( ,'$.	(
 4Z( ( ( (.o o o o or   r   c                   J    e Zd Zdej        deez  fdZdej        fdZd Z	dS )r   r,   r   c                 "    || _         || _        d S r5   )_data_configr   )r   r,   r   s      r   r:   zDataLoaderImpl.__init__  s    ''r   r   c                     | j         S r5   )r&  r   s    r   r,   zDataLoaderImpl.data_config  r   r   c              #   p   K   | j         D ]+}t          j                            |          |d         fV  ,d S )Nr   )r   r   Observation	from_dict)r   r  s     r   r'   zDataLoaderImpl.__iter__  sP      & 	H 	HE$..u55uY7GGGGGG	H 	Hr   N)
r   r   r    r.   r/   r   r   r:   r,   r'   r#   r   r   r   r     sq        (G$6 (_WeEe ( ( ( (!W/ ! ! ! !H H H H Hr   r   )@collections.abcr   r   r   r   r  r   r   r   r   r   rS   	jax.numpynumpyrz   'lerobot.common.datasets.lerobot_datasetcommonr   r   r^   r   openpi.models.modelmodelsr   r   openpi.training.configtrainingr   r.   "openpi.training.droid_rlds_datasetr	   openpi.transformsr3   r7   r
   r   r%   r*   r1   rB   ri   r/   r"   r   r   rg   r   r   r   TrainConfigr   r  tupler)  Actionsr   r  r   r   r   r   r   r   r   r#   r   r   <module>r9     s   . . . . . . . .      				  < < < < < < < < < < < < 



       A A A A A A A A A A A A      $ $ $ $ $ $ $ $ $ ( ( ( ( ( ( ( ( ( ? ? ? ? ? ? ' ' ' ' ' 'wv&&&U U U U Uhtn U U UU U U U Uhtn U U UY Y Y Y Y$ Y Y Y	" 	" 	" 	" 	" 	" 	" 	"" " " " "!6 " " "D! ! ! ! !' ! ! !>#58HNH^   <   # 
     $ ej   w W5G ]a nu    4 "  # 	
     @ .2"!+0- - -- l#d*- 	-
 t- - '(- f(&.89:- - - -l .2!"B4 B4 B4#B4(B4 B4 	B4 l#d*B4 B4 B4 tB4 B4 B4 B4 f(&.89:B4 B4 B4 B4T .2!"&4 &4 &4#&4&4 &4
 l#d*&4 &4 &4 t&4 &4 f(&.89:&4 &4 &4 &4RW? W? W? W? W? W? W? W?t[ [ [;s ;t ; ; ; ;)o )o )o )o )o )o )o )oX
H 
H 
H 
H 
HZ 
H 
H 
H 
H 
Hr   