from __future__ import annotations

import os
from collections.abc import Callable, Iterable, Iterator
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Literal

import datafusion as dfn
import numpy as np
import numpy.typing as npt
import pyarrow as pa
from typing_extensions import deprecated

from .types import (
    IndexValuesLike as IndexValuesLike,
    VectorDistanceMetricLike as VectorDistanceMetricLike,
)

# NOTE
#
# The pure Python wrapper/internal pyo3 object is documented in `rerun_py/ARCHITECTURE.md`.

class IndexColumnDescriptor:
    """
    The descriptor of an index column.

    Index columns contain the index values for when the data was updated. They
    generally correspond to Rerun timelines.

    Column descriptors are used to describe the columns in a
    [`Schema`][rerun.catalog.Schema]. They are read-only. To select an index
    column, use [`IndexColumnSelector`][rerun.catalog.IndexColumnSelector].
    """

    @property
    def name(self) -> str:
        """
        The name of the index.

        This property is read-only.
        """

    @property
    def is_static(self) -> bool:
        """
        Whether the column is static.

        Part of the generic ColumnDescriptor interface: always `False` for index columns.

        This property is read-only.
        """

class IndexColumnSelector:
    """
    A selector for an index column.

    Index columns contain the index values for when the data was updated. They
    generally correspond to Rerun timelines.

    A selector identifies an index column by its name only. The read-only
    description of such a column is provided by `IndexColumnDescriptor`.
    """

    def __init__(self, index: str) -> None:
        """
        Create a new `IndexColumnSelector`.

        Parameters
        ----------
        index:
            The name of the index to select. Usually the name of a timeline.

        """

    @property
    def name(self) -> str:
        """
        The name of the index.

        This property is read-only.
        """

class ComponentColumnDescriptor:
    """
    The descriptor of a component column.

    Component columns contain the data for a specific component of an entity.

    Column descriptors are used to describe the columns in a
    [`Schema`][rerun.catalog.Schema]. They are read-only. To select a component
    column, use [`ComponentColumnSelector`][rerun.catalog.ComponentColumnSelector].
    """

    @property
    def entity_path(self) -> str:
        """
        The entity path.

        This property is read-only.
        """

    @property
    def is_property(self) -> bool:
        """
        Whether this column is a property.

        This property is read-only.
        """

    @property
    def component_type(self) -> str | None:
        """
        The component type, if any.

        This property is read-only.
        """

    @property
    def archetype(self) -> str | None:
        """
        The archetype name, if any.

        This property is read-only.
        """

    @property
    def component(self) -> str:
        """
        The component.

        Example: `Points3D:positions`.

        This property is read-only.
        """

    @property
    def is_static(self) -> bool:
        """
        Whether the column is static.

        This property is read-only.
        """

    @property
    def name(self) -> str:
        """
        The name of this column.

        This property is read-only.
        """

class ComponentColumnSelector:
    """
    A selector for a component column.

    Component columns contain the data for a specific component of an entity.

    A selector identifies a component column by its entity path and component.
    The read-only description of such a column is provided by
    `ComponentColumnDescriptor`.
    """

    def __init__(self, entity_path: str, component: str) -> None:
        """
        Create a new `ComponentColumnSelector`.

        Parameters
        ----------
        entity_path:
            The entity path to select.
        component:
            The component to select. Example: `Points3D:positions`.

        """

    @property
    def entity_path(self) -> str:
        """
        The entity path.

        This property is read-only.
        """

    @property
    def component(self) -> str:
        """
        The component.

        Example: `Points3D:positions`.

        This property is read-only.
        """

class VectorDistanceMetric(Enum):  # type: ignore[misc]
    """Which distance metric to use for a vector index."""

    L2: VectorDistanceMetric
    COSINE: VectorDistanceMetric
    DOT: VectorDistanceMetric
    HAMMING: VectorDistanceMetric

class SchemaInternal:
    """Internal schema object listing index and component column descriptors."""

    def index_columns(self) -> list[IndexColumnDescriptor]: ...
    def component_columns(self) -> list[ComponentColumnDescriptor]: ...

    # Looks up a component column by entity path and component. The annotation allows `None`,
    # presumably when no such column exists — TODO confirm.
    def column_for(self, entity_path: str, component: str) -> ComponentColumnDescriptor | None: ...

    # NOTE(review): unlike `column_for`, the return type here is not optional — presumably this
    # raises when the selector matches no column; confirm against the native implementation.
    def column_for_selector(
        self, selector: str | ComponentColumnSelector | ComponentColumnDescriptor
    ) -> ComponentColumnDescriptor: ...

    # Export hook of the Arrow PyCapsule interface (schema side).
    def __arrow_c_schema__(self) -> Any: ...

class RecordingInternal:
    """Internal recording object: exposes its schema, identifiers and chunks, and can be saved to a path."""

    def schema(self) -> SchemaInternal: ...
    def recording_id(self) -> str: ...
    def application_id(self) -> str: ...

    # Returns an iterator over the chunks in this recording (see `ChunkIterator`).
    def chunks(self) -> ChunkIterator: ...

    # NOTE(review): presumably writes the recording as an RRD file to `path` — confirm.
    def save(self, path: str) -> None: ...

class RRDArchiveInternal:
    """Internal RRD archive object giving access to the recordings it contains."""

    def num_recordings(self) -> int: ...
    def all_recordings(self) -> list[RecordingInternal]: ...

class ChunkInternal:
    """Internal chunk object, as yielded by `ChunkIterator` and accepted by `send_chunks`."""

    # --- Read-only metadata ---

    @property
    def id(self) -> str: ...
    @property
    def entity_path(self) -> str: ...
    @property
    def num_rows(self) -> int: ...
    @property
    def num_columns(self) -> int: ...
    @property
    def is_static(self) -> bool: ...
    @property
    def is_empty(self) -> bool: ...
    @property
    def timeline_names(self) -> list[str]: ...

    # --- Conversion and construction ---

    def to_record_batch(self) -> pa.RecordBatch: ...

    # Returns a `ChunkInternal`; presumably a copy with the entity path replaced — TODO confirm.
    def with_entity_path(self, entity_path: str) -> ChunkInternal: ...
    @staticmethod
    def from_record_batch(record_batch: pa.RecordBatch) -> ChunkInternal: ...
    @staticmethod
    def from_columns(
        entity_path: str,
        timelines: dict[str, Any],
        components: dict[ComponentDescriptor, Any],
    ) -> ChunkInternal: ...

    # --- Formatting and transformation ---

    # All formatting options are keyword-only and have no defaults.
    def format(self, *, width: int, redact: bool, trim_metadata_keys: bool) -> str: ...

    # Note the asymmetry: applying lenses yields a list of chunks, applying a selector yields one.
    def apply_lenses(self, lenses: list[LensInternal]) -> list[ChunkInternal]: ...
    def apply_selector(self, source: str, selector: SelectorInternal) -> ChunkInternal: ...
    def __repr__(self) -> str: ...

    # NOTE(review): presumably equal to `num_rows` — confirm.
    def __len__(self) -> int: ...

class ChunkIterator:
    """
    An iterator over chunks in a recording.

    Yields `ChunkInternal` objects. Instances are returned by `RecordingInternal.chunks()`.
    """

    def __iter__(self) -> ChunkIterator:
        """Implement iter(self)."""

    def __next__(self) -> ChunkInternal:
        """Implement next(self)."""

def recording_from_chunks(
    chunks: Any,
    application_id: str,
    recording_id: str,
) -> RecordingInternal:
    """
    Create a new recording from an iterable of chunks.

    Parameters
    ----------
    chunks:
        An iterable of chunks.
        NOTE(review): annotated as `Any`; presumably the elements are `ChunkInternal` — confirm.
    application_id:
        The application ID of the new recording.
    recording_id:
        The recording ID of the new recording.

    """

def load_recording(path_to_rrd: str | os.PathLike[str]) -> RecordingInternal:
    """
    Load a single recording from an RRD file.

    Parameters
    ----------
    path_to_rrd:
        Path to the RRD file to load.

    """

def load_archive(path_to_rrd: str | os.PathLike[str]) -> RRDArchiveInternal:
    """
    Load a Rerun archive from an RRD file.

    Parameters
    ----------
    path_to_rrd:
        Path to the RRD file to load.

    """

def _optimization_profile_values(name: str) -> dict[str, object]:
    """
    Test-only: return a dict of the Rust `OptimizationProfile::<NAME>` field values.

    Used by the Python parity test to confirm that
    `OptimizationProfile.{LIVE,OBJECT_STORE}` on the Python side stays in sync
    with the Rust constants that the native module forwards into
    `ChunkStoreConfig` / `CompactionOptions`.

    Parameters
    ----------
    name:
        The profile name. One of `"LIVE"` or `"OBJECT_STORE"`.

    """

# AI-generated stubs for the `PyRecordingStream`-related classes and functions.
# TODO(#9187): this will be entirely replaced when `RecordingStream` is itself written in Rust
class PyRecordingStream:
    """Handle to a recording stream, as returned by `new_recording` and `new_blueprint`."""

    def is_forked_child(self) -> bool:
        """
        Determine if this stream is operating in the context of a forked child process.

        This means the stream was created in the parent process. It now exists in the child
        process by way of fork, but it is effectively a zombie since its batcher and sink
        threads would not have been copied.

        Calling operations such as `flush` or `set_sink` will result in an error.
        """

