o
    vi+                     @  sb   d dl mZ d dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
 d dlmZ G dd dZdS )    )annotationsN)Iterator)AnyOptionalUnion)logc                   @  s   e Zd ZdZ				 d)d*ddZd+ddZd+ddZejd,d-ddZ	d.ddZ
d/d0dd Zd+d!d"Zd1d#d$Zd+d%d&Zd+d'd(ZdS )2OperationWatchdoga8  A watchdog that monitors operations for hangs and collects performance statistics.

    This class provides a mechanism to detect when operations take longer than expected,
    which can help identify potential deadlocks or performance issues. It also collects
    statistics about operations to help identify bottlenecks.

    Attributes:
        warning_threshold: Time in seconds before warning about potential hangs.
        check_interval: Time in seconds between checks for hung operations.
        verbose_interval: Time in seconds between verbose logging.
    X     warning_thresholdintcheck_intervalverbose_intervalnamestrreturnNonec                 C  sJ   || _ || _|| _|| _i | _i | _t | _t	 | _
d| _|   dS )aa  Initialize the watchdog.

        Args:
            warning_threshold: Time in seconds before warning about potential hangs.
                Defaults to 600 (10 minutes).
            check_interval: Time in seconds between checks. Defaults to 30.
            verbose_interval: Time in seconds between verbose logging. Defaults to -1 (disabled).
        N)_warning_threshold_check_interval_verbose_interval_name_ops_stats	threadingLock_lockEvent_stop_event_threadstart)selfr   r   r   r    r"   S/data/cameron/vidgen/cosmos-policy/cosmos_policy/_src/predict2/datasets/watchdog.py__init__(   s   

