"""Helper functions for converting streams to inline html."""

from __future__ import annotations

import html
import os
from pathlib import Path
from typing import TYPE_CHECKING, Literal

import pyarrow
from pyarrow import RecordBatch, ipc

from .time import to_nanos, to_nanos_since_epoch

if TYPE_CHECKING:
    from collections.abc import Callable
    from datetime import datetime, timedelta

    import datafusion
    import numpy as np

    from .blueprint import BlueprintLike


from rerun import bindings
from rerun.error_utils import RerunMissingDependencyError

from ._event import (
    ContainerSelectionItem as ContainerSelectionItem,
    EntitySelectionItem as EntitySelectionItem,
    PauseEvent as PauseEvent,
    PlayEvent as PlayEvent,
    RecordingOpenEvent as RecordingOpenEvent,
    SelectionChangeEvent as SelectionChangeEvent,
    SelectionItem as SelectionItem,
    TimelineChangeEvent as TimelineChangeEvent,
    TimeUpdateEvent as TimeUpdateEvent,
    ViewerEvent as ViewerEvent,
    ViewSelectionItem as ViewSelectionItem,
    _viewer_event_from_json_str,
)
from .recording_stream import RecordingStream, get_data_recording

# Public API of this module: the `Viewer` widget wrapper, `set_default_size`,
# and the viewer event / selection item types re-exported from `._event`.
__all__ = [
    "ContainerSelectionItem",
    "EntitySelectionItem",
    "PauseEvent",
    "PlayEvent",
    "RecordingOpenEvent",
    "SelectionChangeEvent",
    "SelectionItem",
    "TimeUpdateEvent",
    "TimelineChangeEvent",
    "ViewSelectionItem",
    "Viewer",
    "ViewerEvent",
    "set_default_size",
]

# `HAS_NOTEBOOK` records whether the optional notebook dependencies could be imported.
# `Viewer.__init__` checks it and raises `RerunMissingDependencyError` when it is `False`.
HAS_NOTEBOOK = True
try:
    from ipywidgets import HTML as _HTML, VBox as _VBox
    from rerun_notebook import ErrorWidget as _ErrorWidget, Viewer as _Viewer
except ModuleNotFoundError:
    # `ipywidgets` and/or `rerun_notebook` is not installed.
    HAS_NOTEBOOK = False


def _flush_ui_events() -> None:
    """
    Let the Jupyter UI event loop run briefly so queued widget updates can reach the frontend.

    `jupyter_ui_poll` is imported lazily, so it is only needed when this helper is called.
    """
    from jupyter_ui_poll import ui_events

    with ui_events() as process_events:
        process_events(1)


# CSS for the loading indicator is co-located in `_loading_widget.css` and embedded
# inline so the stylesheet travels with each widget value update.
# The encoding is pinned to UTF-8 so the file decodes identically regardless of the
# platform's locale default.
_LOADING_CSS = (Path(__file__).parent / "_loading_widget.css").read_text(encoding="utf-8")


def _render_loading_html(table_id: str) -> str:
    """
    Return the loading-indicator markup for `table_id`.

    The stylesheet held in `_LOADING_CSS` is inlined in a `<style>` element, and the
    table id is HTML-escaped before being placed in the message.
    """
    escaped_id = html.escape(table_id)
    fragments = [
        f"<style>{_LOADING_CSS}</style>",
        '<div class="rerun-notebook-loading">',
        '<div class="rerun-notebook-loading__spinner"></div>',
        f"<div>Loading table &ldquo;{escaped_id}&rdquo;&hellip;</div>",
        "</div>",
    ]
    return "".join(fragments)


# Default viewer dimensions in pixels. `set_default_size` overrides them, and
# `Viewer.__init__` falls back to them when no explicit width/height is passed.
_default_width = 640
_default_height = 480

# Names of the viewer panels and the states accepted by `Viewer.update_panels`.
_Panel = Literal["top", "blueprint", "selection", "time"]
_PanelState = Literal["expanded", "collapsed", "hidden"]


def set_default_size(*, width: int | None, height: int | None) -> None:
    """
    Change the default dimensions used by viewers created after this call.

    Parameters
    ----------
    width:
        New default width of the viewer in pixels. `None` leaves the current default untouched.
    height:
        New default height of the viewer in pixels. `None` leaves the current default untouched.

    """
    global _default_width, _default_height

    # The two settings are independent of each other; each one is only overwritten
    # when a value was actually supplied.
    if height is not None:
        _default_height = height
    if width is not None:
        _default_width = width


