Newer
Older
navi-1 / navi / mcp / ui_server.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy 9 days ago 6 KB Enable navi_ui card_grid for realtor profile
"""Internal MCP server that lets Navi push UI components to the webclient.

The server exposes a single tool, ``render_component``.  When Navi calls it,
the payload is forwarded over the active WebSocket for the session as a
``ui_component`` event.  The webclient receives the event and renders the
named component with the supplied data.

This server is started by ``navi.main:lifespan`` on a dedicated port
(``NAVI_UI_MCP_PORT``, default 8001) before the main container is built so
that :class:`navi.mcp.McpManager` can connect to it during startup like any
other MCP server.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any

import structlog
from mcp.server import FastMCP

# Stdlib logger used by render_component and set_orchestrator below.
logger = logging.getLogger(__name__)

# Shared holder: the lifespan task sets the orchestrator once the main
# FastAPI container is ready.  The tool waits (with a timeout) so that early
# MCP health-check pings or tool calls do not crash before startup completes.
_orchestrator: Any | None = None
# Set by set_orchestrator() and cleared by clear_orchestrator();
# render_component awaits it before reading _orchestrator.
_orchestrator_ready = asyncio.Event()

# Upper bound, in seconds, that render_component waits on
# _orchestrator_ready before returning a "not ready" error string.
_ORCHESTRATOR_TIMEOUT = 10.0

# Component schema documentation embedded into server instructions.
# This is separate from the webclient implementation; both must agree on the
# same contract, but the server only validates the envelope.
# The "Rules" section must stay in sync with _is_valid_card_grid(): that
# validator rejects an empty `cards` list and empty `id` / `title` strings,
# so the text states those requirements explicitly.
SERVER_INSTRUCTIONS = """\
Internal Navi UI server. Use this to render structured data in the webclient.

Tool: render_component(component_name, payload, session_id)
- component_name: identifier of the UI component to render.
- payload: JSON-serializable object with the data for that component.
- session_id: target Navi session (injected automatically by the agent).

Only call this when the user explicitly asks for a graphical / structured
view, or when a tool response is naturally represented as a component.

## Supported components

### card_grid
A responsive grid of clickable cards (default up to 4). Clicking a card opens a
modal with the full `details` list.

Payload shape:
```json
{
  "title": "Optional section heading",
  "cards": [
    {
      "id": "unique-card-id",
      "title": "Card title",
      "subtitle": "Short secondary line",
      "image": "https://optional-image-url",
      "meta": [
        {"label": "Label", "value": "Value"}
      ],
      "description": "One-line summary (optional)",
      "details": [
        {"label": "Full detail", "value": "Full value"}
      ],
      "actions": [
        {"label": "Open", "url": "https://..."}
      ]
    }
  ]
}
```