class ChunkBatcherConfig:
    """Defines the different batching thresholds used within the RecordingStream."""

    def __init__(
        self,
        flush_tick: int | float | timedelta | None = None,
        flush_num_bytes: int | None = None,
        flush_num_rows: int | None = None,
        chunk_max_rows_if_unsorted: int | None = None,
    ) -> None:
        """
        Initialize the chunk batcher configuration.

        Parameters
        ----------
        flush_tick:
            Duration of the periodic tick, by default `None`.
            Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable.
            NOTE(review): plain numbers are presumably interpreted as seconds — confirm.

        flush_num_bytes:
            Flush if the accumulated payload has a size in bytes equal to or greater than this, by default `None`.
            Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable.

        flush_num_rows:
            Flush if the accumulated payload has a number of rows equal to or greater than this, by default `None`.
            Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable.

        chunk_max_rows_if_unsorted:
            Split a chunk if it contains at least this many rows and one or more of its timelines are unsorted,
            by default `None`.
            Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable.

        """

    @property
    def flush_tick(self) -> timedelta:
        """
        Duration of the periodic tick.

        Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable.
        """

    @flush_tick.setter
    def flush_tick(self, value: float | int | timedelta) -> None:
        """
        Duration of the periodic tick.

        Equivalent to setting: `RERUN_FLUSH_TICK_SECS` environment variable.
        """

    @property
    def flush_num_bytes(self) -> int:
        """
        Flush if the accumulated payload has a size in bytes equal to or greater than this.

        Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable.
        """

    @flush_num_bytes.setter
    def flush_num_bytes(self, value: int) -> None:
        """
        Flush if the accumulated payload has a size in bytes equal to or greater than this.

        Equivalent to setting: `RERUN_FLUSH_NUM_BYTES` environment variable.
        """

    @property
    def flush_num_rows(self) -> int:
        """
        Flush if the accumulated payload has a number of rows equal to or greater than this.

        Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable.
        """

    @flush_num_rows.setter
    def flush_num_rows(self, value: int) -> None:
        """
        Flush if the accumulated payload has a number of rows equal to or greater than this.

        Equivalent to setting: `RERUN_FLUSH_NUM_ROWS` environment variable.
        """

    @property
    def chunk_max_rows_if_unsorted(self) -> int:
        """
        Split a chunk if it contains at least this many rows and one or more of its timelines are unsorted.

        Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable.
        """

    @chunk_max_rows_if_unsorted.setter
    def chunk_max_rows_if_unsorted(self, value: int) -> None:
        """
        Split a chunk if it contains at least this many rows and one or more of its timelines are unsorted.

        Equivalent to setting: `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` environment variable.
        """

    @staticmethod
    def DEFAULT() -> ChunkBatcherConfig:
        """Default configuration, applicable to most use cases."""

    @staticmethod
    def LOW_LATENCY() -> ChunkBatcherConfig:
        """Low-latency configuration, preferred when streaming directly to a viewer."""

    @staticmethod
    def ALWAYS_TEST_ONLY() -> ChunkBatcherConfig:
        """
        Always flushes ASAP.

        !!! warning
            Test-only configuration. Produces an unrealistically large number of chunks and is
            not suitable for production workloads. With a file sink in particular, per-chunk
            metadata is accumulated in memory until the SDK process ends and the file footer
            can be written, which can drive memory usage through the roof. Use
            [`LOW_LATENCY`][rerun_bindings.ChunkBatcherConfig.LOW_LATENCY] instead for fast
            flushing in real applications.
        """

    @staticmethod
    def NEVER() -> ChunkBatcherConfig:
        """Never flushes unless manually told to (or hitting one of the builtin invariants)."""

class PyMemorySinkStorage:
    """Storage behind an in-memory sink, as returned by `memory_recording`."""

    def concat_as_bytes(self, concat: PyMemorySinkStorage | None = None) -> bytes:
        """
        Concatenate the contents of the `MemorySinkStorage` as bytes.

        This will do a blocking flush before returning!

        Parameters
        ----------
        concat:
            NOTE(review): presumably another storage whose contents are concatenated
            with this one's — confirm against the native implementation.

        """

    def num_msgs(self) -> int:
        """
        Count the number of pending messages in the `MemorySinkStorage`.

        This will do a blocking flush before returning!
        """

    def drain_as_bytes(self) -> bytes:
        """
        Drain all messages logged to the `MemorySinkStorage` and return them as bytes.

        This will do a blocking flush before returning!
        """

class PyBinarySinkStorage:
    """Storage behind a binary stream sink, as returned by `binary_stream`."""

    def read(self, *, flush: bool = True, flush_timeout_sec: float = 1e38) -> bytes | None:
        """
        Read the bytes from the binary sink.

        If `flush` is `True`, the sink will be flushed before reading.
        If all the data was not successfully flushed within the given timeout,
        an exception will be raised.

        Parameters
        ----------
        flush:
            If `True` (default), the stream will be flushed before reading.
        flush_timeout_sec:
            If `flush` is `True`, wait at most this many seconds.
            If the timeout is reached, an error is raised.

        """

    def flush(self, *, timeout_sec: float = 1e38) -> None:
        """
        Flush the binary sink and ensure that all logged messages have been encoded into the stream.

        This will block until the flush is complete, or the timeout is reached, or an error occurs.

        If all the data was not successfully flushed within the given timeout,
        an exception will be raised.

        Parameters
        ----------
        timeout_sec:
            Wait at most this many seconds.
            If the timeout is reached, an error is raised.

        """

#
# init
#

def flush_and_cleanup_orphaned_recordings() -> None:
    """Flush and then clean up any orphaned recordings."""

def new_recording(
    application_id: str,
    recording_id: str | None = None,
    make_default: bool = True,
    make_thread_default: bool = True,
    default_enabled: bool = True,
    send_properties: bool = True,
    batcher_config: ChunkBatcherConfig | None = None,
) -> PyRecordingStream:
    """
    Create a new recording stream.

    Parameters
    ----------
    application_id:
        The application ID of the new recording.
    recording_id:
        The recording ID of the new recording.
        NOTE(review): `None` presumably lets the SDK generate an ID — confirm.
    make_default:
        NOTE(review): presumably installs the new stream as the global data recording — confirm.
    make_thread_default:
        NOTE(review): presumably installs the new stream as the thread-local data recording — confirm.
    default_enabled:
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    send_properties:
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    batcher_config:
        Optional batching thresholds for the stream; see `ChunkBatcherConfig`.

    """

def new_blueprint(
    application_id: str,
    make_default: bool = True,
    make_thread_default: bool = True,
    default_enabled: bool = True,
) -> PyRecordingStream:
    """
    Create a new blueprint stream.

    Parameters
    ----------
    application_id:
        The application ID of the new blueprint.
    make_default:
        NOTE(review): presumably installs the new stream as the global blueprint recording — confirm.
    make_thread_default:
        NOTE(review): presumably installs the new stream as the thread-local blueprint recording — confirm.
    default_enabled:
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.

    """

def shutdown() -> None:
    """Shut down the Rerun SDK."""

def cleanup_if_forked_child() -> None:
    """Clean up internal state if this is the child of a forked process."""

def spawn(
    port: int = 9876,
    memory_limit: str = ...,
    server_memory_limit: str = ...,
    hide_welcome_screen: bool = False,
    detach_process: bool = True,
    executable_name: str = ...,
    executable_path: str | None = None,
    extra_args: list[str] = ...,
    extra_env: list[tuple[str, str]] = ...,
) -> None:
    """
    Spawn a new viewer.

    Defaults written as `...` are supplied by the native implementation; their
    values are not stated in this stub.
    """

#
# recordings
#

def get_application_id(recording: PyRecordingStream | None = None) -> str | None:
    """
    Get the current recording stream's application ID.

    The annotated return type allows `None`, i.e. an ID may not be available.
    """

def get_recording_id(recording: PyRecordingStream | None = None) -> str | None:
    """
    Get the current recording stream's recording ID.

    The annotated return type allows `None`, i.e. an ID may not be available.
    """

def get_data_recording(recording: PyRecordingStream | None = None) -> PyRecordingStream | None:
    """
    Returns the currently active data recording in the global scope, if any.

    Falls back to the specified recording otherwise, if any.
    """

def get_global_data_recording() -> PyRecordingStream | None:
    """Returns the currently active data recording in the global scope, or `None` if there is none."""

def set_global_data_recording(recording: PyRecordingStream | None = None) -> PyRecordingStream | None:
    """
    Replaces the currently active data recording in the global scope with the specified one.

    Returns the previous one, if any.
    """

def get_thread_local_data_recording() -> PyRecordingStream | None:
    """Returns the currently active data recording in the thread-local scope, or `None` if there is none."""

def set_thread_local_data_recording(recording: PyRecordingStream | None = None) -> PyRecordingStream | None:
    """
    Replaces the currently active data recording in the thread-local scope with the specified one.

    Returns the previous one, if any.
    """

def get_blueprint_recording(overrides: PyRecordingStream | None = None) -> PyRecordingStream | None:
    """
    Returns the currently active blueprint recording in the global scope, if any.

    Falls back to the specified recording otherwise, if any.

    Parameters
    ----------
    overrides:
        The recording to fall back to. Note that this parameter is named `overrides`,
        not `recording` as in the sibling functions.

    """

