o
    ?߱i$                     @   sB  d dl mZmZ d dlZd dlmZ dejdejfddZdejdejfdd	Zd
ejdejdejdejdejdejdejfddZ	d
ejdejdejdejdeejejf f
ddZ
d
ejdejdejdejdeejejf f
ddZd
ejdejdejdedeejejf f
ddZd
ejdejdejdedeejejf f
ddZd
ejdejdejdedeejejf f
ddZd
ejdejdejdedeejejf f
ddZd
ejdejdejdedeejejf f
dd Zd
ejdejdejdedeejejf f
d!d"Zeeeeeed#Zd$edefd%d&Zd$edefd'd(ZdS ))    )CallableTupleN	batch_multreturnc                 C   s*   | j }| jtjd} t| |  j|dS )z
    Compute the first order phi function: (exp(t) - 1) / t.

    Args:
        t: Input tensor.

    Returns:
        Tensor: Result of phi1 function.
    dtype)r	   totorchfloat64expm1r   input_dtype r   `/data/cameron/vidgen/cosmos-predict2.5/cosmos_predict2/_src/imaginaire/functional/runge_kutta.pyphi1   s   
r   c                 C   s,   | j }| jtjd} t| d |  j|dS )z
    Compute the second order phi function: (phi1(t) - 1) / t.

    Args:
        t: Input tensor.

    Returns:
        Tensor: Result of phi2 function.
    r         ?)r	   r
   r   r   r   r   r   r   r   phi2&   s   
r   x_ssx0_ss1x0_s1c                 C   s   t | }t | }t | }|| }t t j|t |ddr(J dt t j|| t |ddr<J d|| | }t| t| }	}
t j|	d| |
  dd}t jd| |
 dd}tt 	| | t|t||t||  S )a  
    Perform a residual-based 2nd order Runge-Kutta step.

    Args:
        x_s: Current state tensor.
        t: Target time tensor.
        s: Current time tensor.
        x0_s: Prediction at current time.
        s1: Intermediate time tensor.
        x0_s1: Prediction at intermediate time.

    Returns:
        Tensor: Updated state tensor.

    Raises:
        AssertionError: If step size is too small.
    gư>)atolzStep size is too smallr   g        )nan)
r   loganyisclose
zeros_liker   r   
nan_to_numr   exp)r   r   r   r   r   r   mdtc2Zphi1_valZphi2_valb1b2r   r   r   res_x0_rk2_step5   s   $(,r'   c                 C   s,   || | }|| }t ||t ||  |fS )a3  
    Perform a regularized Euler step based on x0 prediction.

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_s: Prediction at current time.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and current prediction.
    r   )r   r   r   r   Zcoef_x0Zcoef_xsr   r   r   reg_x0_euler_step`   s   r(   eps_sc                 C   s$   | t |||  | t |d|  fS )aD  
    Perform a regularized Euler step based on epsilon prediction.

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        eps_s: Epsilon prediction at current time.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and current x0 prediction.
    r   r   )r   r   r   r)   r   r   r   reg_eps_euler_stepw   s   $r*   x0_fnc                 C   s   || |}t | |||S )a  
    Perform a first-order Runge-Kutta (Euler) step.

    Recommended for diffusion models with guidance or model undertrained
    Usually more stable at the cost of a bit slower convergence.

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and x0 prediction.
    )r(   )r   r   r   r+   r   r   r   r   	rk1_euler   s   
r,   c                 C   s8   t || }t| |||\}}|||}t| |||S )a8  
    Perform a stable second-order Runge-Kutta (midpoint) step.

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and x0 prediction.
    )r   sqrtr,   r(   )r   r   r   r+   r   x_s1_r   r   r   r   rk2_mid_stable   s   
r0   c                 C   s@   t || }t| |||\}}|||}t| ||||||fS )a1  
    Perform a second-order Runge-Kutta (midpoint) step.

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and x0 prediction.
    )r   r-   r,   r'   )r   r   r   r+   r   r.   r   r   r   r   r   rk2_mid   s   
r1   c           
      C   sZ   t | |||\}}td| || }|||}td| || }|| d }	t| |||	S )a  
    Perform a naive second-order Runge-Kutta (Heun's method) step.
    Impl based on rho-rk-deis solvers, https://github.com/qsh-zh/deis
    Recommended for diffusion models without guidance and relative large NFE

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and current state.
    r      r,   r   r*   )
r   r   r   r+   x_tr   r)   x0_tZeps_tavg_epsr   r   r   rk_2heun_naive   s   
r7   c                 C   s6   t | |||\}}|||}|| d }t| |||S )ak  
    Perform a naive second-order Runge-Kutta (Heun's method) step.
    Impl based no EDM second order Heun method

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and current state.
    r2   )r,   r(   )r   r   r   r+   r4   r   r5   Zavg_x0r   r   r   rk_2heun_edm   s   
r8   c                 C   s   d\}}d\}}d\}}	}
|| }|| | }|| | }t | |||\}}td| | | }|||}td| || }|| ||  }t| |||\}}|||}td| || }|| |	|  |
|  }t| |||S )a  
    Perform a naive third-order Runge-Kutta step.
    Impl based on rho-rk-deis solvers, https://github.com/qsh-zh/deis
    Recommended for diffusion models without guidance and relative large NFE

    Args:
        x_s: Current state tensor.
        s: Current time tensor.
        t: Target time tensor.
        x0_fn: Function to compute x0 prediction.

    Returns:
        Tuple[Tensor, Tensor]: Updated state tensor and current state.
    )g      ?r   )g      g       @)UUUUUU?gUUUUUU?r9   r   r3   )r   r   r   r+   r$   c3Za31Za32r%   r&   b3deltar   s2r.   r   r)   r   Zeps_s1_epsZx_s2r/   Zx0_s2Zeps_s2r6   r   r   r   rk_3kutta_naive   s    


r?   )1euler2midZ2mid_stableZ	2heun_edmZ2heun_naiveZ3kutta_naivenamec                 C   s,   | t v rt |  S dt  }td| )z
    Get the specified Runge-Kutta function.

    Args:
        name: Name of the Runge-Kutta method.

    Returns:
        Callable: The specified Runge-Kutta function.

    Raises:
        RuntimeError: If the specified method is not supported.
    z
	z1Only support the following Runge-Kutta methods:
	)RK_FNsjoinkeysRuntimeError)rB   methodsr   r   r   get_runge_kutta_fn0  s   rH   c                 C   s   | t v S )z
    Check if the specified Runge-Kutta function is supported.

    Args:
        name: Name of the Runge-Kutta method.

    Returns:
        bool: True if the method is supported, False otherwise.
    )rC   )rB   r   r   r   is_runge_kutta_fn_supportedC  s   
rI   )typingr   r   r   Z4cosmos_predict2._src.imaginaire.functional.batch_opsr   Tensorr   r   r'   r(   r*   r,   r0   r1   r7   r8   r?   rC   strrH   boolrI   r   r   r   r   <module>   s   
+



0


*
