
    @(iU                       S r SSKJr  SSKrSSKrSSKrSSKrSSKrSSKJ	r	  SSK
JrJr  SSKJr  SSKJr  SSKJr  SS	KJr  SS
KJr  SSKrSSKrSSKrSSKJr  SSKJr  SSKJrJ r J!r!J"r"J#r#J$r$J%r%  \RL                  " \'5      r(\(       a0  SSK)J*r*J+r+J,r,  SS	KJr-  SSKJ.r.J/r/  SSK0J1r1  SSKJ2r2  SSK3J4r4  SSKJ5r5  Sr6Sr7\" SS9 " S S5      5       r8\" SSSS9 " S S5      5       r9      S'S jr:\ " S S 5      5       r;\;Rx                  4       S(S! jjr=\(       a  \/" S"5      r>    S)S# jr?S$ r@\@        S*S% j5       rA\@        S+S& j5       rBg),a  
Sharded repodata subsets.

Traverse dependencies of installed and to-be-installed packages to generate a
useful subset for the solver.

The algorithm developed here is a direct result of the following CEP:

- https://conda.org/learn/ceps/cep-0016 (Sharded Repodata)

In this algorithm we treat a (channel, package name) as a node, its dependencies
as edges. We then traverse all edges to discover all reachable (channel, package
name) tuples. The solver should be able to find a solution with only this
subset.

This subset is overgenerous since the user is unlikely to want to install very
old packages and their dependencies. If this is too slow, we could deploy
heuristics that automatically ignore older package versions. We could also allow
the user to configure minimum versions of common packages and ignore older
versions and their dependencies, falling back to a full solve if unsatisfiable.

We treat both sharded and monolithic repodata as if they were made up of
per-package shards, computing a subset of both. This is because it is possible
for the monolithic repodata to mention packages that exist in the true sharded
repodata but would not be found by only traversing the shards.

We treat all repodata as sharded, even if no actual sharded repodata has been
found.

## Example usage

The following constructs several repodata (`noarch` and `linux-64`) from a
single channel name and a list of root packages:

``` from conda.models.channel import Channel from
conda_libmamba_solver.shards_subset import build_repodata_subset

channel = Channel("conda-forge-sharded/linux-64") channel_data =
build_repodata_subset(["python", "pandas"], [channel.url()]) repodata = {}

for url in channel_data:
    repodata[url] = channel_data.build_repodata()

# ... this is what's fed to the solver ```

    )annotationsN)deque)FutureThreadPoolExecutor)suppress)	dataclass)Path)SimpleQueue)TYPE_CHECKING)shards_cache)AnnotatedRawShard   )ZSTD_MAX_SHARD_SIZEShards_shards_connectionsbatch_retrieve_from_cachebatch_retrieve_from_networkfetch_channelsshard_mentioned_packages)IterableIteratorSequence)LiteralTypeVar)Channel)
ShardCache)	ShardDict)	ShardBase   
   T)orderc                  z    \ rS rSr% \R
                  rS\S'   SrS\S'   Sr	S\S'   Sr
S	\S
'   SrS\S'   SS jrSrg)Nodeh   intdistance strpackagechannelFboolvisited	shard_urlc                X    [        U R                  U R                  U R                  5      $ N)NodeIdr)   r*   r-   selfs    [/data/cameron/miniconda/lib/python3.13/site-packages/conda_libmamba_solver/shards_subset.pyto_id
Node.to_idp   s    dllDLL$..AA     N)returnr0   )__name__
__module____qualname____firstlineno__sysmaxsizer&   __annotations__r)   r*   r,   r-   r4   __static_attributes__r7   r6   r3   r#   r#   h   s>    KKHcGSGSGTIsBr6   r#   )r!   eqfrozenc                  >    \ rS rSr% S\S'   S\S'   SrS\S'   S rSrg	)
r0   t   r(   r)   r*   r'   r-   c                Z    [        U R                  U R                  U R                  45      $ r/   )hashr)   r*   r-   r1   s    r3   __hash__NodeId.__hash__z   s     T\\4<<@AAr6   r7   N)r9   r:   r;   r<   r?   r-   rG   r@   r7   r6   r3   r0   r0   t   s    LLIsBr6   r0   c           
   #     #    U  HL  nU HC  nX#;   d  M
  [        SX#R                  UR                  U5      S9nUR                  5       nXT4v   ME     MN     g7f)zA
Yield (NodeId, Node) for all root packages found in shardlikes.
r   )r-   N)r#   urlr-   r4   )root_packages
shardlikesr)   	shardlikenodenode_ids         r3   _nodes_from_packagesrP   ~   sR      !#I#AwATATU\A]^**,m#	 $ !s   AAAc                      \ rS rSr% S\S'   S\S'   SrSS jr\SS j5       rSS	 jr	SS
 jr