class Viewer:
    """
    A viewer embeddable in a notebook.

    This viewer is a wrapper around the `rerun_notebook.Viewer` widget.
    """

    def __init__(
        self,
        *,
        width: int | Literal["auto"] | None = None,
        height: int | Literal["auto"] | None = None,
        url: str | None = None,
        blueprint: BlueprintLike | None = None,
        recording: RecordingStream | None = None,
        use_global_recording: bool | None = None,
        theme: Literal["dark", "light", "system"] | None = None,
    ) -> None:
        """
        Create a new Rerun viewer widget for use in a notebook.

        Any data logged to the recording after initialization will be sent directly to the viewer.

        This widget can be displayed by returning it at the end of your cells execution, or immediately
        by calling [`rerun.notebook.Viewer.display`][].

        Parameters
        ----------
        width:
            The width of the viewer in pixels, or "auto".

            When set to "auto", scales to 100% of the notebook cell's width.
        height:
            The height of the viewer in pixels, or "auto".

            When set to "auto", scales using a 16:9 aspect ratio with `width`.
        url:
            Optional URL passed to the viewer for displaying its contents.
        recording:
            Specifies the [`rerun.RecordingStream`][] to use.
            If left unspecified, defaults to the current active data recording, if there is one.
            See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
        blueprint:
            A blueprint object to send to the viewer.
            It will be made active and set as the default blueprint in the recording.

            Setting this is equivalent to calling [`rerun.send_blueprint`][] before initializing the viewer.
        use_global_recording:
            If no explicit `recording` is provided, the Viewer uses the thread-local/global recording created by `rr.init`
            or set explicitly via `rr.set_thread_local_data_recording`/`rr.set_global_data_recording`.

            Settings this to `False` causes the Viewer to not pick up the global recording.

            Defaults to `False` if `url` is provided, and `True` otherwise.
        theme:
            The color theme to use. Either "dark", "light", or "system".

            If not set, the viewer uses the previously persisted theme preference or defaults to "system".

        """
        if not HAS_NOTEBOOK:
            raise RerunMissingDependencyError("rerun-notebook", "notebook")
        self._error_widget = _ErrorWidget()
        self._loading_widget = _HTML(value="")

        # Get access token from env variable
        credentials = None
        fallback_token = os.environ.get("REDAP_TOKEN", None)
        credentials = None
        if fallback_token is None:
            # Get credentials from the SDK and pass the access token to wasm viewer
            credentials = bindings.get_credentials()
            if credentials is not None:
                fallback_token = credentials.access_token

        self._viewer = _Viewer(
            width=width if width is not None else _default_width,
            height=height if height is not None else _default_height,
            url=url,
            fallback_token=fallback_token,
            theme=theme,
        )

        # Set full credentials if we have them so the UI can show who's logged in
        if credentials is not None:
            self._viewer.set_credentials(credentials.access_token, credentials.user_email)

        # Viewer event handling
        self._event_callbacks: list[Callable[[ViewerEvent], None]] = []

        def on_raw_event(json_str: str) -> None:
            evt = _viewer_event_from_json_str(json_str)
            for callback in self._event_callbacks:
                callback(evt)

        self._viewer._on_raw_event(on_raw_event)

        # By default, we use the global recording only if no `url` is provided.
        if use_global_recording is None:
            use_global_recording = url is None

        if use_global_recording:
            recording = get_data_recording(recording)

        if recording is not None:
            bindings.set_callback_sink(
                recording=recording.to_native(),
                callback=self._flush_hook,
            )

        if blueprint is not None:
            if recording is not None:
                recording.send_blueprint(blueprint)
            else:
                raise ValueError(
                    "Can only set a blueprint if there's either an active recording or a recording passed in"
                )

    def add_recording(
        self,
        recording: RecordingStream | None = None,
        blueprint: BlueprintLike | None = None,
    ) -> None:
        """
        Adds a recording to the viewer.

        If no recording is specified, the current active recording will be used.

        NOTE: By default all calls to `rr.init()` will re-use the same recording_id, meaning
        that your recordings will be merged together. If you want to keep them separate, you
        should call `rr.init("my_app_id", recording_id=uuid.uuid4())`.

        Parameters
        ----------
        recording:
            Specifies the [`rerun.RecordingStream`][] to use.
            If left unspecified, defaults to the current active data recording, if there is one.
            See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
        blueprint:
            A blueprint object to send to the viewer.
            It will be made active and set as the default blueprint in the recording.

            Setting this is equivalent to calling [`rerun.send_blueprint`][] before initializing the viewer.

        """
        recording = get_data_recording(recording)
        if recording is None:
            raise ValueError("No recording specified and no active recording found")

        bindings.set_callback_sink(
            recording=recording.to_native(),
            callback=self._flush_hook,
        )

        if blueprint is not None:
            recording.send_blueprint(blueprint)

    def _add_table_id(self, record_batch: RecordBatch, table_id: str) -> RecordBatch:
        # Get current schema
        schema = record_batch.schema
        schema = schema.with_metadata({b"__table_id": table_id})

        # Create new record batch with updated schema
        return RecordBatch.from_arrays(record_batch.columns, schema=schema)

    def set_application_blueprint(
        self, application_id: str, blueprint: BlueprintLike, *, make_active: bool = True, make_default: bool = True
    ) -> None:
        """
        Set the blueprint for the given application.

        Parameters
        ----------
        application_id:
            The ID of the application to set the blueprint for.
        blueprint:
            The blueprint to set for the application.
        make_active:
            Whether to make the blueprint active.
            If `True`, the blueprint will be set as the active blueprint for the application.
        make_default:
            Whether to make the blueprint the default blueprint for the application.
            If `True`, the blueprint will be set as the default blueprint for the application.

        """

        blueprint = blueprint.to_blueprint()

        blueprint_stream = RecordingStream._from_native(
            bindings.new_blueprint(
                application_id=application_id,
                make_default=False,
                make_thread_default=False,
                default_enabled=True,
            ),
        )

        blueprint_stream.set_time("blueprint", sequence=0)
        blueprint._log_to_stream(blueprint_stream)

        bindings.set_callback_sink_blueprint(
            callback=self._flush_hook,
            make_active=make_active,
            make_default=make_default,
            blueprint_stream=blueprint_stream.to_native(),
        )

    def send_table(
        self,
        id: str,
        table: RecordBatch | list[RecordBatch] | datafusion.DataFrame,
    ) -> None:
        """
        Sends a table in the form of a dataframe to the viewer.

        Parameters
        ----------
        id:
            The name that uniquely identifies the table in the viewer.
            This name will also be shown in the recording panel.
        table:
            The table data as an Arrow RecordBatch, list of RecordBatches, or a datafusion DataFrame.

        """
        from rerun._arrow import to_record_batch

        self._loading_widget.value = _render_loading_html(id)
        # Pump the UI event loop so the value change reaches the frontend before
        # to_record_batch() blocks Python (e.g. a slow datafusion collect).
        _flush_ui_events()

        try:
            record_batch = to_record_batch(table)
            new_table = self._add_table_id(record_batch, id)
            sink = pyarrow.BufferOutputStream()
            writer = ipc.new_stream(sink, new_table.schema)
            writer.write_batch(new_table)
            writer.close()
            table_as_bytes = sink.getvalue().to_pybytes()
            self._viewer.send_table(table_as_bytes)
        finally:
            self._loading_widget.value = ""

    def display(self, block_until_ready: bool = False) -> None:
        """
        Display the viewer in the notebook cell immediately.

        Parameters
        ----------
        block_until_ready:
            Whether to block until the viewer is ready to receive data. If this is `False`, the viewer
            will still be displayed, but logged data will likely be queued until the viewer becomes ready
            at the end of cell execution.

        """

        from IPython.display import display

        display(self._error_widget)
        # Wrap loading + viewer in a VBox so the spinner's rendered height and
        # positioning are tied to the viewer's own layout box instead of occupying
        # a separate notebook output cell.
        display(_VBox([self._loading_widget, self._viewer]))

        if block_until_ready:
            self._viewer.block_until_ready()

    def _ipython_display_(self) -> None:
        self.display()

    def _flush_hook(self, data: bytes) -> None:
        self._viewer.send_rrd(data)

    def update_panels(
        self,
        *,
        top: _PanelState | Literal["default"] | None = None,
        blueprint: _PanelState | Literal["default"] | None = None,
        selection: _PanelState | Literal["default"] | None = None,
        time: _PanelState | Literal["default"] | None = None,
    ) -> None:
        """
        Partially update the state of panels in the viewer.

        Valid states are the strings `expanded`, `collapsed`, `hidden`, `default`, and the value `None`.

        Panels set to:
        - `None` will be unchanged.
        - `expanded` will be fully expanded, taking up the most space.
        - `collapsed` will be smaller and simpler, omitting some information.
        - `hidden` will be completely invisible, taking up no space.
        - `default` will be reset to the default state.

        The `collapsed` state is the same as the `hidden` state for panels
        which do not support the `collapsed` state.

        Setting the panel state using this function will also prevent the user
        from modifying that panel's state in the viewer.

        Parameters
        ----------
        top: str
            State of the panel, positioned on the top of the viewer.
        blueprint: str
            State of the blueprint panel, positioned on the left side of the viewer.
        selection: str
            State of the selection panel, positioned on the right side of the viewer.
        time: str
            State of the time panel, positioned on the bottom side of the viewer.

        """

        panel_states: dict[_Panel, _PanelState | Literal["default"]] = {}
        if top:
            panel_states["top"] = top
        if blueprint:
            panel_states["blueprint"] = blueprint
        if selection:
            panel_states["selection"] = selection
        if time:
            panel_states["time"] = time

        self._viewer.update_panel_states(panel_states)

    def set_active_recording(
        self,
        *,
        recording_id: str,
    ) -> None:
        """
        Set the active recording for the viewer.

        This is equivalent to clicking on the given recording in the blueprint panel.

        Parameters
        ----------
        recording_id: str
            The ID of the recording to set the viewer to.

            Using this requires setting an explicit recording ID when creating the recording.

        """

        self._viewer.set_active_recording(recording_id)

    def open_url(
        self,
        url: str,
    ) -> None:
        """
        Open a URL in the viewer.

        Parameters
        ----------
        url: str
            The URL to open.

            Must point to a valid data source.

        """

        self._viewer.open_url(url)

    def close_url(
        self,
        url: str,
    ) -> None:
        """
        Close an open URL in the viewer.

        Does nothing if the URL is not open.

        Parameters
        ----------
        url: str
            The URL to close.

        """

        self._viewer.close_url(url)

    def set_time_ctrl(
        self,
        *,
        sequence: int | None = None,
        duration: int | float | timedelta | np.timedelta64 | None = None,
        timestamp: int | float | datetime | np.datetime64 | None = None,
        timeline: str | None = None,
        play: bool = False,
    ) -> None:
        """
        Set the time control for the viewer.

        You are expected to set at most ONE of the arguments `sequence`, `duration`, or `timestamp`.

        Parameters
        ----------
        sequence:
            Used for sequential indices, like `frame_nr`.
            Must be an integer.
        duration:
            Used for relative times, like `time_since_start`.
            Must either be in seconds, a [`datetime.timedelta`][], or [`numpy.timedelta64`][].
            For nanosecond precision, use `numpy.timedelta64(nanoseconds, 'ns')`.
        timestamp:
            Used for absolute time indices, like `capture_time`.
            Must either be in seconds since Unix epoch, a [`datetime.datetime`][], or [`numpy.datetime64`][].
            For nanosecond precision, use `numpy.datetime64(nanoseconds, 'ns')`.
        play:
            Whether to start playing from the specified time point. Defaults to paused.
        timeline:
            The name of the timeline to switch to. If not provided, time will remain on the current timeline.

        """
        if sum(x is not None for x in (sequence, duration, timestamp)) > 1:
            raise ValueError(
                f"set_time_ctrl: Exactly one of `sequence`, `duration`, and `timestamp` must be set (timeline='{timeline}')",
            )

        if sequence is not None:
            time = sequence
        elif duration is not None:
            time = to_nanos(duration)
        elif timestamp is not None:
            time = to_nanos_since_epoch(timestamp)
        else:
            time = None

        self._viewer.set_time_ctrl(timeline, time, play)

    def on_event(self, callback: Callable[[ViewerEvent], None]) -> None:
        """
        Register a callback to be called when a viewer event occurs.

        The callback will receive a [`ViewerEvent`][rerun.notebook.ViewerEvent], which is one of:
        [`PlayEvent`][rerun.notebook.PlayEvent], [`PauseEvent`][rerun.notebook.PauseEvent],
        [`TimeUpdateEvent`][rerun.notebook.TimeUpdateEvent], [`TimelineChangeEvent`][rerun.notebook.TimelineChangeEvent],
        [`SelectionChangeEvent`][rerun.notebook.SelectionChangeEvent], or [`RecordingOpenEvent`][rerun.notebook.RecordingOpenEvent].

        Parameters
        ----------
        callback:
            A function that takes a [`ViewerEvent`][rerun.notebook.ViewerEvent] as its only argument.

        """
        self._event_callbacks.append(callback)

    def set_credentials(self, access_token: str, email: str) -> None:
        self._viewer.set_credentials(access_token, email)
