from __future__ import annotations

import logging
from typing import TYPE_CHECKING, TypeAlias

from typing_extensions import deprecated

import rerun_bindings as bindings
from rerun.blueprint.api import BlueprintLike, create_in_memory_blueprint
from rerun.recording_stream import BinaryStream, RecordingStream, get_application_id
from rerun_bindings import (
    FileSink,
    GrpcSink,
)

from ._spawn import _spawn_viewer

if TYPE_CHECKING:
    import pathlib

    from rerun.recording import Recording  # ty:ignore[deprecated]
    from rerun.recording_stream import RecordingStream


# --- Sinks ---


def is_recording_enabled(recording: RecordingStream | None) -> bool:
    if recording is not None:
        return bindings.is_enabled(recording.inner)  # type: ignore[no-any-return]
    return bindings.is_enabled()  # type: ignore[no-any-return]


LogSinkLike: TypeAlias = GrpcSink | FileSink | BinaryStream


def set_sinks(
    *sinks: LogSinkLike,
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
) -> None:
    """
    Stream data to multiple different sinks.

    Duplicate sinks are not allowed. For example, two [`rerun.GrpcSink`][]s that
    use the same `url` will cause this function to throw a `ValueError`.

    This _replaces_ existing sinks. Calling `rr.init(spawn=True)`, `rr.spawn()`,
    `rr.connect_grpc()` or similar followed by `set_sinks` will result in only
    the sinks passed to `set_sinks` remaining active.

    Only data logged _after_ the `set_sinks` call will be logged to the newly attached sinks.

    Parameters
    ----------
    sinks:
        A list of sinks to wrap.

        See [`rerun.GrpcSink`][], [`rerun.FileSink`][], [`rerun.BinaryStream`][].
    default_blueprint:
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    Example
    -------
    ```py
    rr.init("rerun_example_tee")
    rr.set_sinks(
        rr.GrpcSink(),
        rr.FileSink("data.rrd")
    )
    rr.log("my/point", rr.Points3D(position=[1.0, 2.0, 3.0]))
    ```

    """

    # Check for duplicates
    seen = set()
    duplicates = set()
    for sink in sinks:
        if sink in seen:
            duplicates.add(sink)
        else:
            seen.add(sink)
    if duplicates:
        raise ValueError(f"Duplicate sinks detected: {', '.join(str(d) for d in duplicates)}")

    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - set_sinks() call ignored")
        return

    application_id = get_application_id(recording)  # NOLINT
    if application_id is None:
        raise ValueError(
            "No application id found. You must call rerun.init before connecting to a viewer, or provide a recording.",
        )

    # If a blueprint is provided, we need to create a blueprint storage object
    blueprint_storage = None
    if default_blueprint is not None:
        blueprint_storage = create_in_memory_blueprint(
            application_id=application_id,
            blueprint=default_blueprint,
        ).storage

    bindings.set_sinks(
        [*sinks],
        default_blueprint=blueprint_storage,
        recording=recording.to_native() if recording is not None else None,
    )


def connect_grpc(
    url: str | None = None,
    *,
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
) -> None:
    """
    Connect to a remote Rerun Viewer on the given URL.

    This function returns immediately.

    Parameters
    ----------
    url:
        The URL to connect to.

        The scheme must be one of `rerun://`, `rerun+http://`, or `rerun+https://`,
        and the pathname must be `/proxy`.

        The default is `rerun+http://127.0.0.1:9876/proxy`.
    default_blueprint:
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    """
    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - connect_grpc() call ignored")
        return

    from rerun.recording_stream import get_application_id

    application_id = get_application_id(recording)  # NOLINT
    if application_id is None:
        raise ValueError(
            "No application id found. You must call rerun.init before connecting to a viewer, or provide a recording.",
        )

    # If a blueprint is provided, we need to create a blueprint storage object
    blueprint_storage = None
    if default_blueprint is not None:
        blueprint_storage = create_in_memory_blueprint(
            application_id=application_id,
            blueprint=default_blueprint,
        ).storage

    bindings.connect_grpc(
        url=url,
        default_blueprint=blueprint_storage,
        recording=recording.to_native() if recording is not None else None,
    )


def save(
    path: str | pathlib.Path,
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
    *,
    write_footer: bool = True,
) -> None:
    """
    Stream all log-data to a file.

    Call this _before_ you log any data!

    The Rerun Viewer is able to read continuously from the resulting rrd file while it is being written.
    However, depending on your OS and configuration, changes may not be immediately visible due to file caching.
    This is a common issue on Windows and (to a lesser extent) on MacOS.

    Parameters
    ----------
    path:
        The path to save the data to.
    default_blueprint
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
    write_footer:
        Whether to emit a complete RRD footer (including a manifest of every chunk) at the
        end of the stream. Defaults to `True`.

        Producing a footer keeps per-chunk metadata in memory for the lifetime of the sink,
        which grows linearly with the number of chunks logged. Pass `write_footer=False` for
        long-running streaming sessions; the resulting file is still a valid RRD and a footer
        can be added after the fact via `rerun rrd optimize`.

        *Warning*: lack of footer will significantly hurt random-access performance and some
        tools (e.g. LazyStore) may not work properly.

    """

    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - save() call ignored. You must call rerun.init before saving a recording.")
        return

    from rerun.recording_stream import get_application_id

    application_id = get_application_id(recording=recording)  # NOLINT
    if application_id is None:
        raise ValueError(
            "No application id found. You must call rerun.init before connecting to a viewer, or provide a recording.",
        )

    # If a blueprint is provided, we need to create a blueprint storage object
    blueprint_storage = None
    if default_blueprint is not None:
        blueprint_storage = create_in_memory_blueprint(
            application_id=application_id,
            blueprint=default_blueprint,
        ).storage

    bindings.save(
        path=str(path),
        default_blueprint=blueprint_storage,
        recording=recording.to_native() if recording is not None else None,
        write_footer=write_footer,
    )