S rS rS r      SS jr      SS jrSrg)RepodataSubset   zdict[NodeId, Node]nodeszSequence[ShardBase]rL   	pipelinedc                2    0 U l         [        U5      U l        g r/   )rT   listrL   )r2   rL   s     r3   __init__RepodataSubset.__init__   s    
z*r6   c                     [        U SU 35      $ )zH
Return True if this class provides the named shard traversal strategy.

reachable_)hasattr)clsstrategys     r3   has_strategyRepodataSubset.has_strategy   s    
 sj
344r6   c              #    #    [        5       nU R                   H  nUR                  U;  a  M  UR                  UR                  5      n[	        U5       H  n[        XSR                  5      nX`R                  ;  d  M)  [        UR                  S-   XSR                  5      U R                  U'   U R                  U   v   XR;  d  Mq  UR                  U5        M     M     g7f)zd
Retrieve all unvisited neighbors of a node

Neighbors in the context are dependencies of a package
r   N)setrL   r)   fetch_shardr   r0   rJ   rT   r#   r&   add)r2   rN   
discoveredrM   shardr)   rO   s          r3   	neighborsRepodataSubset.neighbors   s      U
I||9, ))E 4E: --8**,*.t}}q/@'==*YDJJw'**W--0"w/ ; )s   A7C=ACCc              #  H   #    U R                  U5       H	  nUS4v   M     g7f)z8
All nodes that can be reached by this node, plus cost.
r   N)rg   )r2   rN   ns      r3   outgoingRepodataSubset.outgoing   s#      %AQ$J &s    "c                   [        [        XR                  5      5      U l        [	        U R                  R                  5       5      nU R                   Vs/ s H  n[        U[        5      (       d  M  UPM     nnU(       a  U Vs1 s H!  oUR                  (       a  M  UR                  iM#     nnU(       a   [        U[        U5      5      n[        U5        [        U5      n[        U5       Hk  n	UR                  5       nUR                  (       a  M&  SUl        U R!                  U5       H)  u  pU
R                  (       a  M  UR#                  U
5        M+     Mm     U(       a  M  ggs  snf s  snf )z
Fetch all packages reachable from `root_packages`' by following
dependencies using the "breadth-first search" algorithm.

Update associated `self.shardlikes` to contain enough data to build a
repodata subset.
TN)dictrP   rL   rT   r   values
isinstancer   r,   r)   r   sortedr   lenrangepopleftrk   append)r2   rK   
node_queuesshardedrN   to_retrievenot_in_cache
level_size_	next_nodes              r3   reachable_bfsRepodataSubset.reachable_bfs   s
    .}ooNO
