from __future__ import annotations

import asyncio
import re
import weakref
from contextlib import suppress
from functools import partial
from pathlib import Path, PurePath
from typing import Callable, Iterable, Optional
from urllib.parse import unquote

from markdown_it import MarkdownIt
from markdown_it.token import Token
from rich.text import Text
from typing_extensions import TypeAlias

from textual._slug import TrackedSlugs, slug_for_tcss_id
from textual.app import ComposeResult
from textual.await_complete import AwaitComplete
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.content import Content, Span
from textual.css.query import NoMatches
from textual.events import Mount
from textual.highlight import highlight
from textual.layout import Layout
from textual.layouts.grid import GridLayout
from textual.message import Message
from textual.reactive import reactive, var
from textual.style import Style
from textual.widget import Widget
from textual.widgets import Static, Tree
from textual.widgets._label import Label

TableOfContentsType: TypeAlias = "list[tuple[int, str, str | None]]"
"""Information about the table of contents of a markdown document.

The triples encode the level, the label, and the optional block id of each heading.
"""


class MarkdownStream:
    """An object to manage streaming markdown.

    This will accumulate markdown fragments if they can't be rendered fast enough.

    This object is typically created by the [Markdown.get_stream][textual.widgets.Markdown.get_stream] method.

    """

    def __init__(self, markdown_widget: Markdown) -> None:
        """
        Args:
            markdown_widget: Markdown widget to update.
        """
        self.markdown_widget = markdown_widget
        self._task: asyncio.Task | None = None
        self._new_markup = asyncio.Event()
        self._pending: list[str] = []
        self._stopped = False

    def start(self) -> None:
        """Start the updater running in the background.

        No need to call this, if the object was created by [Markdown.get_stream][textual.widgets.Markdown.get_stream].

        """
        if self._task is None:
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Stop the stream and await its finish."""
        if self._task is not None:
            self._task.cancel()
            await self._task
            self._task = None
            self._stopped = True

    async def write(self, markdown_fragment: str) -> None:
        """Append or enqueue a markdown fragment.

        Args:
            markdown_fragment: A string to append at the end of the document.
        """
        if self._stopped:
            raise RuntimeError("Can't write to the stream after it has stopped.")
        if not markdown_fragment:
            # Nothing to do for empty strings.
            return
        # Append the new fragment, and set an event to tell the _run loop to wake up
        self._pending.append(markdown_fragment)
        self._new_markup.set()
        # Allow the task to wake up and actually display the new markdown
        await asyncio.sleep(0)

    async def _run(self) -> None:
        """Run a task to append markdown fragments when available."""
        try:
            while await self._new_markup.wait():
                new_markdown = "".join(self._pending)
                self._pending.clear()
                self._new_markup.clear()
                await asyncio.shield(self.markdown_widget.append(new_markdown))
        except asyncio.CancelledError:
            # Task has been cancelled, add any outstanding markdown
            pass

        new_markdown = "".join(self._pending)
        if new_markdown:
            await self.markdown_widget.append(new_markdown)


class Navigator:
    """Manages a stack of paths like a browser."""

    def __init__(self) -> None:
        self.stack: list[Path] = []
        self.index = 0

    @property
    def location(self) -> Path:
        """The current location.

        Returns:
            A path for the current document.
        """
        if not self.stack:
            return Path(".")
        return self.stack[self.index]

    @property
    def start(self) -> bool:
        """Is the current location at the start of the stack?"""
        return self.index == 0

    @property
    def end(self) -> bool:
        """Is the current location at the end of the stack?"""
        return self.index >= len(self.stack) - 1

    def go(self, path: str | PurePath) -> Path:
        """Go to a new document.

        Args:
            path: Path to new document.

        Returns:
            New location.
        """
        location, anchor = Markdown.sanitize_location(str(path))
        if location == Path(".") and anchor:
            current_file, _ = Markdown.sanitize_location(str(self.location))
            path = f"{current_file}#{anchor}"
        new_path = self.location.parent / Path(path)
        self.stack = self.stack[: self.index + 1]
        new_path = new_path.absolute()
        self.stack.append(new_path)
        self.index = len(self.stack) - 1
        return new_path

    def back(self) -> bool:
        """Go back in the stack.

        Returns:
            True if the location changed, otherwise False.
        """
        if self.index:
            self.index -= 1
            return True
        return False

    def forward(self) -> bool:
        """Go forward in the stack.

        Returns:
            True if the location changed, otherwise False.
        """
        if self.index < len(self.stack) - 1:
            self.index += 1
            return True
        return False


class MarkdownBlock(Static):
    """The base class for a Markdown Element."""

    COMPONENT_CLASSES = {"em", "strong", "s", "code_inline"}
    """
    These component classes target standard inline markdown styles.
    Changing these will potentially break the standard markdown formatting.

    | Class | Description |
    | :- | :- |
    | `code_inline` | Target text that is styled as inline code. |
    | `em` | Target text that is emphasized inline. |
    | `s` | Target text that is styled inline with strikethrough. |
    | `strong` | Target text that is styled inline with strong. |
    """

    DEFAULT_CSS = """
    MarkdownBlock {
        width: 1fr;
        height: auto;
    }
    """

    def __init__(
        self,
        markdown: Markdown,
        token: Token,
        source_range: tuple[int, int] | None = None,
        *args,
        **kwargs,
    ) -> None:
        self._markdown_ref = weakref.ref(markdown)
        """A reference to the Markdown document that contains this block."""
        self._content: Content = Content()
        self._token: Token = token
        self._blocks: list[MarkdownBlock] = []
        self._inline_token: Token | None = None
        self.source_range: tuple[int, int] = source_range or (
            (token.map[0], token.map[1]) if token.map is not None else (0, 0)
        )

        super().__init__(
            *args,
            name=token.type,
            classes=f"level-{token.level}",
            expand=True,
            **kwargs,
        )

    @property
    def _markdown(self) -> Markdown:
        """Resolve the weak ref to _markdown"""
        markdown = self._markdown_ref()
        assert markdown is not None
        return markdown

    @property
    def select_container(self) -> Widget:
        return self.query_ancestor(Markdown)

    @property
    def source(self) -> str | None:
        """The source of this block if known, otherwise `None`."""
        if self.source_range is None:
            return None
        start, end = self.source_range
        return "".join(self._markdown.source.splitlines(keepends=True)[start:end])

    def _copy_context(self, block: MarkdownBlock) -> None:
        """Copy the context from another block."""
        self._token = block._token

    def compose(self) -> ComposeResult:
        yield from self._blocks
        self._blocks.clear()

    def set_content(self, content: Content) -> None:
        self._content = content
        self.update(content)

    async def _update_from_block(self, block: MarkdownBlock) -> None:
        await self.remove()
        await self._markdown.mount(block)

    async def action_link(self, href: str) -> None:
        """Called on link click."""
        self.post_message(Markdown.LinkClicked(self._markdown, href))

    def build_from_token(self, token: Token) -> None:
        """Build inline block content from its source token.

        Args:
            token: The token from which this block is built.
        """
        self._inline_token = token
        content = self._token_to_content(token)
        self.set_content(content)

    def _token_to_content(self, token: Token) -> Content:
        """Convert an inline token to Textual Content.

        Args:
            token: A markdown token.

        Returns:
            Content instance.
        """

        if token.children is None:
            return Content("")

        tokens: list[str] = []
        spans: list[Span] = []
        style_stack: list[tuple[Style | str, int]] = []
        position: int = 0

        def add_content(text: str) -> None:
            """Add text to the tokens list, and advance the position.

            Args:
                text: Text to add.

            """
            nonlocal position
            tokens.append(text)
            position += len(text)

        def add_style(style: Style | str) -> None:
            """Add a style to the stack.

            Args:
                style: A style as Style instance or string.
            """
            style_stack.append((style, position))

        position = 0

        def close_tag() -> None:
            style, start = style_stack.pop()
            spans.append(Span(start, position, style))

        for child in token.children:
            child_type = child.type
            if child_type == "text":
                add_content(re.sub(r"\s+", " ", child.content))
            if child_type == "hardbreak":
                add_content("\n")
            if child_type == "softbreak":
                add_content(" ")
            elif child_type == "code_inline":
                add_style(".code_inline")
                add_content(child.content)
                close_tag()
            elif child_type == "em_open":
                add_style(".em")
            elif child_type == "strong_open":
                add_style(".strong")
            elif child_type == "s_open":
                add_style(".s")
            elif child_type == "link_open":
                href = child.attrs.get("href", "")
                action = f"link({href!r})"
                add_style(Style.from_meta({"@click": action}))
            elif child_type == "image":
                href = child.attrs.get("src", "")
                alt = child.attrs.get("alt", "")
                action = f"link({href!r})"
                add_style(Style.from_meta({"@click": action}))
                add_content("🖼  ")
                if alt:
                    add_content(f"({alt})")
                if child.children is not None:
                    for grandchild in child.children:
                        add_content(grandchild.content)
                close_tag()

            elif child_type.endswith("_close"):
                close_tag()

        content = Content("".join(tokens), spans=spans)
        return content


class MarkdownHeader(MarkdownBlock):
    """Base class for a Markdown header."""

    LEVEL = 0

    DEFAULT_CSS = """
    MarkdownHeader {
        color: $text;
        margin: 2 0 1 0;

    }
    """


class MarkdownH1(MarkdownHeader):
    """An H1 Markdown header."""

    LEVEL = 1

    DEFAULT_CSS = """
    MarkdownH1 {
        content-align: center middle;
        color: $markdown-h1-color;
        background: $markdown-h1-background;
        text-style: $markdown-h1-text-style;
    }
    """


class MarkdownH2(MarkdownHeader):
    """An H2 Markdown header."""

    LEVEL = 2

    DEFAULT_CSS = """
    MarkdownH2 {
        color: $markdown-h2-color;
        background: $markdown-h2-background;
        text-style: $markdown-h2-text-style;
    }
    """


class MarkdownH3(MarkdownHeader):
    """An H3 Markdown header."""

    LEVEL = 3

    DEFAULT_CSS = """
    MarkdownH3 {
        color: $markdown-h3-color;
        background: $markdown-h3-background;
        text-style: $markdown-h3-text-style;
        margin: 1 0;
        width: auto;
    }
    """


class MarkdownH4(MarkdownHeader):
    """An H4 Markdown header."""

    LEVEL = 4

    DEFAULT_CSS = """
    MarkdownH4 {
        color: $markdown-h4-color;
        background: $markdown-h4-background;
        text-style: $markdown-h4-text-style;
        margin: 1 0;
    }
    """


class MarkdownH5(MarkdownHeader):
    """An H5 Markdown header."""

    LEVEL = 5

    DEFAULT_CSS = """
    MarkdownH5 {
        color: $markdown-h5-color;
        background: $markdown-h5-background;
        text-style: $markdown-h5-text-style;
        margin: 1 0;
    }
    """


class MarkdownH6(MarkdownHeader):
    """An H6 Markdown header."""

    LEVEL = 6

    DEFAULT_CSS = """
    MarkdownH6 {
        color: $markdown-h6-color;
        background: $markdown-h6-background;
        text-style: $markdown-h6-text-style;
        margin: 1 0;
    }
    """


class MarkdownHorizontalRule(MarkdownBlock):
    """A horizontal rule."""

    DEFAULT_CSS = """
    MarkdownHorizontalRule {
        border-bottom: solid $secondary;
        height: 1;
        padding-top: 1;
        margin-bottom: 1;
    }
    """


class MarkdownParagraph(MarkdownBlock):
    """A paragraph Markdown block."""

    SCOPED_CSS = False
    DEFAULT_CSS = """
    Markdown > MarkdownParagraph {
         margin: 0 0 1 0;
    }
    """

    async def _update_from_block(self, block: MarkdownBlock):
        if isinstance(block, MarkdownParagraph):
            self.set_content(block._content)
            self._copy_context(block)
        else:
            await super()._update_from_block(block)


class MarkdownBlockQuote(MarkdownBlock):
    """A block quote Markdown block."""

    DEFAULT_CSS = """
    MarkdownBlockQuote {
        background: $boost;
        border-left: outer $text-primary 50%;
        margin: 1 0;
        padding: 0 1;
    }
    MarkdownBlockQuote:light {
        border-left: outer $text-secondary;
    }
    MarkdownBlockQuote > BlockQuote {
        margin-left: 2;
        margin-top: 1;
    }
    """


class MarkdownList(MarkdownBlock):
    DEFAULT_CSS = """

    MarkdownList {
        width: 1fr;
    }

    MarkdownList MarkdownList {
        margin: 0;
        padding-top: 0;
    }
    """


class MarkdownBulletList(MarkdownList):
    """A Bullet list Markdown block."""

    DEFAULT_CSS = """
    MarkdownBulletList {
        margin: 0 0 1 0;
        padding: 0 0;
    }

    MarkdownBulletList Horizontal {
        height: auto;
        width: 1fr;
    }

    MarkdownBulletList Vertical {
        height: auto;
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        for block in self._blocks:
            if isinstance(block, MarkdownListItem):
                bullet = MarkdownBullet()
                bullet.symbol = block.bullet
                yield Horizontal(bullet, Vertical(*block._blocks))
        self._blocks.clear()


class MarkdownOrderedList(MarkdownList):
    """An ordered list Markdown block."""

    DEFAULT_CSS = """
    MarkdownOrderedList {
        margin: 0 0 1 0;
        padding: 0 0;
    }

    MarkdownOrderedList Horizontal {
        height: auto;
        width: 1fr;
    }

    MarkdownOrderedList Vertical {
        height: auto;
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        suffix = ". "
        start = 1
        if self._blocks and isinstance(self._blocks[0], MarkdownOrderedListItem):
            try:
                start = int(self._blocks[0].bullet)
            except ValueError:
                pass
        symbol_size = max(
            len(f"{number}{suffix}")
            for number, block in enumerate(self._blocks, start)
            if isinstance(block, MarkdownListItem)
        )
        for number, block in enumerate(self._blocks, start):
            if isinstance(block, MarkdownListItem):
                bullet = MarkdownBullet()
                bullet.symbol = f"{number}{suffix}".rjust(symbol_size + 1)
                yield Horizontal(bullet, Vertical(*block._blocks))

        self._blocks.clear()


class MarkdownTableCellContents(Static):
    """Widget for table cells.

    A shim over a Static which responds to links.
    """

    async def action_link(self, href: str) -> None:
        """Pass a link action on to the MarkdownTable parent."""
        self.post_message(Markdown.LinkClicked(self.query_ancestor(Markdown), href))


class MarkdownTableContent(Widget):
    """Renders a Markdown table."""

    DEFAULT_CSS = """
    MarkdownTableContent {
        width: 1fr;
        height: auto;
        layout: grid;
        grid-columns: auto;
        grid-rows: auto;
        grid-gutter: 1 1;

        & > .cell {
            margin: 0 0;
            height: auto;
            padding: 0 1;
            text-overflow: ellipsis;
        }
        & > .header {
            height: auto;
            margin: 0 0;
            padding: 0 1;
            color: $primary;
            text-overflow: ellipsis;
            content-align: left bottom;
        }
        keyline: thin $foreground 20%;
    }
    MarkdownTableContent > .markdown-table--header {
        text-style: bold;
    }
    """

    COMPONENT_CLASSES = {"markdown-table--header", "markdown-table--lines"}

    def __init__(self, headers: list[Content], rows: list[list[Content]]):
        self.headers = headers.copy()
        """List of header text."""
        self.rows = rows.copy()
        """The row contents."""
        super().__init__()
        self.shrink = True
        self.last_row = 0

    def pre_layout(self, layout: Layout) -> None:
        assert isinstance(layout, GridLayout)
        layout.auto_minimum = True
        layout.expand = not self.query_ancestor(MarkdownTable).styles.is_auto_width
        layout.shrink = True
        layout.stretch_height = True

    def compose(self) -> ComposeResult:
        for header in self.headers:
            yield MarkdownTableCellContents(header, classes="header").with_tooltip(
                header
            )
        for row_index, row in enumerate(self.rows, 1):
            for cell_index, cell in enumerate(row, 1):
                yield MarkdownTableCellContents(
                    cell,
                    classes=f"row{row_index} cell",
                    name=f"cell{row_index}.{cell_index}",
                ).with_tooltip(cell.plain)
            self.last_row = row_index

    def _update_content(self, headers: list[Content], rows: list[list[Content]]):
        """Update cell contents."""
        self.headers = headers
        self.rows = rows
        cells: list[Content] = [
            *self.headers,
            *[cell for row in self.rows for cell in row],
        ]
        for child, updated_cell in zip(self.query(MarkdownTableCellContents), cells):
            child.update(updated_cell, layout=False)

    async def _update_rows(self, updated_rows: list[list[Content]]) -> None:
        self.styles.grid_size_columns = len(self.headers)
        await self.query_children(f".cell.row{self.last_row}").remove()
        new_cells: list[Static] = []
        for row_index, row in enumerate(updated_rows, self.last_row):
            for cell in row:
                new_cells.append(
                    Static(
                        cell,
                        classes=f"row{row_index} cell",
                    ).with_tooltip(cell)
                )
        self.last_row = row_index
        await self.mount_all(new_cells)

    def on_mount(self) -> None:
        self.styles.grid_size_columns = len(self.headers)

    async def action_link(self, href: str) -> None:
        """Pass a link action on to the MarkdownTable parent."""
        if isinstance(self.parent, MarkdownTable):
            await self.parent.action_link(href)


class MarkdownTable(MarkdownBlock):
    """A Table markdown Block."""

    DEFAULT_CSS = """
    MarkdownTable {
        width: 1fr;
        margin-bottom: 1;
        &:light {
            background: white 30%;
        }
    }
    """

    def __init__(self, markdown: Markdown, token: Token, *args, **kwargs) -> None:
        super().__init__(markdown, token, *args, **kwargs)
        self._headers: list[Content] = []
        self._rows: list[list[Content]] = []

    def compose(self) -> ComposeResult:
        headers, rows = self._get_headers_and_rows()
        self._headers = headers
        self._rows = rows
        yield MarkdownTableContent(headers, rows)

    def _get_headers_and_rows(self) -> tuple[list[Content], list[list[Content]]]:
        """Get list of headers, and list of rows.

        Returns:
            A tuple containing a list of headers, and a list of rows.
        """

        def flatten(block: MarkdownBlock) -> Iterable[MarkdownBlock]:
            for block in block._blocks:
                if block._blocks:
                    yield from flatten(block)
                yield block

        headers: list[Content] = []
        rows: list[list[Content]] = []
        for block in flatten(self):
            if isinstance(block, MarkdownTH):
                headers.append(block._content)
            elif isinstance(block, MarkdownTR):
                rows.append([])
            elif isinstance(block, MarkdownTD):
                rows[-1].append(block._content)
        if rows and not rows[-1]:
            rows.pop()
        return headers, rows

    async def _update_from_block(self, block: MarkdownBlock) -> None:
        """Special case to update a Markdown table.

        Args:
            block: Existing markdown block.
        """
        if isinstance(block, MarkdownTable):
            try:
                table_content = self.query_one(MarkdownTableContent)
            except NoMatches:
                pass
            else:
                if table_content.rows:
                    current_rows = self._rows
                    _new_headers, new_rows = block._get_headers_and_rows()
                    updated_rows = new_rows[len(current_rows) - 1 :]
                    self._rows = new_rows
                    await table_content._update_rows(updated_rows)
                    return
        await super()._update_from_block(block)


class MarkdownTBody(MarkdownBlock):
    """A table body Markdown block."""


class MarkdownTHead(MarkdownBlock):
    """A table head Markdown block."""


class MarkdownTR(MarkdownBlock):
    """A table row Markdown block."""


class MarkdownTH(MarkdownBlock):
    """A table header Markdown block."""


class MarkdownTD(MarkdownBlock):
    """A table data Markdown block."""


class MarkdownBullet(Widget):
    """A bullet widget."""

    DEFAULT_CSS = """
    MarkdownBullet {
        width: auto;
        color: $text-primary;
        &:light {
            color: $text-secondary;
        }
    }
    """

    symbol = reactive("\u25cf")
    """The symbol for the bullet."""

    def get_selection(self, _selection) -> tuple[str, str] | None:
        return self.symbol, " "

    def render(self) -> Content:
        return Content(self.symbol)


class MarkdownListItem(MarkdownBlock):
    """A list item Markdown block."""

    DEFAULT_CSS = """
    MarkdownListItem {
        layout: horizontal;
        margin-right: 1;
        height: auto;
    }

    MarkdownListItem > Vertical {
        width: 1fr;
        height: auto;
    }
    """

    def __init__(self, markdown: Markdown, token: Token, bullet: str) -> None:
        self.bullet = bullet
        super().__init__(markdown, token)


class MarkdownOrderedListItem(MarkdownListItem):
    pass


class MarkdownUnorderedListItem(MarkdownListItem):
    pass


class MarkdownFence(MarkdownBlock):
    """A fence Markdown block."""

    DEFAULT_CSS = """
    MarkdownFence {
        padding: 0;
        margin: 1 0;
        overflow: scroll hidden;
        scrollbar-size-horizontal: 0;
        scrollbar-size-vertical: 0;
        width: 1fr;
        height: auto;
        color: rgb(210,210,210);
        background: black 10%;
        &:light {
            background: white 30%;
        }
        & > Label {
            padding: 1 2;
        }
    }
    MarkdownFence:ansi {
        background: transparent;

        margin: 0;
        & > Label {
            padding: 1 0;
        }
        
    }
    """

    def __init__(self, markdown: Markdown, token: Token, code: str) -> None:
        super().__init__(markdown, token)
        self.code = code
        self.lexer = token.info
        self._highlighted_code = self.highlight(
            self.code,
            self.lexer,
            ansi=self.app.native_ansi_color,
            dark=self.app.current_theme.dark,
        )
        # No links required in code
        self.auto_links = False

    def notify_style_update(self) -> None:
        """Update highlight theme when App theme changes."""
        self._highlighted_code = self.highlight(
            self.code,
            self.lexer,
            ansi=self.app.native_ansi_color,
            dark=self.app.current_theme.dark,
        )
        self.set_content(self._highlighted_code)
        return super().notify_style_update()

    @property
    def allow_horizontal_scroll(self) -> bool:
        return True

    @classmethod
    def highlight(
        cls, code: str, language: str, ansi: bool = False, dark: bool = False
    ) -> Content:
        if ansi:
            if dark:
                from textual.highlight import ANSIDarkHighlightTheme as HighlightTheme
            else:
                from textual.highlight import ANSILightHighlightTheme as HighlightTheme

        else:
            from textual.highlight import HighlightTheme

        return highlight(code, language=language or None, theme=HighlightTheme)

    def _copy_context(self, block: MarkdownBlock) -> None:
        if isinstance(block, MarkdownFence):
            self.code = block.code
            self.lexer = block.lexer
            self._highlighted_code = block._highlighted_code
        super()._copy_context(block)

    async def _update_from_block(self, block: MarkdownBlock):
        if isinstance(block, MarkdownFence):
            self._copy_context(block)
            self.set_content(block._highlighted_code)
        else:
            await super()._update_from_block(block)

    def set_content(self, content: Content) -> None:
        self._content = content
        with suppress(NoMatches):
            self.query_one("#code-content", Label).update(content)

    def compose(self) -> ComposeResult:
        yield Label(self._highlighted_code, id="code-content", expand=True)


NUMERALS = " ⅠⅡⅢⅣⅤⅥ"


class Markdown(Widget):
    DEFAULT_CSS = """
    Markdown {
        height: auto;
        padding: 0 2 0 2;
        layout: vertical;
        color: $foreground;
        overflow-y: hidden;

        &:ansi {
            MarkdownBlock > .code_inline {
                background: ansi_default !important;
            }
        }
        
        MarkdownBlock {
            &:dark > .code_inline {
                background: $warning 10%;
                color: $text-warning 95%;
            }
            &:light > .code_inline {
                background: $error 5%;
                color: $text-error 95%;
            }           
            & > .em {
                text-style: italic;
            }
            & > .strong {
                text-style: bold;
            }
            & > .s {
                text-style: strike;
            }
        }
    }
    """

    BULLETS = ["• ", "▪ ", "‣ ", "⭑ ", "◦ "]
    """Unicode bullets used for unordered lists."""

    BLOCKS: dict[str, type[MarkdownBlock]] = {
        "h1": MarkdownH1,
        "h2": MarkdownH2,
        "h3": MarkdownH3,
        "h4": MarkdownH4,
        "h5": MarkdownH5,
        "h6": MarkdownH6,
        "hr": MarkdownHorizontalRule,
        "paragraph_open": MarkdownParagraph,
        "blockquote_open": MarkdownBlockQuote,
        "bullet_list_open": MarkdownBulletList,
        "ordered_list_open": MarkdownOrderedList,
        "list_item_ordered_open": MarkdownOrderedListItem,
        "list_item_unordered_open": MarkdownUnorderedListItem,
        "table_open": MarkdownTable,
        "tbody_open": MarkdownTBody,
        "thead_open": MarkdownTHead,
        "tr_open": MarkdownTR,
        "th_open": MarkdownTH,
        "td_open": MarkdownTD,
        "fence": MarkdownFence,
        "code_block": MarkdownFence,
    }
    """Mapping of block names on to a widget class."""

    def __init__(
        self,
        markdown: str | None = None,
        *,
        name: str | None = None,
        id: str | None = None,
        classes: str | None = None,
        parser_factory: Callable[[], MarkdownIt] | None = None,
        open_links: bool = True,
    ):
        """A Markdown widget.

        Args:
            markdown: String containing Markdown or None to leave blank for now.
            name: The name of the widget.
            id: The ID of the widget in the DOM.
            classes: The CSS classes of the widget.
            parser_factory: A factory function to return a configured MarkdownIt instance. If `None`, a "gfm-like" parser is used.
            open_links: Open links automatically. If you set this to `False`, you can handle the [`LinkClicked`][textual.widgets.markdown.Markdown.LinkClicked] events.
        """
        super().__init__(name=name, id=id, classes=classes)
        self._initial_markdown: str | None = markdown
        self._markdown = ""
        self._parser_factory = parser_factory
        self._table_of_contents: TableOfContentsType | None = None
        self._open_links = open_links
        self._last_parsed_line = 0
        self._theme = ""

    @property
    def table_of_contents(self) -> TableOfContentsType:
        """The document's table of contents."""
        if self._table_of_contents is None:
            self._table_of_contents = [
                (header.LEVEL, header._content.plain, header.id)
                for header in self.children
                if isinstance(header, MarkdownHeader)
            ]
        return self._table_of_contents

    class TableOfContentsUpdated(Message):
        """The table of contents was updated."""

        def __init__(
            self, markdown: Markdown, table_of_contents: TableOfContentsType
        ) -> None:
            super().__init__()
            self.markdown: Markdown = markdown
            """The `Markdown` widget associated with the table of contents."""
            self.table_of_contents: TableOfContentsType = table_of_contents
            """Table of contents."""

        @property
        def control(self) -> Markdown:
            """The `Markdown` widget associated with the table of contents.

            This is an alias for [`TableOfContentsUpdated.markdown`][textual.widgets.Markdown.TableOfContentsSelected.markdown]
            and is used by the [`on`][textual.on] decorator.
            """
            return self.markdown

    class TableOfContentsSelected(Message):
        """An item in the TOC was selected."""

        def __init__(self, markdown: Markdown, block_id: str) -> None:
            super().__init__()
            self.markdown: Markdown = markdown
            """The `Markdown` widget where the selected item is."""
            self.block_id: str = block_id
            """ID of the block that was selected."""

        @property
        def control(self) -> Markdown:
            """The `Markdown` widget where the selected item is.

            This is an alias for [`TableOfContentsSelected.markdown`][textual.widgets.Markdown.TableOfContentsSelected.markdown]
            and is used by the [`on`][textual.on] decorator.
            """
            return self.markdown

    class LinkClicked(Message):
        """A link in the document was clicked."""

        def __init__(self, markdown: Markdown, href: str) -> None:
            super().__init__()
            self.markdown: Markdown = markdown
            """The `Markdown` widget containing the link clicked."""
            self.href: str = unquote(href)
            """The link that was selected."""

        @property
        def control(self) -> Markdown:
            """The `Markdown` widget containing the link clicked.

            This is an alias for [`LinkClicked.markdown`][textual.widgets.Markdown.LinkClicked.markdown]
            and is used by the [`on`][textual.on] decorator.
            """
            return self.markdown

    @property
    def source(self) -> str:
        """The markdown source."""
        return self._markdown or ""

    def get_block_class(self, block_name: str) -> type[MarkdownBlock]:
        """Get the block widget class.

        Args:
            block_name: Name of the block.

        Returns:
            A MarkdownBlock class
        """
        return self.BLOCKS[block_name]

    async def _on_mount(self, _: Mount) -> None:
        initial_markdown = self._initial_markdown
        self._initial_markdown = None
        await self.update(initial_markdown or "")

        if initial_markdown is None:
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self._table_of_contents
                ).set_sender(self)
            )

    @classmethod
    def get_stream(cls, markdown: Markdown) -> MarkdownStream:
        """Get a [MarkdownStream][textual.widgets.markdown.MarkdownStream] instance to stream Markdown in the background.

        If you append to the Markdown document many times a second, it is possible the widget won't
        be able to update as fast as you write (occurs around 20 appends per second). It will still
        work, but the user will have to wait for the UI to catch up after the document has be retrieved.

        Using a [MarkdownStream][textual.widgets.markdown.MarkdownStream] will combine several updates in to one
        as necessary to keep up with the incoming data.

        example:
        ```python
        # self.get_chunk is a hypothetical method that retrieves a
        # markdown fragment from the network
        @work
        async def stream_markdown(self) -> None:
            markdown_widget = self.query_one(Markdown)
            container = self.query_one(VerticalScroll)
            container.anchor()

            stream = Markdown.get_stream(markdown_widget)
            try:
                while (chunk:= await self.get_chunk()) is not None:
                    await stream.write(chunk)
            finally:
                await stream.stop()
        ```


        Args:
            markdown: A [Markdown][textual.widgets.Markdown] widget instance.

        Returns:
            The Markdown stream object.
        """
        updater = MarkdownStream(markdown)
        updater.start()
        return updater

    def on_markdown_link_clicked(self, event: LinkClicked) -> None:
        if self._open_links:
            self.app.open_url(event.href)

    @staticmethod
    def sanitize_location(location: str) -> tuple[Path, str]:
        """Given a location, break out the path and any anchor.

        Args:
            location: The location to sanitize.

        Returns:
            A tuple of the path to the location cleaned of any anchor, plus
            the anchor (or an empty string if none was found).
        """
        location, _, anchor = location.partition("#")
        return Path(location), anchor

    def goto_anchor(self, anchor: str) -> bool:
        """Try and find the given anchor in the current document.

        Args:
            anchor: The anchor to try and find.

        Note:
            The anchor is found by looking at all of the headings in the
            document and finding the first one whose slug matches the
            anchor.

            Note that the slugging method used is similar to that found on
            GitHub.

        Returns:
            True when the anchor was found in the current document, False otherwise.
        """
        if not self._table_of_contents or not isinstance(self.parent, Widget):
            return False
        unique = TrackedSlugs()
        for _, title, header_id in self._table_of_contents:
            if unique.slug(title) == anchor:
                self.query_one(f"#{header_id}").scroll_visible(top=True)
                return True
        return False

    async def load(self, path: Path) -> None:
        """Load a new Markdown document.

        Args:
            path: Path to the document.

        Raises:
            OSError: If there was some form of error loading the document.

        Note:
            The exceptions that can be raised by this method are all of
            those that can be raised by calling [`Path.read_text`][pathlib.Path.read_text].
        """
        path, anchor = self.sanitize_location(str(path))
        data = await asyncio.get_running_loop().run_in_executor(
            None, partial(path.read_text, encoding="utf-8")
        )
        await self.update(data)
        if anchor:
            self.goto_anchor(anchor)

    def unhandled_token(self, token: Token) -> MarkdownBlock | None:
        """Process an unhandled token.

        Args:
            token: The MarkdownIt token to handle.

        Returns:
            Either a widget to be added to the output, or `None`.
        """
        return None

    def _parse_markdown(self, tokens: Iterable[Token]) -> Iterable[MarkdownBlock]:
        """Create a stream of MarkdownBlock widgets from markdown.

        Args:
            tokens: List of tokens.

        Yields:
            Widgets for mounting.
        """

        stack: list[MarkdownBlock] = []
        stack_append = stack.append

        get_block_class = self.get_block_class

        for token in tokens:
            token_type = token.type
            if token_type == "heading_open":
                stack_append(get_block_class(token.tag)(self, token))
            elif token_type == "hr":
                yield get_block_class("hr")(self, token)
            elif token_type == "paragraph_open":
                stack_append(get_block_class("paragraph_open")(self, token))
            elif token_type == "blockquote_open":
                stack_append(get_block_class("blockquote_open")(self, token))
            elif token_type == "bullet_list_open":
                stack_append(get_block_class("bullet_list_open")(self, token))
            elif token_type == "ordered_list_open":
                stack_append(get_block_class("ordered_list_open")(self, token))
            elif token_type == "list_item_open":
                if token.info:
                    stack_append(
                        get_block_class("list_item_ordered_open")(
                            self, token, token.info
                        )
                    )
                else:
                    item_count = sum(
                        1
                        for block in stack
                        if isinstance(block, MarkdownUnorderedListItem)
                    )
                    stack_append(
                        get_block_class("list_item_unordered_open")(
                            self,
                            token,
                            self.BULLETS[item_count % len(self.BULLETS)],
                        )
                    )
            elif token_type == "table_open":
                stack_append(get_block_class("table_open")(self, token))
            elif token_type == "tbody_open":
                stack_append(get_block_class("tbody_open")(self, token))
            elif token_type == "thead_open":
                stack_append(get_block_class("thead_open")(self, token))
            elif token_type == "tr_open":
                stack_append(get_block_class("tr_open")(self, token))
            elif token_type == "th_open":
                stack_append(get_block_class("th_open")(self, token))
            elif token_type == "td_open":
                stack_append(get_block_class("td_open")(self, token))
            elif token_type.endswith("_close"):
                block = stack.pop()
                if token.type == "heading_close":
                    block.id = (
                        f"heading-{slug_for_tcss_id(block._content.plain)}-{id(block)}"
                    )
                if stack:
                    stack[-1]._blocks.append(block)
                else:
                    yield block
            elif token_type == "inline":
                stack[-1].build_from_token(token)
            elif token_type in ("fence", "code_block"):
                fence_class = get_block_class(token_type)
                assert issubclass(fence_class, MarkdownFence)
                fence = fence_class(self, token, token.content.rstrip())
                if stack:
                    stack[-1]._blocks.append(fence)
                else:
                    yield fence
            else:
                external = self.unhandled_token(token)
                if external is not None:
                    if stack:
                        stack[-1]._blocks.append(external)
                    else:
                        yield external

    def _build_from_source(self, markdown: str) -> list[MarkdownBlock]:
        """Build blocks from markdown source.

        Args:
            markdown: A Markdown document, or partial document.

        Returns:
            A list of MarkdownBlock instances.
        """
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )
        tokens = parser.parse(markdown)
        return list(self._parse_markdown(tokens))

    def update(self, markdown: str) -> AwaitComplete:
        """Update the document with new Markdown.

        Args:
            markdown: A string containing Markdown.

        Returns:
            An optionally awaitable object. Await this to ensure that all children have been mounted.
        """
        self._theme = self.app.theme
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )

        markdown_block = self.query("MarkdownBlock")
        self._markdown = markdown
        self._table_of_contents = None

        async def await_update() -> None:
            """Update in batches."""
            BATCH_SIZE = 200
            batch: list[MarkdownBlock] = []

            # Lock so that you can't update with more than one document simultaneously
            async with self.lock:
                tokens = await asyncio.get_running_loop().run_in_executor(
                    None, parser.parse, markdown
                )

                # Remove existing blocks for the first batch only
                removed: bool = False

                async def mount_batch(batch: list[MarkdownBlock]) -> None:
                    """Mount a single match of blocks.

                    Args:
                        batch: A list of blocks to mount.
                    """
                    nonlocal removed
                    if removed:
                        await self.mount_all(batch)
                    else:
                        with self.app.batch_update():
                            await markdown_block.remove()
                            await self.mount_all(batch)
                        removed = True

                for block in self._parse_markdown(tokens):
                    batch.append(block)
                    if len(batch) == BATCH_SIZE:
                        await mount_batch(batch)
                        batch.clear()
                if batch:
                    await mount_batch(batch)
                if not removed:
                    await markdown_block.remove()

            lines = markdown.splitlines()
            self._last_parsed_line = len(lines) - (1 if lines and lines[-1] else 0)
            self.post_message(
                Markdown.TableOfContentsUpdated(
                    self, self.table_of_contents
                ).set_sender(self)
            )

        return AwaitComplete(await_update())

    def append(self, markdown: str) -> AwaitComplete:
        """Append to markdown.

        Args:
            markdown: A fragment of markdown to be appended.

        Returns:
            An optionally awaitable object. Await this to ensure that the markdown has been append by the next line.
        """
        parser = (
            MarkdownIt("gfm-like")
            if self._parser_factory is None
            else self._parser_factory()
        )

        self._markdown = self.source + markdown
        updated_source = "".join(
            self._markdown.splitlines(keepends=True)[self._last_parsed_line :]
        )

        async def await_append() -> None:
            """Append new markdown widgets."""
            async with self.lock:
                tokens = parser.parse(updated_source)
                existing_blocks = [
                    child for child in self.children if isinstance(child, MarkdownBlock)
                ]
                start_line = self._last_parsed_line
                for token in reversed(tokens):
                    if token.map is not None and token.level == 0:
                        self._last_parsed_line += token.map[0]
                        break

                new_blocks = list(self._parse_markdown(tokens))
                any_headers = any(
                    isinstance(block, MarkdownHeader) for block in new_blocks
                )
                for block in new_blocks:
                    start, end = block.source_range
                    block.source_range = (
                        start + start_line,
                        end + start_line,
                    )

                with self.app.batch_update():
                    if existing_blocks and new_blocks:
                        last_block = existing_blocks[-1]
                        last_block.source_range = new_blocks[0].source_range
                        try:
                            await last_block._update_from_block(new_blocks[0])
                        except IndexError:
                            pass
                        else:
                            new_blocks = new_blocks[1:]

                    if new_blocks:
                        await self.mount_all(new_blocks)

                if any_headers:
                    self._table_of_contents = None
                    self.post_message(
                        Markdown.TableOfContentsUpdated(
                            self, self.table_of_contents
                        ).set_sender(self)
                    )

        return AwaitComplete(await_append())