Rules:
- `cards` is required and must be a non-empty list.
- Each card must contain `id` and `title`, both non-empty strings.
- `image` must be a direct URL if present.
- Keep `description` under 120 characters.
- Put everything the user might want to see after clicking in `details`.
"""

# FastMCP application instance for the "navi_ui" server; render_component is
# registered on it via @mcp.tool().  The host and port passed here are only
# construction-time values: start_ui_server() overwrites mcp.settings.host and
# mcp.settings.port with its own arguments before running the server.
mcp = FastMCP(
    "navi_ui",
    host="127.0.0.1",
    port=8001,
    streamable_http_path="/mcp",
    instructions=SERVER_INSTRUCTIONS,
)


def _is_valid_card_grid(payload: dict[str, Any]) -> tuple[bool, str]:
    """Check that *payload* has the minimal shape of a ``card_grid``.

    Only the envelope is inspected: ``cards`` has to be a non-empty list of
    objects, and every object needs non-empty string ``id`` and ``title``
    values.  Optional fields are ignored and URLs are never fetched.

    Returns:
        ``(True, "")`` when the payload passes, otherwise ``(False, reason)``
        where ``reason`` describes the first problem encountered.
    """
    card_list = payload.get("cards")
    if not isinstance(card_list, list):
        return False, "card_grid payload must contain a 'cards' list"
    if not card_list:
        return False, "card_grid 'cards' list must not be empty"

    # Checked in this order so the reported problem is always the first one.
    required_text_fields = ("id", "title")
    for position, entry in enumerate(card_list):
        if not isinstance(entry, dict):
            return False, f"card at index {position} must be an object"
        for field in required_text_fields:
            value = entry.get(field)
            if not (isinstance(value, str) and value):
                return (
                    False,
                    f"card at index {position} must have a non-empty string '{field}'",
                )
    return True, ""


@mcp.tool()
async def render_component(
    component_name: str,
    payload: dict[str, Any],
    session_id: str | None = None,
) -> str:
    """Push a UI component to the webclient attached to a Navi session.

    The arguments are checked first, then the call waits (bounded by
    ``_ORCHESTRATOR_TIMEOUT``) for the orchestrator to be wired, and finally
    the component is handed to the orchestrator as a ``ui_component`` event
    for the session.  Validation and delivery problems are reported through
    the returned string.

    Args:
        component_name: Identifier of the UI component to render
            (e.g. "card_grid").
        payload: JSON-serializable data object consumed by that component.
        session_id: Navi session id. Injected automatically by the agent context.

    Returns:
        A short confirmation string on success, or a string starting with
        ``"Error:"`` explaining why the component was not sent.
    """
    # Envelope checks: reject bad input before touching the orchestrator.
    if not (component_name and isinstance(component_name, str)):
        return "Error: component_name must be a non-empty string"
    if not isinstance(payload, dict):
        return "Error: payload must be a JSON object (dict)"
    if not session_id:
        return "Error: session_id is required (it is normally injected by the agent)"

    # card_grid is the only component that gets a structural check here.
    if component_name == "card_grid":
        valid, reason = _is_valid_card_grid(payload)
        if not valid:
            return f"Error: {reason}"

    # The tool can be called before set_orchestrator() has run, so wait for
    # the readiness event instead of failing immediately.
    try:
        await asyncio.wait_for(_orchestrator_ready.wait(), timeout=_ORCHESTRATOR_TIMEOUT)
    except asyncio.TimeoutError:
        return "Error: UI server is not ready (orchestrator unavailable)"

    # Snapshot the module-level reference; it may legitimately be None.
    target = _orchestrator
    if target is None:
        return "Error: UI server orchestrator reference is missing"

    event = {
        "type": "ui_component",
        "component": component_name,
        "payload": payload,
    }
    try:
        await target._notify_session(session_id, event)
    except Exception as exc:
        # Delivery failures are logged and reported back as a string.
        logger.warning("render_component failed for session %s: %s", session_id, exc)
        return f"Error: failed to send component to session: {exc}"

    return f"Component {component_name!r} rendered for session {session_id}"


def set_orchestrator(orchestrator: Any) -> None:
    """Wire the active orchestrator so the tool can reach WebSocket clients.

    Stores *orchestrator* in the module-level ``_orchestrator`` holder and
    sets ``_orchestrator_ready``, which releases any ``render_component``
    call currently waiting on that event.

    Args:
        orchestrator: Object whose ``_notify_session(session_id, event)``
            coroutine ``render_component`` awaits to deliver UI events.
    """
    global _orchestrator
    # Store the reference before signalling readiness so a released waiter
    # reads the new value.
    _orchestrator = orchestrator
    _orchestrator_ready.set()
    logger.info("navi_ui orchestrator wired")


def clear_orchestrator() -> None:
    """Clear the orchestrator reference, e.g. during graceful shutdown.

    Resets ``_orchestrator`` to ``None`` and clears ``_orchestrator_ready``,
    so later ``render_component`` calls wait on the event again (bounded by
    ``_ORCHESTRATOR_TIMEOUT``) rather than using the old reference.
    """
    global _orchestrator
    _orchestrator = None
    _orchestrator_ready.clear()


async def start_ui_server(host: str, port: int) -> None:
    """Run the StreamableHTTP MCP server on *host*:*port* until it stops.

    Meant to be scheduled as a background task from the main application
    lifespan; the coroutine returns only once the server has stopped.

    Args:
        host: Interface address the MCP server should bind to.
        port: TCP port the MCP server should listen on.
    """
    structlog.get_logger().info("navi_ui_mcp_server_starting", host=host, port=port)
    # The module-level FastMCP instance was constructed with fixed host/port
    # values that live on its settings object, so apply the runtime
    # configuration there before starting.
    server_settings = mcp.settings
    server_settings.host = host
    server_settings.port = port
    await mcp.run_streamable_http_async()