4::,,./
"ooGoAv1F1oG4>SJDll<4<<JKS8&BUV+L9 ZJ:&!))+<<#$(MM$$7LI$,,,")))4 %8 ' j H Ts   E&2E&E+E+c                   [         R                  " [        [        R                  R
                  R                  5       5      5      n[        5       n[        5       n[        5       n[        R                  " [        X4XR4SS9n[        R                  " [        XTX R                  4SS9n UR                  5         UR                  5         U R                  XXFU5        UR                  S5        UR!                  ["        5        UR!                  ["        5        g! UR                  S5        UR!                  ["        5        UR!                  ["        5        f = f)z
Fetch all packages reachable from `root_packages`' by following
dependencies.

Build repodata subset using concurrent threads to follow dependencies,
fetch from cache, and fetch from network.
T)targetargsdaemonN)r   r   r	   condagatewaysrepodatacreate_cache_dirr
   	threadingThreadcache_fetch_threadnetwork_fetch_threadrL   startpipelined_main_threadputjoinTHREAD_WAIT_TIMEOUT)r2   rK   cachecache_in_queueshard_out_queuecache_miss_queuecache_threadnetwork_threads           r3   reachable_pipelined"RepodataSubset.reachable_pipelined   s    ''U^^-D-D-U-U-W(XY;F=S^S`=H] ''% 3CK
 #))'"UOOL

	5   "&&n t$12 34 t$12 34s   '3D =Ec                   ^ ^^^^^ T R                    Vs0 s H  ofR                  U_M     snm[        5       m[        5       mSnSn0 T l        [	        S5      n	T R                  TX5        UUUU UU4S jn
SnU(       a  U
" 5          TR                  SS9nUc  SnM%  [        U[        5      (       a  Ue U Hl  u  pTR'                  U5        T R                  U   n	TUR(                     nUR+                  UR,                  U5        T R                  TU	[/        U5      5        Mn     T(       d6  T(       d/  U(       d(  [        R                  S5        TR1                  S5        SnU(       a  M  ggs  snf ! [        R                   Ga    U
" 5       n[        R                  SX}5        [        R                  S	[        S
 T 5       5      SS 5        [        R                  S[        S T 5       5      SS 5        [        R                  S[        T R                  5      5        [        R                  SUR                  5       5        [        R                  SUR                  5       5        [        R                  STR!                  5       5        T(       d  T(       d  [        R                  S5         gUS-  nU["        :  aH  [%        SU S[        T5       S[        T5       SUR                  5        SUR                  5        3
5      e GM  f = f)zG
Run reachibility algorithm given queues to submit and receive shards.
r   Fc                   > TR                  TT5      u  pU(       a"  TR                  U5        TR                  U5        U (       a)  TR                  S U  5       5        TR                  U 5        [        U 5      [        U5      -   $ )zw
Find shards we already have and those we need. Submit those need to
cache_in_queue, those we have to shard_out_queue.
c              3  *   #    U  H	  u  pUv   M     g 7fr/   r7   ).0rO   r|   s      r3   	<genexpr>ERepodataSubset.pipelined_main_thread.<locals>.pump.<locals>.<genexpr>*  s      @4ZW4s   )drain_pendingupdater   rr   )haveneedr   	in_flightpendingr2   r   shardlikes_by_urls     r3   pump2RepodataSubset.pipelined_main_thread.<locals>.pump   sq    
 ++G5FGJD  &""4(   @4 @@##D)t9s4y((r6   Tr   timeoutNzShard timeout %s, pump_count=%dzpending: %s...c              3  8   #    U  H  n[        U5      v   M     g 7fr/   r(   r   rO   s     r3   r   7RepodataSubset.pipelined_main_thread.<locals>.<genexpr>;  s     2WwG3w<<w   r    zin_flight: %s...c              3  8   #    U  H  n[        U5      v   M     g 7fr/   r   r   s     r3   r   r   <  s     4[QZgS\\QZr   z	nodes: %dzcache_thread.is_alive(): %sznetwork_thread.is_alive(): %szshard_out_queue.qsize(): %sz#All shards have finished processingz*Timeout waiting for shard_out_queue after z attempts. pending=z, in_flight=z, cache_thread_alive=z, network_thread_alive=z3Initiating shutdown: sending None to cache_in_queue)rL   rJ   rb   rT   r#   
visit_nodegetrp   	ExceptionqueueEmptylogdebugrq   rr   is_aliveqsize REACHABLE_PIPELINED_MAX_TIMEOUTSTimeoutErrorremover*   visit_shardr)   r   r   )r2   rK   r   r   r   r   rw   timeoutsshutdown_initiatedparent_noder   running
new_shards
pump_countrO   rf   rM   r   r   r   s   ` ``             @@@r3   r   $RepodataSubset.pipelined_main_thread  s    04?!UUAX?"u!$	"
 1g<	) 	) F,000;
%#Gj)44$$ 50 #-  ) #jj1-goo>	%%goou=6Nu6UV #- 95G		OP""4(%)"Y g7 @H ;; !V
		;XR		*F2Ww2W,WX[Y[,\]		,f4[QZ4[.[\_]_.`a		+s4::7		79N9N9PQ		9>;R;R;TU		79N9N9PQyIICDA>>&DXJ O##&w<.S^<L M..:.C.C.E-F G00>0G0G0I/JL  )s%   E.E3 E3 3D<L1ALLc           	     p   U H  nU R                    H  nXE;   d  M
  [        XER                  UR                  U5      5      nX`R                  ;  d  M@  [        UR                  S-   UR                  UR                  UR                  S9nXpR                  U'   UR                  U5        M     M     SUl
        g)z8Broadcast mentioned packages across channels to pending.r   )r&   r)   r*   r-   TN)rL   r0   rJ   r-   rT   r#   r&   r)   r*   rd   r,   )r2   r   r   mentioned_packagesr)   rM   new_node_idnew_nodes           r3   r   RepodataSubset.visit_node]  s     *G!__	'"(--ATATU\A]"^K"**4#'%0%9%9A%=$/$7$7$/$7$7&1&;&;	$ 3;

;/K0 - * #r6   c                   / n/ nU H  nX%R                      nUR                  UR                  5      (       a-  UR                  XVR	                  UR                  5      45        M^  U R
                  U   R                  (       a  [        R                  S5        M  UR                  U5        M     UR                  5         XC4$ )zr
Check pending for in-memory shards.
Clear pending.

Return a list of shards we have and shards we need to fetch.
z#Skip visited, should not be reached)
r*   shard_loadedr)   ru   visit_packagerT   r,   r   r   clear)r2   r   r   shards_needshards_haverO   rM   s          r3   r   RepodataSubset.drain_pendingt  s     G)//:I%%goo66""G-D-DW__-U#VW::g&..IICD""7+  	''r6   )rT   rL   N)rL   Iterable[ShardBase])r^   r(   r8   r+   )rN   r#   r8   zIterator[Node])rN   r#   )r   set[NodeId]r   r#   r   Iterable[str])r   r   r   dict[str, ShardBase]r8   z3tuple[list[tuple[NodeId, ShardDict]], list[NodeId]])r9   r:   r;   r<   r?   DEFAULT_STRATEGYrX   classmethodr_   rg   rk   r~   r   r   r   r   r@   r7   r6   r3   rR   rR      s    ##"+ 5 508
5@'5RM*^#"#15#KX#.("(7K(	<(r6   rR   c                2   [        U[        5      (       a  [        UR                  5       5      nOUn[	        U5      n[        / UR                  5       Q75      n[        USU 35      " U 5        [        R                  S[        UR                  5      5        U$ )z
Retrieve all necessary information to build a repodata subset.

Params:
    root_packages: iterable of installed and requested package names
    channels: iterable of Channel objects
    algorithm: desired traversal algorithm
r[   z&%d (channel, package) nodes discovered)rp   rn   rW   ro   r   rR   getattrr   r   rr   rT   )rK   channels	algorithm	channels_channel_datasubsets         r3   build_repodata_subsetr     s~     (D!!#'(9#:		!),L5l11356FFj,-m<II6FLL8IJr6   _Tc              #  j  #    SnU(       az   U R                  SS9nUc  g [        U5      n[	        [        R                  5          U R                  5       nUc  SnOUR                  U5        M)  SSS5        Uv   U(       a  My  gg! [        R                   a     M  f = f! , (       d  f       N8= f7f)zF
Combine lists from in_queue until we see None. Yield combined lists.
Tr   r   NF)r   r   r   rW   r   
get_nowaitextend)in_queuer   batchnode_idss       r3   combine_batches_until_noner     s      G
	LLL+E}  ;ekk" ++-=#GOOE*  # ) ' {{ 		
 #"sE   
B3B &B3+B"0B3B3BB3BB3"
B0,B3c                F   ^  [         R                  " T 5      U 4S j5       nU$ )zJ
Decorator to send unhandled exceptions to the second argument out_queue.
c                   >  T" X/UQ70 UD6$ ! [          a,  nU R                  S 5        UR                  U5         S nAg S nAff = fr/   )r   r   )r   	out_queuer   kwargsefuncs        r3   wrapper#exception_to_queue.<locals>.wrapper  sH    	=d=f== 	LLMM!	s    
A"A  A)	functoolswraps)r   r   s   ` r3   exception_to_queuer     s'    
 __T  Nr6   c                   UR                  5       n[        U 5       H  nUR                  U Vs/ s H  oUR                  PM     sn5      n/ n/ nU HJ  nUR	                  UR                  5      =n	(       a  UR                  XY45        M9  UR                  U5        ML     U(       a  UR                  U5        U(       d  M  UR                  U5        M     UR                  S5        UR                  S5        gs  snf )a  
Fetch batches of shards from cache until in_queue sees None. Enqueue found
shards to shard_out_queue, and not found shards to network_out_queue.

When we see None on in_queue, send None to both out queues and exit.

Args:
    in_queue: NodeId (URLs) to fetch.
    shard_out_queue: fetched shards sent to queue.
    network_out_queue: cache misses forwarded to queue. Same queue is
        network_fetch_thread's in_queue.
    cache: used to retrieve shards.
N)copyr   retrieve_multipler-   r   ru   r   )
r   r   network_out_queuer   r   rO   cachedfound	not_foundrf   s
             r3   r   r     s    ( JJLE.x8((8)T8*;*;8)TU 13"$	G

7#4#455u5g-.  )	   !!),5&! 9$ $% *Us   C7
c                  ^ ^^^^^^ TR                  5       m[        R                  " [        S9mU Vs0 s H  oDR                  U_M     snmSS jmS	UUU4S jjnS
UUU4S jjnS
U 4S jjn[        [        5       S9 m[        T 5       HD  nU H;  n	[        U	[        5      (       a
  U" U	5        M"  U" U	5      n
U
R                  U5        M=     MF     SSS5        gs  snf ! , (       d  f       g= f)ar  
Fetch shards from the network that are received on in_queue, until we see
None.

Unhandled exceptions also go to shard_out_queue, and exit this thread.

Args:
    in_queue: NodeId (URLs) to fetch.
    shard_out_queue: fetched shards sent to queue.
    cache: once shards are decoded they are stored in cache.
    shardlikes: list of (network-only) shard index objects.
)max_window_sizec                d    U R                  U5      nUR                  5         UR                  nXU4$ r/   )r   raise_for_statuscontent)rw   rJ   rO   responsedatas        r3   fetch#network_fetch_thread.<locals>.fetch  s1    55:!!#T!!r6   c                   > TU R                      n[        U[        5      (       d  [        S5      eUR                  nTU R                      R                  U R                  5      nTR                  TX#U 5      $ )Nz.network_fetch_thread got non-network shardlike)r*   rp   r   	TypeErrorsessionr-   r)   submit)rO   rM   r  rJ   executorr   r   s       r3   r  $network_fetch_thread.<locals>.submit$  se    %goo6	)V,,LMM##0::7??KugG<<r6   c                ,  > U R                  5       u  pn[        R                  SU[        U5      5        [        R
                  " TR                  U[        S95      nTR                  [        XR                  U5      5        TR                  X$4/5        g )NzFetch thread got %s (%s bytes))max_output_size)resultr   r   rr   msgpackloads
decompressr   insertr   r)   r   )futurerJ   rO   r   rf   r   dctxr   s        r3   handle_result+network_fetch_thread.<locals>.handle_result-  sw    #]]_d		2CTC #==OOD2EOF

 	&sOOTBCg-./r6   c                *   > TR                  U /5        g r/   )r   )r  r   s    r3   result_to_in_queue0network_fetch_thread.<locals>.result_to_in_queue;  s     	fXr6   )max_workersN)rJ   r(   rO   r0   )rO   r0   )r  r   )r   	zstandardZstdDecompressorr   rJ   r   r   r   rp   r   add_done_callback)r   r   r   rL   rw   r  r  r  node_ids_and_resultsnode_id_or_resultr  r  r  r   r   s   ```        @@@@r3   r   r     s    & JJLE%%6IJD+56:a:6"= =0 0 
(;(=	>($>x$H %9!/88!"34#$56F,,-?@ &: %I 
?	>K 7J 
?	>s   C?AC!!
C/)rK   z	list[str]rL   r   r8   zIterator[tuple[NodeId, Node]])rK   r   r   zdict[str, Channel]r   zLiteral['bfs', 'pipelined']r8   r   )r   zQueue[Sequence[_T] | None]r8   zIterator[Sequence[_T]])r   Queue[Sequence[NodeId] | None]r   z<Queue[Sequence[tuple[NodeId, ShardDict] | Exception] | None]r   r  r   r   )r   z'Queue[Sequence[NodeId | Future] | None]r   z8Queue[list[tuple[NodeId, ShardDict] | Exception] | None]r   r   rL   zlist[ShardBase])C__doc__
__future__r   r   loggingr   r=   r   collectionsr   concurrent.futuresr   r   
contextlibr   dataclassesr   pathlibr	   r
   typingr   conda.gateways.repodatar   r
  r  conda_libmamba_solverr   "conda_libmamba_solver.shards_cacher   shardsr   r   r   r   r   r   r   	getLoggerr9   r   collections.abcr   r   r   Queuer   r   conda.models.channelr   r   #conda_libmamba_solver.shards_typingr   r   r   r   r#   r0   rP   rR   r   r   r   r   r   r   r   r7   r6   r3   <module>r.     s  -^ #    
   9  !        . @   !<<*',==
  #%   B B B $t,B B -B$$*=$"$ }( }( }(F .<-L-L   + 	8 	B(<  (,(Q( 6( 	( (V @A5@AM@A @A  	@A @Ar6   