class MarkdownTableOfContents(Widget, can_focus_children=True):
    """Displays a table of contents for a markdown document."""

    DEFAULT_CSS = """
    MarkdownTableOfContents {
        width: auto;
        height: 1fr;
        background: $panel;
        &:focus-within {
            background-tint: $foreground 5%;
        }
    }
    MarkdownTableOfContents > Tree {
        padding: 1;
        width: auto;
        height: 1fr;
        background: $panel;
    }
    """

    table_of_contents = reactive[Optional[TableOfContentsType]](None, init=False)
    """Underlying data to populate the table of contents widget."""

    def __init__(
        self,
        markdown: Markdown,
        name: str | None = None,
        id: str | None = None,
        classes: str | None = None,
        disabled: bool = False,
    ) -> None:
        """Initialize a table of contents.

        Args:
            markdown: The Markdown document associated with this table of contents.
            name: The name of the widget.
            id: The ID of the widget in the DOM.
            classes: The CSS classes for the widget.
            disabled: Whether the widget is disabled or not.
        """
        self.markdown: Markdown = markdown
        """The Markdown document associated with this table of contents."""
        super().__init__(name=name, id=id, classes=classes, disabled=disabled)

    def compose(self) -> ComposeResult:
        tree: Tree = Tree("TOC")
        tree.show_root = False
        tree.show_guides = True
        tree.guide_depth = 4
        tree.auto_expand = False
        yield tree

    def watch_table_of_contents(self, table_of_contents: TableOfContentsType) -> None:
        """Triggered when the table of contents changes."""
        self.rebuild_table_of_contents(table_of_contents)

    def rebuild_table_of_contents(self, table_of_contents: TableOfContentsType) -> None:
        """Rebuilds the tree representation of the table of contents data.

        Args:
            table_of_contents: Table of contents.
        """
        tree = self.query_one(Tree)
        tree.clear()
        root = tree.root
        for level, name, block_id in table_of_contents:
            node = root
            for _ in range(level - 1):
                if node._children:
                    node = node._children[-1]
                    node.expand()
                    node.allow_expand = True
                else:
                    node = node.add(NUMERALS[level], expand=True)
            node_label = Text.assemble((f"{NUMERALS[level]} ", "dim"), name)
            node.add_leaf(node_label, {"block_id": block_id})

    async def _on_tree_node_selected(self, message: Tree.NodeSelected) -> None:
        node_data = message.node.data
        if node_data is not None:
            await self._post_message(
                Markdown.TableOfContentsSelected(self.markdown, node_data["block_id"])
            )
        message.stop()