def stdout(
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
    *,
    write_footer: bool = True,
) -> None:
    """
    Stream all log-data to stdout.

    Pipe it into a Rerun Viewer to visualize it.

    Call this _before_ you log any data!

    If there isn't any listener at the other end of the pipe, the `RecordingStream` will
    default back to `buffered` mode, in order not to break the user's terminal.

    Parameters
    ----------
    default_blueprint
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
    write_footer:
        Whether to emit a complete RRD footer (including a manifest of every chunk) at the
        end of the stream. Defaults to `True`. See [`rerun.save`][] for details and trade-offs.

        *Warning*: lack of footer will significantly hurt random-access performance and some
        tools (e.g. LazyStore) may not work properly.

    """

    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - save() call ignored. You must call rerun.init before saving a recording.")
        return

    from rerun.recording_stream import get_application_id

    application_id = get_application_id(recording=recording)  # NOLINT
    if application_id is None:
        raise ValueError(
            "No application id found. You must call rerun.init before connecting to a viewer, or provide a recording.",
        )

    # If a blueprint is provided, we need to create a blueprint storage object
    blueprint_storage = None
    if default_blueprint is not None:
        blueprint_storage = create_in_memory_blueprint(
            application_id=application_id,
            blueprint=default_blueprint,
        ).storage

    bindings.stdout(
        default_blueprint=blueprint_storage,
        recording=recording.to_native() if recording is not None else None,
        write_footer=write_footer,
    )


def disconnect(recording: RecordingStream | None = None) -> None:
    """
    Closes all gRPC connections, servers, and files.

    Closes all gRPC connections, servers, and files that have been opened with
    [`rerun.connect_grpc`], [`rerun.serve`], [`rerun.save`] or [`rerun.spawn`].

    Parameters
    ----------
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    """

    bindings.disconnect(
        recording=recording.to_native() if recording is not None else None,
    )


def serve_grpc(
    *,
    grpc_port: int | None = None,
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
    server_memory_limit: str = "1GiB",
    newest_first: bool = False,
    cors_allow_origin: list[str] | None = None,
) -> str:
    """
    Serve log-data over gRPC.

    You can connect to this server with the native viewer using `rerun rerun+http://localhost:{grpc_port}/proxy`.

    The gRPC server will buffer all log data in memory so that late connecting viewers will get all the data.
    You can control the amount of data buffered by the gRPC server with the `server_memory_limit` argument.
    Once reached, the earliest logged data will be dropped. Static data is never dropped.

    Returns the URI of the server so you can connect the viewer to it.

    This function returns immediately. In order to keep the server running, you must keep the Python process running
    as well.

    NOTE: The grpc server is associated with a [`rerun.RecordingStream`][] object. By default, if no other recording
    was specified, this will be the global recording. When that `RecordingStream` is disconnected, or otherwise goes
    out of scope, the associated gRPC server will be shut down.
    See: [Issue: #12313](https://github.com/rerun-io/rerun/issues/12313) for possible complications.

    Parameters
    ----------
    grpc_port:
        The port to serve the gRPC server on (defaults to 9876)
    default_blueprint:
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
    server_memory_limit:
        Maximum amount of memory to use for buffering log data for clients that connect late.
        This can be a percentage of the total ram (e.g. "50%") or an absolute value (e.g. "4GB").
    newest_first:
        If `True`, the server will start sending back the newest messages _first_.
        If `False`, the messages will be played back in the order they arrived.
    cors_allow_origin:
        Additional origin patterns allowed to make CORS requests to the gRPC server.
        By default, only localhost and rerun.io are allowed.
        Patterns are matched against the full `Origin` header (e.g. `"https://example.com:8080"`),
        using glob-style matching where `*` matches any sequence of characters.
        Examples: `"https://*.example.com"`, `"https://example.com:8080"`,
        `"https://example.com:*"`.

    """
    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - serve_grpc() call ignored")
        return "[rerun is disabled]"

    from rerun.recording_stream import get_application_id

    application_id = get_application_id(recording=recording)  # NOLINT
    if application_id is None:
        raise ValueError(
            "No application id found. You must call rerun.init before connecting to a viewer, or provide a recording.",
        )

    # If a blueprint is provided, we need to create a blueprint storage object
    blueprint_storage = None
    if default_blueprint is not None:
        blueprint_storage = create_in_memory_blueprint(
            application_id=application_id,
            blueprint=default_blueprint,
        ).storage

    return bindings.serve_grpc(
        grpc_port,
        server_memory_limit=server_memory_limit,
        default_blueprint=blueprint_storage,
        recording=recording.to_native() if recording is not None else None,
        newest_first=newest_first,
        cors_allow_origin=cors_allow_origin or [],
    )