def get_global_blueprint_recording() -> PyRecordingStream | None:
    """Returns the currently active blueprint recording in the global scope, or `None` if there is none."""

def set_global_blueprint_recording(recording: PyRecordingStream | None = None) -> PyRecordingStream | None:
    """
    Replaces the currently active blueprint recording in the global scope with the specified one.

    Returns the previous one, if any.
    """

def get_thread_local_blueprint_recording() -> PyRecordingStream | None:
    """Returns the currently active blueprint recording in the thread-local scope, or `None` if there is none."""

def set_thread_local_blueprint_recording(
    recording: PyRecordingStream | None = None,
) -> PyRecordingStream | None:
    """
    Replaces the currently active blueprint recording in the thread-local scope with the specified one.

    Returns the previous one, if any.
    """

def check_for_rrd_footer(file_path: str | os.PathLike[str]) -> bool:
    """
    Check if the RRD file has a valid RRD footer.

    This is useful for unit tests to verify that data has been fully flushed to disk.

    Parameters
    ----------
    file_path:
        Path to the RRD file to check.

    """

def disconnect_orphaned_recordings() -> None:
    """
    Disconnect any orphaned recordings.

    This can be used to make sure that recordings get closed/finalized
    properly once all references to them have been dropped.
    """

#
# component descriptor
#

class ComponentDescriptor:
    """
    A `ComponentDescriptor` fully describes the semantics of a column of data.

    Every component at a given entity path is uniquely identified by the
    `component` field of the descriptor. The `archetype` and `component_type`
    fields provide additional information about the semantics of the data.
    """

    def __init__(self, component: str, archetype: str | None = None, component_type: str | None = None) -> None:
        """
        Creates a component descriptor.

        Parameters
        ----------
        component:
            Uniquely identifies the component. Example: `Points3D:positions`.
        archetype:
            Optional name of the associated archetype. Example: `rerun.archetypes.Points3D`.
        component_type:
            Optional type information for the component. Example: `rerun.components.Position3D`.

        """

    @property
    def archetype(self) -> str | None:
        """
        Optional name of the `Archetype` associated with this data.

        `None` if the data wasn't logged through an archetype.

        Example: `rerun.archetypes.Points3D`.
        """

    @property
    def component(self) -> str:
        """
        Uniquely identifies the component associated with this data.

        Example: `Points3D:positions`.
        """

    @property
    def component_type(self) -> str | None:
        """
        Optional type information for this component.

        Can be used to inform applications on how to interpret the data.

        Example: `rerun.components.Position3D`.
        """

    def with_overrides(self, archetype: str | None = None, component_type: str | None = None) -> ComponentDescriptor:
        """Unconditionally sets `archetype` and `component_type` to the given ones (if specified)."""

    def or_with_overrides(self, archetype: str | None = None, component_type: str | None = None) -> ComponentDescriptor:
        """Sets `archetype` and `component_type` to the given ones iff they are not already set."""

    def with_builtin_archetype(self, archetype: str) -> ComponentDescriptor:
        """Sets `archetype` in a format similar to built-in archetypes."""

#
# sinks
#

def is_enabled(recording: PyRecordingStream | None = None) -> bool:
    """Whether the recording stream is enabled."""

def binary_stream(recording: PyRecordingStream | None = None) -> PyBinarySinkStorage | None:
    """
    Create a new binary stream sink, and return the associated binary stream.

    The annotated return type allows `None`.
    NOTE(review): presumably `None` is returned when no recording is available — confirm.
    """

class GrpcSink:
    """
    Used in [`rerun.RecordingStream.set_sinks`][].

    Connect the recording stream to a remote Rerun Viewer on the given URL.
    """

    def __init__(self, url: str | None = None) -> None:
        """
        Initialize a gRPC sink.

        Parameters
        ----------
        url:
            The URL to connect to.

            The scheme must be one of `rerun://`, `rerun+http://`, or `rerun+https://`,
            and the pathname must be `/proxy`.

            The default is `rerun+http://127.0.0.1:9876/proxy`.

        """

class FileSink:
    """
    Used in [`rerun.RecordingStream.set_sinks`][].

    Save the recording stream to a file.
    """

    def __init__(self, path: str | os.PathLike[str], *, write_footer: bool = True) -> None:
        """
        Initialize a file sink.

        Parameters
        ----------
        path:
            Path to write to. The file will be overwritten.
        write_footer:
            Whether to emit a complete RRD footer (including a manifest of every chunk) at the
            end of the stream. Defaults to `True`.

            Producing a footer keeps per-chunk metadata in memory for the lifetime of the sink,
            which grows linearly with the number of chunks logged. Pass `write_footer=False` for
            long-running streaming sessions; the resulting file is still a valid RRD and a
            footer can be added after the fact via `rerun rrd optimize`.

            *Warning*: the lack of a footer will significantly hurt random-access performance, and
            some tools (e.g. LazyStore) may not work properly.

        """