class MarkdownViewer(VerticalScroll, can_focus=False, can_focus_children=True):
    """A Markdown viewer widget."""

    SCOPED_CSS = False

    DEFAULT_CSS = """
    MarkdownViewer {
        height: 1fr;
        scrollbar-gutter: stable;
        background: $surface;
        & > MarkdownTableOfContents {
            display: none;
            dock:left;
        }
    }

    MarkdownViewer.-show-table-of-contents > MarkdownTableOfContents {
        display: block;
    }
    """

    show_table_of_contents = reactive(True)
    """Show the table of contents?"""
    top_block = reactive("")

    navigator: var[Navigator] = var(Navigator)

    class NavigatorUpdated(Message):
        """Navigator has been changed (clicked link etc)."""

    def __init__(
        self,
        markdown: str | None = None,
        *,
        show_table_of_contents: bool = True,
        name: str | None = None,
        id: str | None = None,
        classes: str | None = None,
        parser_factory: Callable[[], MarkdownIt] | None = None,
        open_links: bool = True,
    ):
        """Create a Markdown Viewer object.

        Args:
            markdown: String containing Markdown, or None to leave blank.
            show_table_of_contents: Show a table of contents in a sidebar.
            name: The name of the widget.
            id: The ID of the widget in the DOM.
            classes: The CSS classes of the widget.
            parser_factory: A factory function to return a configured MarkdownIt instance. If `None`, a "gfm-like" parser is used.
            open_links: Open links automatically. If you set this to `False`, you can handle the [`LinkClicked`][textual.widgets.markdown.Markdown.LinkClicked] events.
        """
        super().__init__(name=name, id=id, classes=classes)
        self.show_table_of_contents = show_table_of_contents
        self._markdown = markdown
        self._parser_factory = parser_factory
        self._open_links = open_links

    @property
    def document(self) -> Markdown:
        """The [`Markdown`][textual.widgets.Markdown] document widget."""
        return self.query_one(Markdown)

    @property
    def table_of_contents(self) -> MarkdownTableOfContents:
        """The [table of contents][textual.widgets.markdown.MarkdownTableOfContents] widget."""
        return self.query_one(MarkdownTableOfContents)

    async def _on_mount(self, _: Mount) -> None:
        await self.document.update(self._markdown or "")

    async def go(self, location: str | PurePath) -> None:
        """Navigate to a new document path."""
        path, anchor = self.document.sanitize_location(str(location))
        if path == Path(".") and anchor:
            # We've been asked to go to an anchor but with no file specified.
            self.document.goto_anchor(anchor)
        else:
            # We've been asked to go to a file, optionally with an anchor.
            await self.document.load(self.navigator.go(location))
            self.post_message(self.NavigatorUpdated())

    async def back(self) -> None:
        """Go back one level in the history."""
        if self.navigator.back():
            await self.document.load(self.navigator.location)
            self.post_message(self.NavigatorUpdated())

    async def forward(self) -> None:
        """Go forward one level in the history."""
        if self.navigator.forward():
            await self.document.load(self.navigator.location)
            self.post_message(self.NavigatorUpdated())

    async def _on_markdown_link_clicked(self, message: Markdown.LinkClicked) -> None:
        message.stop()
        await self.go(message.href)

    def watch_show_table_of_contents(self, show_table_of_contents: bool) -> None:
        self.set_class(show_table_of_contents, "-show-table-of-contents")

    def compose(self) -> ComposeResult:
        markdown = Markdown(
            parser_factory=self._parser_factory, open_links=self._open_links
        )
        markdown.can_focus = True
        yield markdown
        yield MarkdownTableOfContents(markdown)

    def _on_markdown_table_of_contents_updated(
        self, message: Markdown.TableOfContentsUpdated
    ) -> None:
        self.query_one(MarkdownTableOfContents).table_of_contents = (
            message.table_of_contents
        )
        message.stop()

    def _on_markdown_table_of_contents_selected(
        self, message: Markdown.TableOfContentsSelected
    ) -> None:
        block_selector = f"#{message.block_id}"
        block = self.query_one(block_selector, MarkdownBlock)
        self.scroll_to_widget(block, top=True)
        message.stop()