zOperationWatchdog.__init__c                 C  s`   | j du s
| j  s.| j  tj| jd| j dd| _ | j   t	
d| j d dS dS )zsStart the watchdog monitoring thread.

        If the thread is already running, this method does nothing.
        NTZ_monitor_thread)targetdaemonr   [z$] Watchdog monitoring thread started)r   is_aliver   clearr   Thread_monitor_loopr   r    r   debugr!   r"   r"   r#   r    D   s   

zOperationWatchdog.startc                 C  sr   | j   | jr5| j r7| jjdd | j r'tjd| j ddd dS tjd| j ddd dS dS dS )	zzStop the watchdog monitoring thread.

        This method is typically called when shutting down the application.
        g      ?)timeoutr'   z2] Watchdog thread did not terminate within timeoutF
rank0_onlyz$] Watchdog monitoring thread stoppedN)	r   setr   r(   joinr   warningr   r,   r-   r"   r"   r#   stopO   s   

zOperationWatchdog.stop operation_namedescriptionverbose_first_nIterator[None]c           	      c  s8   | dt t d  }t }| j ||p|||dd| j|< W d   n1 s-w   Y  zdV  W t | }|| jkrXtjd| j d| d| d	|d
d	dd | jB || jv re| j|= || jvrtddddd| j|< | j| }|d  d7  < |d  |7  < t	|d ||d< ||d< W d   n1 sw   Y  |dkr|d |kr|d |d  }tj
d| j d| d|d  d| d|d
d|d
d|d d
d|d d
ddd dS dS dS t | }|| jkr
tjd| j d| d| d	|d
d	dd | jD || jv r| j|= || jvr(ddddd| j|< | j| }|d  d7  < |d  |7  < t	|d ||d< ||d< W d   n	1 sUw   Y  |dkr|d |kr|d |d  }tj
d| j d| d|d  d| d|d
d|d
d|d d
d|d d
ddd w w w )a}  Context manager for monitoring an operation.

        This is the primary interface for using the watchdog. It automatically
        tracks the start and end time of the operation and updates statistics.

        Args:
            operation_name: Name/type of the operation to monitor.
            description: Optional description providing more context.
            verbose_first_n: If positive, print verbose logs for first N operations for operation_name.

        Yields:
            None

        Example:
            with watchdog.watch("data_fetch", "Fetching user data"):
                data = fetch_user_data(user_id)
        _i  F)r   descr    updatewarnedNr'   z] Operation id: z	, name: 'z' took .2fsr/   r   g        )count
total_timemax_time	last_timer@      rA   rB   rC   z	] name: 'z	', count z / z took zs. avg zs, max zs, last )r   timer   r   r   r   r3   r   r   maxinfo)	r!   r6   r7   r8   op_id
start_timedurationstatsavg_timer"   r"   r#   watch\   sz   	
 



J
 

JzOperationWatchdog.watchc                 C  sd   t   }| j! | j D ]\}}|d |kr||d< d|d< qW d   dS 1 s+w   Y  dS )zSend a heartbeat for all operations of a given type.

        Use this inside long operations to prevent false warnings.

        Args:
            operation_name: The operation type to update.
        r   r<   Fr=   N)rE   r   r   items)r!   r6   current_timeZ_op_idopr"   r"   r#   	heartbeat   s   "zOperationWatchdog.heartbeatNOptional[str]dict[str, Any]c                 C  s   | j ^ |r4|| jv r+| j|  }|d dkr"|d |d  |d< |W  d   S i W  d   S i }| j D ]\}}| ||< |d dkrW|d |d  || d< q;|W  d   S 1 sdw   Y  dS )a  Get statistics for operations.

        Args:
            operation_name: Get stats for specific operation, or None for all.

        Returns:
            Dictionary of operation statistics. For each operation, includes:
            - count: Number of completed operations
            - total_time: Total time spent in this operation type
            - max_time: Maximum time spent in a single operation
            - last_time: Time spent in the most recent operation
            - avg_time: Average time per operation (if count > 0)
        r@   r   rA   rL   N)r   r   copyrN   )r!   r6   rK   resultr   r"   r"   r#   	get_stats   s$   

$zOperationWatchdog.get_statsc                 C  s   |   }|stjd| j ddd dS tjd| j ddd | D ]6\}}|d dkrZ|d	 |d  }tjd| j d
| d|d  d|dd|d dd|d dddd q$dS )zwPrint statistics for all operations.

        This is a convenience method that logs statistics at INFO level.
        r'   z#] No operation statistics availableFr/   Nz] Operation Statistics:r@   r   rA   z]  z: count=z, avg=r>   zs, max=rB   zs, last=rC   r?   )rV   r   rG   r   rN   )r!   rK   r   r?   avgr"   r"   r#   print_stats   s(   zOperationWatchdog.print_statsc                 C  sx   | j / t }i }| j D ]\}}|d |d ||d  ||d  d||< q|W  d   S 1 s5w   Y  dS )zList all currently active operations.

        Returns:
            Dictionary mapping operation IDs to information about active operations.
        r   r;   r    r<   )r   r7   running_timeZtime_since_updateN)r   rE   r   rN   )r!   rO   rU   rH   rP   r"   r"   r#   list_active_operations   s   

$z(OperationWatchdog.list_active_operationsc                 C  s6   | j  | j  W d   dS 1 sw   Y  dS )zReset all operation statistics.

        This clears all accumulated statistics but does not affect active operations.
        N)r   r   r)   r-   r"   r"   r#   reset_stats   s   "zOperationWatchdog.reset_statsc                 C  s  d}| j  st }| jK t| j D ];\}}||d  }|| jkrQ|d sQ||d  }tj	d| j
 d|d  d|d	  d
|dd|dddd d|d< qW d   n1 s\w   Y  | jdkrs|| | jkrs|   |}| j | j | j  rdS dS )zuMonitor registered operations for hangs.

        This is an internal method that runs in a separate thread.
        r   r<   r=   r    r'   z] POTENTIAL HANG: 'r   z' (r;   z) has been running for z.1fzs total, with zs since last updateFr/   TN)r   is_setrE   r   listr   rN   r   r   r3   r   r   rX   waitr   )r!   Zlast_verbose_timenowr:   rP   elapsedrA   r"   r"   r#   r+      s2   
zOperationWatchdog._monitor_loop)r	   r
   r   r   )
r   r   r   r   r   r   r   r   r   r   )r   r   )r5   r   )r6   r   r7   r   r8   r   r   r9   )r6   r   r   r   )N)r6   rR   r   rS   )r   rS   )__name__
__module____qualname____doc__r$   r    r4   
contextlibcontextmanagerrM   rQ   rV   rX   rZ   r[   r+   r"   r"   r"   r#   r      s"    


B


r   )
__future__r   re   r   rE   collections.abcr   typingr   r   r   #cosmos_policy._src.imaginaire.utilsr   r   r"   r"   r"   r#   <module>   s   