def set_sinks(
    sinks: list[Any],
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Stream data to multiple sinks.

    Parameters
    ----------
    sinks:
        The sinks to stream to.
        NOTE(review): annotated as `list[Any]`; presumably `GrpcSink` / `FileSink` instances — confirm.
    default_blueprint:
        Optional blueprint storage.
    recording:
        The recording stream to configure.

    """

def connect_grpc(
    url: str | None,
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Connect the recording stream to a remote Rerun Viewer on the given URL.

    Parameters
    ----------
    url:
        The URL to connect to. This argument is required but may be `None`.
        NOTE(review): `None` presumably selects the same default as `GrpcSink` — confirm.
    default_blueprint:
        Optional blueprint storage.
    recording:
        The recording stream to connect.

    """

def connect_grpc_blueprint(
    url: str | None,
    make_active: bool,
    make_default: bool,
    blueprint_stream: PyRecordingStream,
) -> None:
    """
    Special binding for directly sending a blueprint stream to a connection.

    All arguments are required; only `url` may be `None`.
    """

def save(
    path: str,
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
    *,
    write_footer: bool = True,
) -> None:
    """
    Save the recording stream to a file.

    Parameters
    ----------
    path:
        Path of the file to write to.
    default_blueprint:
        Optional blueprint storage.
    recording:
        The recording stream to save.
    write_footer:
        Keyword-only. Defaults to `True`.
        NOTE(review): presumably has the same meaning as `FileSink`'s `write_footer` — confirm.

    """

def save_blueprint(path: str, blueprint_stream: PyRecordingStream) -> None:
    """Special binding for directly saving a blueprint stream to a file."""

def stdout(
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
    *,
    write_footer: bool = True,
) -> None:
    """
    Save to stdout.

    Parameters
    ----------
    default_blueprint:
        Optional blueprint storage.
    recording:
        The recording stream to write out.
    write_footer:
        Keyword-only. Defaults to `True`.
        NOTE(review): presumably has the same meaning as `FileSink`'s `write_footer` — confirm.

    """

def memory_recording(recording: PyRecordingStream | None = None) -> PyMemorySinkStorage | None:
    """
    Create an in-memory rrd file.

    The annotated return type allows `None`.
    NOTE(review): presumably `None` is returned when no recording is available — confirm.
    """

def set_callback_sink(
    callback: Callable[[bytes], Any],
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Set callback sink.

    Parameters
    ----------
    callback:
        A callable taking a single `bytes` argument.
        NOTE(review): presumably invoked with encoded log data — confirm.
    recording:
        The recording stream to configure.

    """

def set_callback_sink_blueprint(
    callback: Callable[[bytes], Any],
    make_active: bool,
    make_default: bool,
    blueprint_stream: PyRecordingStream | None,
) -> None:
    """
    Set callback sink for blueprint.

    All arguments are required; `blueprint_stream` may be `None`.

    Parameters
    ----------
    callback:
        A callable taking a single `bytes` argument.
        NOTE(review): presumably invoked with encoded blueprint data — confirm.
    make_active:
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    make_default:
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    blueprint_stream:
        The blueprint stream to configure.

    """

def serve_grpc(
    grpc_port: int | None,
    server_memory_limit: str,
    newest_first: bool = False,
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
    cors_allow_origin: list[str] = ...,  # type: ignore[assignment]
) -> str:
    """
    Spawn a gRPC server which an SDK or Viewer can connect to.

    Returns the URI of the server so you can connect the viewer to it.

    The default of `cors_allow_origin` (written as `...`) is supplied by the native
    implementation; its value is not stated in this stub.
    """

def serve_web_viewer(web_port: int | None = None, open_browser: bool = True, connect_to: str | None = None) -> None:
    """
    Serve a web-viewer over HTTP.

    This only serves HTML+JS+Wasm, but does NOT host a gRPC server.
    To do both at once, see `serve_web`.
    """

def serve_web(
    open_browser: bool,
    web_port: int | None,
    grpc_port: int | None,
    server_memory_limit: str,
    default_blueprint: PyMemorySinkStorage | None = None,
    recording: PyRecordingStream | None = None,
    cors_allow_origin: list[str] = ...,  # type: ignore[assignment]
) -> None:
    """
    Serve a web-viewer AND host a gRPC server.

    The default of `cors_allow_origin` (written as `...`) is supplied by the native
    implementation; its value is not stated in this stub.
    """

def disconnect(recording: PyRecordingStream | None = None) -> None:
    """
    Disconnect from remote server (if any).

    Subsequent log messages will be buffered and sent on the next call to either `connect_grpc` or `spawn`.
    """

def finalize_deferred_sinks(recording: PyRecordingStream | None = None) -> None:
    """
    Finalize any deferred-finalization sinks (i.e. file-like sinks that write a footer at the end).

    For a bare `FileSink` this is equivalent to `disconnect()`. For a `MultiSink` containing both
    streaming and file-like children, only the file-like children are dropped — the streaming
    children stay live. For all other sinks this is a no-op.

    Used by `RecordingStream.__exit__` so that file-backed recordings are consumable as soon as
    the `with`-block exits, without waiting for `__del__` / GC.

    Parameters
    ----------
    recording:
        The recording stream whose sinks should be finalized.

    """

def flush(*, timeout_sec: float = 1e38, recording: PyRecordingStream | None = None) -> None:
    """
    Block until outstanding data has been flushed to the sink.

    Both parameters are keyword-only.

    Parameters
    ----------
    timeout_sec:
        Wait at most this many seconds.
        NOTE(review): presumably an error is raised when the timeout is reached, as documented
        for `PyBinarySinkStorage.flush` — confirm.
    recording:
        The recording stream to flush.

    """

#
# time
#

def set_time_sequence(
    timeline: str,
    sequence: int,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Set the current time for this thread as an integer sequence.

    Parameters
    ----------
    timeline:
        The name of the timeline to set the time on.
    sequence:
        The integer sequence value.
    recording:
        The recording stream to operate on.

    """

def set_time_duration_nanos(
    timeline: str,
    nanos: int,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Set the current duration for this thread in nanoseconds.

    Parameters
    ----------
    timeline:
        The name of the timeline to set the duration on.
    nanos:
        The duration, in nanoseconds.
    recording:
        The recording stream to operate on.

    """

def set_time_timestamp_nanos_since_epoch(
    timeline: str,
    nanos: int,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Set the current time for this thread as a timestamp, in nanoseconds since epoch.

    Parameters
    ----------
    timeline:
        The name of the timeline to set the time on.
    nanos:
        The timestamp, in nanoseconds since epoch.
    recording:
        The recording stream to operate on.

    """

def send_recording_name(
    name: str,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Send the name of the recording.

    Parameters
    ----------
    name:
        The name of the recording.
    recording:
        The recording stream to send the name to.

    """

def send_recording_start_time_nanos(
    nanos: int,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Send the start time of the recording.

    Parameters
    ----------
    nanos:
        The start time, in nanoseconds.
        NOTE(review): presumably nanoseconds since epoch — confirm.
    recording:
        The recording stream to send the start time to.

    """

def disable_timeline(
    timeline: str,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Clear time information for the specified timeline on this thread.

    Parameters
    ----------
    timeline:
        The name of the timeline to clear.
    recording:
        The recording stream to operate on.

    """

def reset_time(recording: PyRecordingStream | None = None) -> None:
    """
    Clear all timeline information on this thread.

    To clear a single timeline only, see `disable_timeline`.
    """

#
# log any
#

def log_arrow_msg(
    entity_path: str,
    components: dict[Any, Any],
    static_: bool,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Log an arrow message.

    Parameters
    ----------
    entity_path:
        The entity path to log to.
    components:
        A dictionary of component data.
        NOTE(review): presumably keyed by `ComponentDescriptor` with arrow arrays as values,
        as for `send_arrow_chunk` — confirm.
    static_:
        NOTE(review): presumably whether the data is logged as static — confirm.
    recording:
        The recording stream to log to.

    """

def send_arrow_chunk(
    entity_path: str,
    timelines: dict[Any, Any],
    components: dict[Any, Any],
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Directly send an arrow chunk to the recording stream.

    Parameters
    ----------
    entity_path:
        The entity path to log the chunk to.
    timelines:
        A dictionary mapping timeline names (`str`) to their values (`arrow::Int64Array`).
    components:
        A dictionary mapping component descriptors (`ComponentDescriptor`) to their
        values (`arrow::ListArray`).
    recording:
        The recording stream to send the chunk to.

    """

def send_chunks(
    chunks: ChunkInternal | Iterable[ChunkInternal],
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Send chunks to the recording stream.

    Accepts a single chunk or any iterable of chunks. Blocks until every chunk
    has been pushed to the recording's batcher.

    Parameters
    ----------
    chunks:
        A single chunk, or an iterable of chunks.
    recording:
        The recording stream to send the chunks to.

    """

def log_file_from_path(
    file_path: str | os.PathLike[str],
    entity_path_prefix: str | None = None,
    static_: bool = False,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Log a file by path.

    Parameters
    ----------
    file_path:
        Path to the file to log.
    entity_path_prefix:
        NOTE(review): presumably a prefix for the entity paths the file's data is logged under — confirm.
    static_:
        NOTE(review): presumably whether the data is logged as static — confirm.
    recording:
        The recording stream to log to.

    """

def log_file_from_contents(
    file_path: str | os.PathLike[str],
    file_contents: bytes,
    entity_path_prefix: str | None = None,
    static_: bool = False,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Log a file by contents.

    Parameters
    ----------
    file_path:
        Path associated with the contents.
        NOTE(review): presumably used only for identification (e.g. the file type), since the
        data itself is passed in `file_contents` — confirm.
    file_contents:
        The raw contents of the file.
    entity_path_prefix:
        NOTE(review): presumably a prefix for the entity paths the file's data is logged under — confirm.
    static_:
        NOTE(review): presumably whether the data is logged as static — confirm.
    recording:
        The recording stream to log to.

    """

def send_blueprint(
    blueprint: PyMemorySinkStorage,
    make_active: bool = False,
    make_default: bool = True,
    recording: PyRecordingStream | None = None,
) -> None:
    """
    Send a blueprint to the given recording stream.

    Parameters
    ----------
    blueprint:
        The in-memory storage holding the blueprint.
    make_active:
        Defaults to `False`.
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    make_default:
        Defaults to `True`.
        NOTE(review): semantics not visible in this stub — confirm against the native implementation.
    recording:
        The recording stream to send the blueprint to.

    """

def send_recording(rrd: RecordingInternal, recording: PyRecordingStream | None = None) -> None:
    """
    Send all chunks from a recording to the given recording stream.

    !!! Warning
        ⚠️ This API is experimental and may change or be removed in future versions! ⚠️

    Parameters
    ----------
    rrd:
        The recording whose chunks are sent.
    recording:
        The recording stream to send the chunks to.

    """

#
# misc
#

def version() -> str:
    """Return a verbose version string for the Rerun SDK."""

def is_dev_build() -> bool:
    """Return `True` if the Rerun SDK is a dev/debug build."""

def get_app_url() -> str:
    """
    Get a URL to an instance of the web-viewer.

    This may point to app.rerun.io or localhost depending on
    whether `start_web_viewer_server()` was called.
    """

def start_web_viewer_server(port: int) -> None:
    """
    Start a web server to host the Rerun web-assets.

    Parameters
    ----------
    port:
        The port to serve on.

    """

def escape_entity_path_part(part: str) -> str:
    """
    Escape a single part of an entity path.

    Parameters
    ----------
    part:
        The unescaped entity path part.

    """

def new_entity_path(parts: list[str]) -> str:
    """
    Create an entity path from its parts.

    Parameters
    ----------
    parts:
        The parts of the entity path, in order.
        NOTE(review): presumably each part is escaped as needed — confirm.

    """

def new_property_entity_path(parts: list[str]) -> str:
    """
    Create a property entity path from its parts.

    Parameters
    ----------
    parts:
        The parts of the entity path, in order.

    """

def asset_video_read_frame_timestamps_nanos(video_bytes_arrow_array: Any, media_type: str | None = None) -> list[int]:
    """
    Reads the timestamps of all frames in a video asset.

    Implementation note:
    On the Python side we start out with a pyarrow array of bytes. Converting it to
    Python `bytes` can be done with `to_pybytes` but this requires copying the data.
    So instead, we pass the arrow array directly.

    Parameters
    ----------
    video_bytes_arrow_array:
        A pyarrow array holding the bytes of the video asset.
    media_type:
        Optional media type of the video.
        NOTE(review): `None` presumably means the type is guessed from the data — confirm.

    Returns
    -------
    list[int]
        The frame timestamps, in nanoseconds.

    """

#####################################################################################################################
## CATALOG                                                                                                         ##
#####################################################################################################################

class EntryId:
    """A unique identifier for an entry in the catalog."""

    def __init__(self, id: str) -> None:
        """
        Create a new `EntryId` from a string.

        Parameters
        ----------
        id:
            The string representation of the entry ID.

        """

    def __str__(self) -> str:
        """Return str(self)."""

    def as_bytes(self) -> bytes:
        """Return the raw 16-byte representation."""

class EntryKind:
    """The kinds of entries that can be stored in the catalog."""

    DATASET: EntryKind
    DATASET_VIEW: EntryKind
    TABLE: EntryKind
    TABLE_VIEW: EntryKind
    BLUEPRINT_DATASET: EntryKind

    def __str__(self, /) -> str:
        """Return str(self)."""

    def __int__(self) -> int:
        """Return int(self)."""

class EntryDetailsInternal:
    """Internal details of a catalog entry: its ID, name, kind, and creation/update timestamps."""

    # All fields are exposed as read-only properties (no setters are declared).
    @property
    def id(self) -> EntryId: ...
    @property
    def name(self) -> str: ...
    @property
    def kind(self) -> EntryKind: ...
    @property
    def created_at(self) -> datetime: ...
    @property
    def updated_at(self) -> datetime: ...

class DatasetEntryInternal:
    """
    Internal pyo3 object for a dataset entry in the catalog.

    The split between the pure Python wrappers and the internal pyo3 objects is documented in
    `rerun_py/ARCHITECTURE.md`.
    """

    # Generic entry operations; `TableEntryInternal` declares the same four methods.
    def catalog(self) -> CatalogClientInternal: ...
    def delete(self) -> None: ...
    def set_name(self, name: str) -> None: ...
    def entry_details(self) -> EntryDetailsInternal: ...

    # ---
    # Manifest URL and schema. The schema is exposed both as a `SchemaInternal`
    # and as a plain `pa.Schema`.

    @property
    def manifest_url(self) -> str: ...
    def schema(self) -> SchemaInternal: ...
    def arrow_schema(self) -> pa.Schema: ...

    # ---
    # Blueprint association. Both getters can return `None`, and the setter accepts `None`
    # (presumably to clear the default — TODO confirm).

    def blueprint_dataset(self) -> DatasetEntryInternal | None: ...
    def default_blueprint_segment_id(self) -> str | None: ...
    def set_default_blueprint_segment_id(self, segment_id: str | None) -> None: ...

    # ---
    # Segment listing: as a plain list of ids, or as DataFusion dataframes.

    def segment_ids(self) -> list[str]: ...
    def segment_table(self) -> dfn.DataFrame: ...
    def manifest(self) -> dfn.DataFrame: ...
    # Returns a URL (as a string) for one segment. `timeline`, `start` and `end` all default to
    # `None`; `start`/`end` accept either a `datetime` or an `int`.
    # NOTE(review): presumably `start`/`end` describe a range on `timeline` — confirm.
    def segment_url(
        self,
        segment_id: str,
        timeline: str | None = None,
        start: datetime | int | None = None,
        end: datetime | int | None = None,
    ) -> str: ...

    # ---
    # Filtering: both methods return a `DatasetViewInternal`, which declares the same two
    # methods so filters can be chained.

    def filter_segments(self, segment_ids: list[str]) -> DatasetViewInternal: ...
    def filter_contents(self, exprs: list[str]) -> DatasetViewInternal: ...

    # ---
    # Registration. Both `register*` methods return a `RegistrationHandleInternal`, which offers
    # `iter_results`, `wait` and `cancel`. The accepted values of `on_duplicate` are not declared
    # in this stub.

    def register(
        self, recording_uris: list[str], recording_layers: list[str], on_duplicate: str
    ) -> RegistrationHandleInternal: ...
    def register_prefix(
        self, recordings_prefix: str, layer_name: str, on_duplicate: str
    ) -> RegistrationHandleInternal: ...
    # All arguments are keyword-only; `force` defaults to `False`.
    def unregister(
        self,
        *,
        segments_to_drop: list[str],
        layers_to_drop: list[str],
        force: bool = False,
    ) -> None: ...

    # ---
    # Per-segment access, either as a `RecordingInternal` or as a `LazyStoreInternal`.

    def download_segment(self, segment_id: str) -> RecordingInternal: ...
    def segment_store(self, segment_id: str) -> LazyStoreInternal: ...

    # ---
    # Search indexes. The four methods wrapped in `@deprecated` below are flagged with the
    # message given in their decorator (index creation/search is currently not supported).

    @deprecated(
        "Index creation is currently not supported. Contact Rerun if this is a feature you would like us to support."
    )
    def create_fts_search_index(
        self,
        *,
        column: str | ComponentColumnSelector | ComponentColumnDescriptor,
        time_index: IndexColumnSelector,
        store_position: bool = False,
        base_tokenizer: str = "simple",
    ) -> None: ...
    # `distance_metric` has a default on the Rust side; the stub writes it as `...`, so the
    # actual value is not visible here.
    @deprecated(
        "Index creation is currently not supported. Contact Rerun if this is a feature you would like us to support."
    )
    def create_vector_search_index(
        self,
        *,
        column: str | ComponentColumnSelector | ComponentColumnDescriptor,
        time_index: IndexColumnSelector,
        target_partition_num_rows: int | None = None,
        num_sub_vectors: int = 16,
        distance_metric: VectorDistanceMetric | str = ...,
    ) -> IndexingResult: ...
    # Not flagged as deprecated, unlike the creation and search methods around them.
    def list_search_indexes(self) -> list[IndexingResult]: ...
    def delete_search_indexes(
        self,
        column: str | ComponentColumnSelector | ComponentColumnDescriptor,
    ) -> list[IndexConfig]: ...
    @deprecated(
        "Index search is currently not supported. Contact Rerun if this is a feature you would like us to support."
    )
    def search_fts(
        self,
        query: str,
        column: str | ComponentColumnSelector | ComponentColumnDescriptor,
    ) -> dfn.DataFrame: ...
    @deprecated(
        "Index search is currently not supported. Contact Rerun if this is a feature you would like us to support."
    )
    def search_vector(
        self,
        query: Any,  # VectorLike
        column: str | ComponentColumnSelector | ComponentColumnDescriptor,
        top_k: int,
    ) -> dfn.DataFrame: ...

    # ---
    # Maintenance. Every boolean flag defaults to `False` and `cleanup_before` defaults to `None`.
    # NOTE(review): presumably a call with all defaults performs no work — confirm.

    def do_maintenance(
        self,
        optimize_indexes: bool = False,
        retrain_indexes: bool = False,
        compact_fragments: bool = False,
        cleanup_before: datetime | None = None,
        unsafe_allow_recent_cleanup: bool = False,
    ) -> None: ...

class SegmentUrlUdfInternal:
    """Rust-backed ScalarUDF for building segment URLs."""

    # The return type is `Any` because the capsule is an opaque object.
    # NOTE(review): presumably this dunder is the hook DataFusion's Python bindings look up
    # to import a foreign scalar UDF — confirm against the datafusion-python docs.
    def __datafusion_scalar_udf__(self) -> Any:
        """Scalar UDF pycapsule."""

class DatasetViewInternal:
    """Internal Rust implementation of DatasetView."""

    # Properties
    # The dataset entry this view belongs to.
    @property
    def dataset(self) -> DatasetEntryInternal: ...
    # The segment id filter of this view. Can be `None`
    # (presumably meaning "no segment filter applied" — TODO confirm).
    @property
    def filtered_segment_ids(self) -> set[str] | None: ...
    # The content filter expressions of this view, as strings.
    @property
    def content_filters(self) -> list[str]: ...

    # Methods
    # Schema of the view, both as a `SchemaInternal` and as a plain `pa.Schema`.
    def schema(self) -> SchemaInternal: ...
    def arrow_schema(self) -> pa.Schema: ...
    def segment_ids(self) -> list[str]: ...
    # Returns a DataFusion dataframe. All arguments are keyword-only. `index` has no default
    # and must be passed explicitly (it may be `None`); the three boolean options default to
    # `False` and `using_index_values` defaults to `None`.
    def reader(
        self,
        *,
        index: str | None,
        include_semantically_empty_columns: bool = False,
        include_tombstone_columns: bool = False,
        fill_latest_at: bool = False,
        using_index_values: IndexValuesLike | None = None,
    ) -> dfn.DataFrame: ...
    # Both filter methods return a `DatasetViewInternal`, so filters can be chained.
    def filter_segments(self, segment_ids: list[str]) -> DatasetViewInternal: ...
    def filter_contents(self, exprs: list[str]) -> DatasetViewInternal: ...

class TableEntryInternal:
    """
    Internal pyo3 object for a table entry in the catalog.

    The split between the pure Python wrappers and the internal pyo3 objects is documented in
    `rerun_py/ARCHITECTURE.md`.
    """

    # Generic entry operations; `DatasetEntryInternal` declares the same four methods.
    def catalog(self) -> CatalogClientInternal: ...
    def delete(self) -> None: ...
    def set_name(self, name: str) -> None: ...
    def entry_details(self) -> EntryDetailsInternal: ...

    # ---
    # Reading: as a DataFusion table provider, a DataFusion dataframe, or an arrow
    # record batch reader. The provider hook takes and returns opaque (`Any`) objects.

    def __datafusion_table_provider__(self, session: Any) -> Any: ...
    def reader(self) -> dfn.DataFrame: ...
    def to_arrow_reader(self) -> pa.RecordBatchReader: ...

    # ---
    # Storage location and writing.

    @property
    def storage_url(self) -> str: ...
    # Writes the batches supplied by `batches`; `insert_mode` is one of the
    # `TableInsertModeInternal` members.
    def write_batches(
        self,
        batches: pa.RecordBatchReader,
        insert_mode: TableInsertModeInternal,
    ) -> None: ...

class TableInsertModeInternal:
    """The modes of operation when writing tables."""

    # One class-level attribute per mode; each is itself a `TableInsertModeInternal`.
    # Passed as `insert_mode` to `TableEntryInternal.write_batches`.
    # NOTE(review): the difference between OVERWRITE and REPLACE is not visible here — confirm.
    APPEND: TableInsertModeInternal
    OVERWRITE: TableInsertModeInternal
    REPLACE: TableInsertModeInternal

class _UrdfTreeInternal:
    """Internal Rust implementation of a parsed URDF tree."""

    # Constructs a tree from a file path. `path` accepts a string or any `os.PathLike[str]`.
    # `frame_prefix` and `static_transform_entity_path` are keyword-only, and all three
    # optional arguments default to `None`.
    @staticmethod
    def from_file_path(
        path: str | os.PathLike[str],
        entity_path_prefix: str | None = None,
        *,
        frame_prefix: str | None = None,
        static_transform_entity_path: str | None = None,
    ) -> _UrdfTreeInternal: ...
    @property
    def name(self) -> str: ...
    # Can be `None`; `from_file_path` also defaults its `frame_prefix` argument to `None`.
    @property
    def frame_prefix(self) -> str | None: ...
    # Navigation: the root link, all joints, and lookups by name. The by-name lookups can
    # return `None`.
    def root_link(self) -> _UrdfLinkInternal: ...
    def joints(self) -> list[_UrdfJointInternal]: ...
    def get_joint_by_name(self, joint_name: str) -> _UrdfJointInternal | None: ...
    def get_joint_child(self, joint: _UrdfJointInternal) -> _UrdfLinkInternal: ...
    def get_link_by_name(self, link_name: str) -> _UrdfLinkInternal | None: ...
    # Geometry path queries accept either a link name or a `_UrdfLinkInternal`.
    def get_collision_geometry_paths(self, link: str | _UrdfLinkInternal) -> list[str]: ...
    def get_visual_geometry_paths(self, link: str | _UrdfLinkInternal) -> list[str]: ...
    # `recording` defaults to `None`.
    # NOTE(review): presumably the active recording is used in that case — confirm.
    def log(self, recording: PyRecordingStream | None = None) -> None: ...
    # Returns a `LazyChunkStreamInternal`; `include_joint_transforms` is keyword-only and on by default.
    def stream(self, *, include_joint_transforms: bool = True) -> LazyChunkStreamInternal: ...
    # Batch variant working on arrow arrays of joint names and values; returns an arrow array.
    # `clamp` is keyword-only and defaults to `False`.
    # NOTE(review): presumably `clamp` behaves as documented on
    # `_UrdfJointInternal.compute_transform` — confirm.
    def compute_joint_transform_batches(
        self,
        names: pa.Array,
        values: pa.Array,
        *,
        clamp: bool = False,
    ) -> pa.Array: ...

class _UrdfJointInternal:
    """Internal Rust representation of a URDF joint."""

    # Read-only properties (only getters are declared).
    # NOTE(review): the names presumably mirror the URDF `<joint>` element
    # (type, parent/child link, axis, origin xyz/rpy, limits) — confirm units against the URDF spec.
    @property
    def name(self) -> str: ...
    @property
    def joint_type(self) -> str: ...
    # Parent and child links are given as strings, not as `_UrdfLinkInternal` objects.
    @property
    def parent_link(self) -> str: ...
    @property
    def child_link(self) -> str: ...
    # Three-component float tuples.
    @property
    def axis(self) -> tuple[float, float, float]: ...
    @property
    def origin_xyz(self) -> tuple[float, float, float]: ...
    @property
    def origin_rpy(self) -> tuple[float, float, float]: ...
    # Joint limits; these are the limits referred to by the `clamp` option of the
    # `compute_transform*` methods below.
    @property
    def limit_lower(self) -> float: ...
    @property
    def limit_upper(self) -> float: ...
    @property
    def limit_effort(self) -> float: ...
    @property
    def limit_velocity(self) -> float: ...
    @property
    def mimic(self) -> _UrdfMimicInternal | None:
        """The ``<mimic>`` tag for this joint, or ``None`` if this is not a mimic joint."""

    # Note: `clamp` may be passed positionally here, while it is keyword-only in
    # `compute_transform_columns`.
    def compute_transform(self, value: float, clamp: bool = False) -> dict[str, Any]:
        """
        Compute the transform components for this joint at the given value.

        The result is wrapped in a dictionary for easy conversion to the final types in Python.

        If `clamp` is True, values outside joint limits will be clamped and a warning is generated.
        If `clamp` is False (default), values outside limits are used as-is without warnings.
        """

    def compute_transform_columns(self, values: list[float], *, clamp: bool = False) -> dict[str, Any]:
        """
        Compute transforms for this joint at multiple values in a single call.

        Returns a dictionary with translations, quaternions, frame names, and warnings
        for use with columnar APIs like `send_columns`.

        If `clamp` is True, values outside joint limits will be clamped and a warning is generated.
        If `clamp` is False (default), values outside limits are used as-is without warnings.
        """

class _UrdfMimicInternal:
    """Internal Rust representation of a URDF ``<mimic>`` tag."""

    # Read-only properties (only getters are declared).
    # The joint is given as a string (presumably the name of the mimicked joint — TODO confirm).
    @property
    def joint(self) -> str: ...
    # NOTE(review): presumably the mimicking joint's value is
    # `multiplier * <mimicked value> + offset`, per the URDF convention — confirm.
    @property
    def multiplier(self) -> float: ...
    @property
    def offset(self) -> float: ...

class _UrdfLinkInternal:
    """Internal Rust representation of a URDF link."""

    # The only member declared for a link is its name. Link objects are accepted by the
    # geometry path queries on `_UrdfTreeInternal`.
    @property
    def name(self) -> str: ...

class _IndexValuesLikeInternal:
    """
    A Python wrapper for [`IndexValuesLike`] extraction and conversion.

    Provides a Python-accessible interface to normalize various index value
    representations (PyArrow arrays, NumPy arrays, ChunkedArrays) into sorted
    int64 index values.
    """

    # Wraps any value accepted by the `IndexValuesLike` type.
    def __init__(self, values: IndexValuesLike) -> None: ...
    # Returns the normalized values as a NumPy int64 array.
    def to_index_values(self) -> npt.NDArray[np.int64]: ...
    # A regular method, not `__len__`: call it as `obj.len()`, not `len(obj)`.
    def len(self) -> int: ...

class IndexProperties:
    """
    The properties and configuration of a user-defined index.

    No members are declared in this stub. Instances are returned by `IndexConfig.properties`.
    """

class IndexConfig:
    """
    The complete description of a user-defined index.

    Combines the time column, the component column, and the index properties.
    All three are exposed as read-only properties.
    """

    @property
    def time_column(self) -> IndexColumnSelector:
        """Returns the time column that this index applies to."""

    @property
    def component_column(self) -> ComponentColumnSelector:
        """Returns the component column that this index applies to."""

    @property
    def properties(self) -> IndexProperties:
        """Returns the properties/configuration of the index."""

class IndexingResult:
    """Indexing operation status result."""

    # Note: despite its name, this property returns the full `IndexConfig`, not an
    # `IndexProperties` (compare `IndexConfig.properties`).
    @property
    def properties(self) -> IndexConfig:
        """Returns configuration information and properties about the newly created index."""

    @property
    def column(self) -> ComponentColumnSelector:
        """Returns the component column that this index was created on."""

    @property
    def statistics(self) -> str:
        """Returns best-effort backend-specific statistics about the newly created index."""

    def debug_info(self) -> dict[str, Any] | None:
        """
        Get debug information about the indexing operation.

        The exact contents of debug information may vary depending on the indexing operation performed
        and the server implementation.

        Returns
        -------
        dict[str, Any] | None
            A dictionary containing debug information, or `None` if no debug information is available

        """

class CatalogClientInternal:
    """
    Internal pyo3 object for a catalog client.

    The split between the pure Python wrappers and the internal pyo3 objects is documented in
    `rerun_py/ARCHITECTURE.md`.
    """

    # Created from a URL and an optional token (`token` defaults to `None`).
    def __init__(self, url: str, token: str | None = None) -> None: ...

    # ---
    # Static, so it can be queried without creating a client.
    # NOTE(review): presumably the major version of the DataFusion build the bindings were
    # compiled against — confirm.

    @staticmethod
    def datafusion_major_version() -> int: ...

    # ---

    @property
    def url(self) -> str: ...

    # ---
    # Server information and entry listing.
    # `version_info` returns a 3-tuple whose last two elements may be `None`; the meaning of
    # the individual elements is not declared in this stub.

    def version_info(self) -> tuple[str, str | None, str | None]: ...
    def rtt_seconds(self, num_pings: int) -> float: ...
    # Can return `None`; the condition is not visible from the stub.
    def bandwidth_bytes_per_sec(self, num_bytes: int, rtt_seconds: float) -> float | None: ...
    # `include_hidden` has no default and must be passed explicitly.
    def datasets(self, include_hidden: bool) -> list[DatasetEntryInternal]: ...
    def tables(self, include_hidden: bool) -> list[TableEntryInternal]: ...

    # ---
    # Lookup by `EntryId`. To look up by name, first resolve the name with
    # `_entry_id_from_entry_name`.

    def get_dataset(self, id: EntryId) -> DatasetEntryInternal: ...
    def get_table(self, id: EntryId) -> TableEntryInternal: ...

    # ---
    # Entry creation, plus access to the DataFusion session context.
    # `create_table` requires `url` to be passed explicitly, but it may be `None`.

    def create_dataset(self, name: str) -> DatasetEntryInternal: ...
    def register_table(self, name: str, url: str) -> TableEntryInternal: ...
    def create_table(self, name: str, schema: pa.Schema, url: str | None) -> TableEntryInternal: ...
    def ctx(self) -> dfn.SessionContext: ...

    # ---

    def do_global_maintenance(self) -> None: ...

    # ---
    # Resolves an entry name to its `EntryId`.

    def _entry_id_from_entry_name(self, name: str) -> EntryId: ...

class RegistrationHandleInternal:
    """
    Handle returned by `DatasetEntryInternal.register` and `DatasetEntryInternal.register_prefix`.

    `timeout_secs` defaults to `None` on both waiting methods
    (presumably meaning "no timeout" — TODO confirm).
    """

    # Yields 3-tuples of `(str, str, str | None)`; the meaning of the individual elements
    # is not declared in this stub.
    def iter_results(self, timeout_secs: int | None = None) -> Iterator[tuple[str, str, str | None]]: ...
    # Returns a list of strings (presumably the registered segment ids — TODO confirm).
    def wait(self, timeout_secs: int | None = None) -> list[str]: ...
    def cancel(self) -> None: ...

#####################################################################################################################
## VIEWER_CLIENT                                                                                                   ##
#####################################################################################################################

class ViewerClientInternal:
    """Internal implementation. Use ViewerClient from rerun.experimental instead."""

    # Created from the viewer address, given as a string.
    def __init__(self, addr: str) -> None: ...
    # Sends a single arrow record batch under the given table id.
    def send_table(self, id: str, table: pa.RecordBatch) -> None: ...
    # `view_id` has no default and must be passed explicitly; it may be `None`
    # (presumably to capture the whole viewer rather than one view — TODO confirm).
    def save_screenshot(self, file_path: str, view_id: str | None) -> None: ...

class NotFoundError(Exception):
    """
    Raised when the requested resource is not found.

    Derives directly from `Exception`.
    """

class AlreadyExistsError(Exception):
    """
    Raised when trying to create a resource that already exists.

    Derives directly from `Exception`.
    """

class SelectorInternal:
    """
    Internal pyo3 object for a selector built from a query string.

    Selectors are consumed by `DeriveLensInternal` and `MutateLensInternal`.
    """

    # The query syntax is not declared in this stub.
    def __init__(self, query: str) -> None: ...
    # Both `execute*` methods take an arrow array and return an arrow array or `None`.
    # NOTE(review): presumably `None` means the selector produced no output — confirm.
    def execute(self, source: pa.Array) -> pa.Array | None: ...
    def execute_per_row(self, source: pa.Array) -> pa.Array | None: ...
    # Returns a `SelectorInternal`. `func` is untyped (`Any`).
    # NOTE(review): presumably a callable applied to the selector's output — confirm.
    def pipe(self, func: Any) -> SelectorInternal: ...
    # Unlike `__str__`, this can return `None`
    # (presumably when the selector has no string form — TODO confirm).
    def try_to_string(self) -> str | None: ...
    def __repr__(self) -> str: ...
    def __str__(self) -> str: ...

class DeriveLensInternal:
    """
    Internal pyo3 object for a "derive" lens.

    One of the two members of the `LensInternal` union accepted by `LazyChunkStreamInternal.lenses`.
    """

    # `output_entity` and `scatter` are keyword-only and default to `None` and `False`.
    def __init__(
        self,
        input_component: str,
        *,
        output_entity: str | None = None,
        scatter: bool = False,
    ) -> None: ...
    # Both `to_*` methods return a `DeriveLensInternal`, so calls can be chained.
    # NOTE(review): whether they mutate `self` or return a new object is not visible here — confirm.
    def to_component(self, component: ComponentDescriptor, selector: SelectorInternal) -> DeriveLensInternal: ...
    # `timeline_type` is a plain string; its accepted values are not declared in this stub.
    def to_timeline(self, timeline_name: str, timeline_type: str, selector: SelectorInternal) -> DeriveLensInternal: ...

class MutateLensInternal:
    """
    Internal pyo3 object for a "mutate" lens.

    One of the two members of the `LensInternal` union accepted by `LazyChunkStreamInternal.lenses`.
    """

    # Unlike `DeriveLensInternal`, the selector is supplied at construction time and no
    # further builder methods are declared. `keep_row_ids` is keyword-only and defaults to `False`.
    def __init__(
        self,
        input_component: str,
        selector: SelectorInternal,
        *,
        keep_row_ids: bool = False,
    ) -> None: ...

# Union of the two lens kinds; this is the element type of the `lenses` argument of
# `LazyChunkStreamInternal.lenses`.
LensInternal = DeriveLensInternal | MutateLensInternal

class _ServerInternal:
    """Internal pyo3 object for a Rerun server; constructing it creates and starts the server."""

    def __init__(
        self,
        *,
        host: str,
        port: int,
        datasets: dict[str, list[str]],
        dataset_prefixes: dict[str, str],
        tables: dict[str, str],
    ) -> None:
        """
        Create and start a Rerun server.

        All arguments are keyword-only and none of them has a default at this level,
        so each one must be passed explicitly.

        Parameters
        ----------
        host:
            The IP address to bind the server to.
        port:
            The port to bind the server to.
        datasets:
            Dictionary mapping dataset names to lists of RRD file paths.
        dataset_prefixes:
            Dictionary mapping dataset names to directories containing RRDs.
        tables:
            Dictionary mapping table names to lance file paths,
            which will be loaded and made available when the server starts.

        """

    # Note: `url` and `host` are methods here, not properties.
    def url(self) -> str: ...
    def host(self) -> str: ...
    def shutdown(self) -> None: ...
    def is_running(self) -> bool: ...
    # Error injection is keyed by a method name given as a string.
    # NOTE(review): presumably a testing aid that makes the named server method fail until
    # `clear_injected_error` is called with the same name — confirm.
    def inject_error(self, method: str) -> None: ...
    def clear_injected_error(self, method: str) -> None: ...

#####################################################################################################################
## AUTH                                                                                                            ##
#####################################################################################################################

class DeviceCodeFlow:
    """
    OAuth login flow implementation.

    The auth flow is browser-based, and the user will be redirected to the OAuth provider.

    Instances are obtained from `init_login_flow`; no constructor is declared here.
    """

    def login_url(self) -> str:
        """Get the URL for the OAuth login flow."""

    def user_code(self) -> str:
        """Get the user code."""

    def finish_login_flow(self) -> Credentials:
        """
        Finish the OAuth login flow.

        Returns
        -------
        Credentials
            The credentials of the logged in user.

        """

def init_login_flow() -> DeviceCodeFlow | None:
    """
    Initialize an OAuth login flow.

    When a flow is returned, complete it with `DeviceCodeFlow.finish_login_flow`.

    Returns
    -------
    DeviceCodeFlow | None
        The login flow, or `None` if the user is already logged in.

    """

class Credentials:
    """
    The credentials for the OAuth login flow.

    Returned by `DeviceCodeFlow.finish_login_flow` and `get_credentials`.
    Both properties are read-only (only getters are declared).
    """

    @property
    def access_token(self) -> str:
        """The access token."""

    @property
    def user_email(self) -> str:
        """The user email."""

def get_credentials() -> Credentials | None:
    """
    Returns the credentials for the current user.

    The result can be `None` (presumably when no user is logged in — TODO confirm).
    """

def logout() -> str | None:
    """
    Log out by clearing stored credentials.

    The returned URL is only handed back to the caller as a string.

    Returns
    -------
    str | None
        The logout URL to end the session, or `None` if already logged out.

    """

def _get_trace_context_var() -> Any:
    """
    Return the `ContextVar` that Python uses to pass trace headers to Rust.

    This is the **write side** of the bridge — Python's `with_tracing` decorator
    calls this to get the `ContextVar`, then writes W3C trace headers into it.
    Rust later reads them back via [`read_trace_context_from_python`].

    Returns `None` when `perf_telemetry` is disabled.

    The return type is annotated as `Any`, so callers must check for `None` themselves.
    """

def _get_tracing_session_var() -> Any:
    """
    Return the `ContextVar` carrying the active rerun session id.

    Set by the `tracing_session()` context manager and read on every outbound
    gRPC call to merge `rerun_session_id=<id>` into the W3C `tracestate` header.

    Returns `None` when `perf_telemetry` is disabled.

    The return type is annotated as `Any`, so callers must check for `None` themselves.
    """

def _is_telemetry_active() -> bool:
    """
    Return `True` if the rerun telemetry stack initialized successfully.

    `tracing_session()` requires this to be true; otherwise the W3C propagator
    is not registered and the session id has no transport.

    Returns `False` otherwise.
    """

def _inc_active_tracing_sessions() -> None:
    """
    Increment the process-wide active-tracing-session gate.

    Called by `tracing_session().__enter__`; the matching decrement is `_dec_active_tracing_sessions`.
    """

def _dec_active_tracing_sessions() -> None:
    """
    Decrement the process-wide active-tracing-session gate.

    Called by `tracing_session().__exit__`; the matching increment is `_inc_active_tracing_sessions`.
    """

def _log_tracing_session_started(rerun_session_id: str) -> None:
    """
    Emit `rerun tracing session started: <rerun_session_id>` through the Rust `tracing` stack at INFO level.

    Parameters
    ----------
    rerun_session_id:
        The session id to include in the emitted message.

    """

def _log_tracing_session_finished(
    rerun_session_id: str,
    elapsed_s: float,
    cpu_user_s: float | None,
    cpu_system_s: float | None,
    cpu_iowait_s: float | None,
    net_rx_mb: float | None,
) -> None:
    """
    Emit a single structured INFO event summarizing the tracing session at scope exit.

    The `float | None` parameters (`Option<f64>` on the Rust side) are `None` when the
    host platform or runtime can't supply the metric (psutil missing, or `iowait`
    unavailable on macOS/Windows). Routed through the Rust `tracing` stack so it follows
    `RUST_LOG` and the fmt-layer pipeline like `_log_tracing_session_started`.

    None of the parameters has a default, so the optional metrics must be passed
    explicitly, as `None` when unavailable.

    Parameters
    ----------
    rerun_session_id:
        The id of the session being summarized.
    elapsed_s:
        Elapsed time of the session (seconds, going by the `_s` suffix).
    cpu_user_s:
        User CPU time (seconds, going by the `_s` suffix), or `None`.
    cpu_system_s:
        System CPU time (seconds, going by the `_s` suffix), or `None`.
    cpu_iowait_s:
        I/O wait time (seconds, going by the `_s` suffix), or `None`.
    net_rx_mb:
        Received network traffic (presumably megabytes, going by the `_mb` suffix), or `None`.

    """

#####################################################################################################################
## PIPELINE APIS                                                                                                   ##
#####################################################################################################################

class ChunkStoreInternal:
    """Internal implementation. Use ChunkStore from rerun.experimental instead."""

    # Builds a store from a list of chunks. A store is also what
    # `LazyChunkStreamInternal.collect` returns.
    @staticmethod
    def from_chunks(chunks: list[ChunkInternal]) -> ChunkStoreInternal: ...
    # The remaining four methods have the same signatures as those on `LazyStoreInternal`.
    def schema(self) -> SchemaInternal: ...
    def num_chunks(self) -> int: ...
    # A summary of the store, as a string.
    def summary(self) -> str: ...
    # Exposes the store's content as a `LazyChunkStreamInternal`.
    def stream(self) -> LazyChunkStreamInternal: ...

class LazyStoreInternal:
    """Internal implementation. Use LazyStore from rerun.experimental instead."""

    # No constructor is declared; instances are returned by `RrdReaderInternal.store` and
    # `DatasetEntryInternal.segment_store`.
    # The four methods have the same signatures as the instance methods on `ChunkStoreInternal`.
    def schema(self) -> SchemaInternal: ...
    def num_chunks(self) -> int: ...
    # A summary of the store, as a string.
    def summary(self) -> str: ...
    # Exposes the store's content as a `LazyChunkStreamInternal`.
    def stream(self) -> LazyChunkStreamInternal: ...

class StoreEntryInternal:
    """Internal implementation. Use StoreEntry from rerun.experimental instead."""

    # Read-only properties (only getters are declared). Instances are listed by
    # `RrdReaderInternal.store_entries`.
    # `kind` is one of exactly two string literals: "recording" or "blueprint".
    @property
    def kind(self) -> Literal["recording", "blueprint"]: ...
    @property
    def application_id(self) -> str: ...
    @property
    def recording_id(self) -> str: ...

class RrdReaderInternal:
    """Internal implementation. Use RrdReader from rerun.experimental instead."""

    # The constructor takes the path as a string, whereas the `path` property returns a `Path`.
    def __init__(self, path: str) -> None: ...
    # Lists the stores as `StoreEntryInternal` objects; one of them can be passed as the
    # `store` argument of `stream` or `store` below.
    def store_entries(self) -> list[StoreEntryInternal]: ...
    # `store` defaults to `None`.
    # NOTE(review): which store is used when `store` is `None` is not visible here — confirm.
    def stream(self, store: StoreEntryInternal | None = None) -> LazyChunkStreamInternal: ...
    def store(self, store: StoreEntryInternal | None = None) -> LazyStoreInternal: ...
    @property
    def path(self) -> Path: ...

class McapReaderInternal:
    """Internal implementation. Use McapReader from rerun.experimental instead."""

    # None of the constructor arguments has a default at this level, so every one must be
    # passed explicitly; the last four accept `None`.
    # `timeline_type` is a plain string; its accepted values are not declared in this stub.
    # `timestamp_offset_ns` is in nanoseconds, going by the `_ns` suffix.
    def __init__(
        self,
        path: str,
        timeline_type: str,
        timestamp_offset_ns: int | None,
        decoders: list[str] | None,
        include_topic_regex: list[str] | None,
        exclude_topic_regex: list[str] | None,
    ) -> None: ...
    def stream(self) -> LazyChunkStreamInternal: ...
    # The constructor takes the path as a string, whereas this property returns a `Path`.
    @property
    def path(self) -> Path: ...
    # Static, so it can be queried without opening a file.
    # NOTE(review): presumably these are the names accepted by the `decoders` argument — confirm.
    @staticmethod
    def available_decoders() -> list[str]: ...

class ParquetReaderInternal:
    """Internal implementation. Use ParquetReader from rerun.experimental instead."""

    # Only `path` is required. The visible defaults are `column_grouping="prefix"`,
    # `delimiter="_"` and `use_structs=True`; every other option defaults to `None`.
    # `index_columns` is a list of `(str, str, str | None)` tuples and `column_rules` is a list
    # of untyped (`Any`) items; the meaning of neither is declared in this stub.
    # NOTE(review): the accepted values of `column_grouping` besides "prefix" are not visible — confirm.
    def __init__(
        self,
        path: str,
        entity_path_prefix: str | None = None,
        column_grouping: str = "prefix",
        delimiter: str = "_",
        prefixes: list[str] | None = None,
        use_structs: bool = True,
        static_columns: list[str] | None = None,
        index_columns: list[tuple[str, str, str | None]] | None = None,
        column_rules: list[Any] | None = None,
    ) -> None: ...
    def stream(self) -> LazyChunkStreamInternal: ...
    # The constructor takes the path as a string, whereas this property returns a `Path`.
    @property
    def path(self) -> Path: ...

class LazyChunkStreamInternal:
    """Internal implementation. Use LazyChunkStream from rerun.experimental instead."""

    # `filter`, `drop_matching` and `split` take the same four keyword-only criteria, all
    # defaulting to `None`.
    # NOTE(review): how several criteria combine (AND vs OR) is not visible here — confirm.
    def filter(
        self,
        *,
        content: list[str] | None = None,
        has_timeline: str | None = None,
        is_static: bool | None = None,
        components: list[str] | None = None,
    ) -> LazyChunkStreamInternal: ...
    def drop_matching(
        self,
        *,
        content: list[str] | None = None,
        has_timeline: str | None = None,
        is_static: bool | None = None,
        components: list[str] | None = None,
    ) -> LazyChunkStreamInternal: ...
    # Returns a pair of streams (presumably `(matching, non-matching)` — TODO confirm the order).
    def split(
        self,
        *,
        content: list[str] | None = None,
        has_timeline: str | None = None,
        is_static: bool | None = None,
        components: list[str] | None = None,
    ) -> tuple[LazyChunkStreamInternal, LazyChunkStreamInternal]: ...
    # Applies a list of lenses (`DeriveLensInternal` or `MutateLensInternal`). None of the
    # arguments has a default; `content` may be `None`. The accepted values of `output_mode`
    # are not declared in this stub.
    def lenses(
        self,
        lenses: list[LensInternal],
        output_mode: str,
        content: list[str] | None,
    ) -> LazyChunkStreamInternal: ...
    # `map` takes a chunk-to-chunk callable; `flat_map` takes a callable returning a list of chunks.
    def map(self, callable: Callable[[ChunkInternal], ChunkInternal]) -> LazyChunkStreamInternal: ...
    def flat_map(self, callable: Callable[[ChunkInternal], list[ChunkInternal]]) -> LazyChunkStreamInternal: ...
    # Static: combines several streams into one.
    @staticmethod
    def merge(streams: list[LazyChunkStreamInternal]) -> LazyChunkStreamInternal: ...
    def write_rrd(self, path: str, application_id: str, recording_id: str) -> None: ...
    # All options are keyword-only. The visible defaults are `extra_passes=0` and
    # `gop_batching=False`; the remaining options default to `None`.
    def collect(
        self,
        *,
        max_bytes: int | None = None,
        max_rows: int | None = None,
        max_rows_if_unsorted: int | None = None,
        extra_passes: int = 0,
        gop_batching: bool = False,
        split_size_ratio: float | None = None,
    ) -> ChunkStoreInternal:
        """Consume the stream and materialize all chunks into a ChunkStore."""
    # Returns the chunks as a plain list rather than as a `ChunkStoreInternal`.
    def to_chunks(self) -> list[ChunkInternal]: ...
    def __iter__(self) -> LazyChunkStreamIterator: ...
    # Static: builds a stream from an iterable. `iterable` is untyped (`Any`)
    # (presumably an iterable of `ChunkInternal` — TODO confirm).
    @staticmethod
    def from_iter(iterable: Any) -> LazyChunkStreamInternal: ...
    def send_to_recording(self, recording: PyRecordingStream | None = None) -> None:
        """
        Drain this stream into a recording stream.

        If `recording` is `None`, the active recording is used. Blocks until every
        chunk has been pushed to the recording's batcher. A silent no-op when
        there is no active recording.
        """

class LazyChunkStreamIterator:
    """
    Iterator over chunks from a compiled stream.

    Returned by `LazyChunkStreamInternal.__iter__`; each step yields a `ChunkInternal`.
    """

    def __iter__(self) -> LazyChunkStreamIterator:
        """Implement iter(self)."""

    def __next__(self) -> ChunkInternal:
        """Implement next(self)."""