def send_blueprint(
    blueprint: BlueprintLike,
    *,
    make_active: bool = True,
    make_default: bool = True,
    recording: RecordingStream | None = None,
) -> None:
    """
    Create a blueprint from a `BlueprintLike` and send it to the `RecordingStream`.

    Parameters
    ----------
    blueprint:
        A blueprint object to send to the viewer.
    make_active:
        Immediately make this the active blueprint for the associated `app_id`.
        Note that setting this to `false` does not mean the blueprint may not still end
        up becoming active. In particular, if `make_default` is true and there is no other
        currently active blueprint.
    make_default:
        Make this the default blueprint for the `app_id`.
        The default blueprint will be used as the template when the user resets the
        blueprint for the app. It will also become the active blueprint if no other
        blueprint is currently active.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    """

    from rerun.recording_stream import get_application_id

    application_id = get_application_id(recording=recording)  # NOLINT

    if application_id is None:
        raise ValueError("No application id found. You must call rerun.init before sending a blueprint.")

    blueprint_storage = create_in_memory_blueprint(application_id=application_id, blueprint=blueprint).storage

    bindings.send_blueprint(
        blueprint_storage,
        make_active,
        make_default,
        recording=recording.to_native() if recording is not None else None,
    )


@deprecated(
    "send_recording is deprecated since 0.32. Use rerun.experimental.send_chunks(reader.store()) instead.",
)
def send_recording(rrd: Recording, recording: RecordingStream | None = None) -> None:  # ty:ignore[deprecated]
    """
    Send a `Recording` loaded from a `.rrd` to the `RecordingStream`.

    !!! Warning
        ⚠️ This API is experimental and may change or be removed in future versions! ⚠️

    Parameters
    ----------
    rrd:
        A recording loaded from a `.rrd` file.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].

    """
    application_id = get_application_id(recording=recording)  # NOLINT

    if application_id is None:
        raise ValueError("No application id found. You must call rerun.init before sending a recording.")

    bindings.send_recording(  # ty: ignore[deprecated]
        rrd._internal,
        recording=recording.to_native() if recording is not None else None,
    )


def spawn(
    *,
    port: int = 9876,
    connect: bool = True,
    memory_limit: str = "75%",
    server_memory_limit: str = "1GiB",
    hide_welcome_screen: bool = False,
    detach_process: bool = True,
    executable_name: str = "rerun",
    executable_path: str | None = None,
    default_blueprint: BlueprintLike | None = None,
    recording: RecordingStream | None = None,
) -> None:
    """
    Spawn a Rerun Viewer, listening on the given port.

    This is often the easiest and best way to use Rerun.
    Just call this once at the start of your program.

    You can also call [rerun.init][] with a `spawn=True` argument.

    Parameters
    ----------
    port:
        The port to listen on.
    connect:
        also connect to the viewer and stream logging data to it.
    memory_limit:
        An upper limit on how much memory the Rerun Viewer should use.
        When this limit is reached, Rerun will drop the oldest data.
        Example: `16GB` or `50%` (of system total).
    server_memory_limit:
        An upper limit on how much memory the gRPC server running
        in the same process as the Rerun Viewer should use.
        When this limit is reached, Rerun will drop the oldest data.
        Example: `16GB` or `50%` (of system total).

        Defaults to `1GiB`.
    hide_welcome_screen:
        Hide the normal Rerun welcome screen.
    detach_process:
        Detach Rerun Viewer process from the application process.
    executable_name:
        Specifies the name of the Rerun executable.
        You can omit the `.exe` suffix on Windows.

        Defaults to `rerun`.
    executable_path:
        Enforce a specific executable to use instead of searching
        through PATH for `executable_name`.

        Unspecified by default.
    recording:
        Specifies the [`rerun.RecordingStream`][] to use if `connect = True`.
        If left unspecified, defaults to the current active data recording, if there is one.
        See also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
    default_blueprint
        Optionally set a default blueprint to use for this application. If the application
        already has an active blueprint, the new blueprint won't become active until the user
        clicks the "reset blueprint" button. If you want to activate the new blueprint
        immediately, instead use the [`rerun.send_blueprint`][] API.

    """

    if not is_recording_enabled(recording):
        logging.warning("Rerun is disabled - spawn() call ignored.")
        return

    _spawn_viewer(
        port=port,
        memory_limit=memory_limit,
        server_memory_limit=server_memory_limit,
        hide_welcome_screen=hide_welcome_screen,
        detach_process=detach_process,
        executable_name=executable_name,
        executable_path=executable_path,
    )

    if connect:
        connect_grpc(
            f"rerun+http://127.0.0.1:{port}/proxy",
            recording=recording,  # NOLINT
            default_blueprint=default_blueprint,
        )
