diff --git a/mcp_servers.d/navi_ui.json b/mcp_servers.d/navi_ui.json new file mode 100644 index 0000000..912c00d --- /dev/null +++ b/mcp_servers.d/navi_ui.json @@ -0,0 +1,8 @@ +{ + "transport": "streamable_http", + "url": "http://localhost:8001/mcp", + "groups": { + "ui": ["render_component"] + }, + "instructions": "Internal Navi UI server. Use `render_component` to display structured data in the webclient. The tool accepts `component_name` (string) and `payload` (JSON object); `session_id` is injected automatically by the agent. Only use it when a graphical/structured view is appropriate." +} diff --git a/navi/config.py b/navi/config.py index d6430dc..73657c1 100644 --- a/navi/config.py +++ b/navi/config.py @@ -68,6 +68,11 @@ # Directory for user-defined context providers (auto-discovered at startup) context_providers_dir: str = "context_providers" + # Internal MCP UI server — lets Navi render structured components in the webclient. + navi_ui_mcp_enabled: bool = True + navi_ui_mcp_host: str = "127.0.0.1" + navi_ui_mcp_port: int = 8001 + # Session file uploads session_files_dir: str = "session_files" session_files_max_size_mb: int = 200 diff --git a/navi/main.py b/navi/main.py index 7bd3ac1..be03f2b 100644 --- a/navi/main.py +++ b/navi/main.py @@ -9,7 +9,7 @@ import structlog from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from navi.api.routes import agents, api_tokens, auth, health, messages, sessions, webhooks @@ -31,12 +31,60 @@ _base = Path(__file__).parent.parent +async def _wait_for_ui_server(host: str, port: int, timeout: float = 5.0) -> bool: + """Poll until the internal UI MCP server is accepting TCP connections.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout=0.5 + ) + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + return True + except Exception: + await asyncio.sleep(0.1) + return False + + @asynccontextmanager async def lifespan(app: FastAPI): log = structlog.get_logger() + + ui_server_task: asyncio.Task | None = None + if settings.navi_ui_mcp_enabled: + from navi.mcp.ui_server import start_ui_server + + ui_server_task = asyncio.create_task( + start_ui_server(settings.navi_ui_mcp_host, settings.navi_ui_mcp_port) + ) + ready = await _wait_for_ui_server( + settings.navi_ui_mcp_host, settings.navi_ui_mcp_port + ) + if ready: + log.info( + "startup.navi_ui_mcp_ready", + host=settings.navi_ui_mcp_host, + port=settings.navi_ui_mcp_port, + ) + else: + log.warning( + "startup.navi_ui_mcp_not_ready", + host=settings.navi_ui_mcp_host, + port=settings.navi_ui_mcp_port, + ) + container = await create_container() app.state.container = container + if settings.navi_ui_mcp_enabled and container.orchestrator is not None: + from navi.mcp.ui_server import set_orchestrator + + set_orchestrator(container.orchestrator) + from navi.api.deps import set_container set_container(container) @@ -103,6 +151,16 @@ except asyncio.CancelledError: pass + if settings.navi_ui_mcp_enabled and ui_server_task is not None: + from navi.mcp.ui_server import clear_orchestrator + + clear_orchestrator() + ui_server_task.cancel() + try: + await ui_server_task + except asyncio.CancelledError: + pass + from navi.tools.ssh_exec import close_all_connections close_all_connections() await container.shutdown() @@ -140,9 +198,6 @@ app.mount("/content", StaticFiles(directory=str(_base / "navi" / "content")), name="content") -from fastapi.responses import RedirectResponse - - @app.get("/api/sessions/{session_id}/files/{filename}", include_in_schema=False) async def api_session_file_redirect(session_id: str, filename: str, download: bool = False): target = f"/sessions/{session_id}/files/{filename}" diff --git a/navi/mcp/ui_server.py b/navi/mcp/ui_server.py new file mode 100644 index 0000000..11b1016 --- /dev/null +++ b/navi/mcp/ui_server.py @@ -0,0 +1,130 @@ +"""Internal MCP server that lets Navi push UI components to the webclient. + +The server exposes a single tool, ``render_component``. When Navi calls it, +the payload is forwarded over the active WebSocket for the session as a +``ui_component`` event. The webclient receives the event and renders the +named component with the supplied data. + +This server is started by ``navi.main:lifespan`` on a dedicated port +(``NAVI_UI_MCP_PORT``, default 8001) before the main container is built so +that :class:`navi.mcp.McpManager` can connect to it during startup like any +other MCP server. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import structlog +from mcp.server import FastMCP + +logger = logging.getLogger(__name__) + +# Shared holder: the lifespan task sets the orchestrator once the main +# FastAPI container is ready. The tool waits (with a timeout) so that early +# MCP health-check pings or tool calls do not crash before startup completes. +_orchestrator: Any | None = None +_orchestrator_ready = asyncio.Event() + +_ORCHESTRATOR_TIMEOUT = 10.0 + +mcp = FastMCP( + "navi_ui", + host="127.0.0.1", + port=8001, + streamable_http_path="/mcp", + instructions="""\ +Internal Navi UI server. Use this to render structured data in the webclient. + +Tool: render_component(component_name, payload, session_id) +- component_name: identifier of the UI component to render. +- payload: JSON-serializable object with the data for that component. +- session_id: target Navi session (injected automatically by the agent). + +Only call this when the user explicitly asks for a graphical / structured +view, or when a tool response is naturally represented as a component. +""", +) + + +@mcp.tool() +async def render_component( + component_name: str, + payload: dict[str, Any], + session_id: str | None = None, +) -> str: + """Render a UI component in the webclient for the given session. + + Args: + component_name: Identifier of the registered UI component (e.g. "table"). + payload: JSON-serializable data object consumed by that component. + session_id: Navi session id. Injected automatically by the agent context. + + Returns: + A short confirmation string, or an error message if forwarding failed. + """ + if not component_name or not isinstance(component_name, str): + return "Error: component_name must be a non-empty string" + if not isinstance(payload, dict): + return "Error: payload must be a JSON object (dict)" + if not session_id: + return "Error: session_id is required (it is normally injected by the agent)" + + try: + await asyncio.wait_for( + _orchestrator_ready.wait(), + timeout=_ORCHESTRATOR_TIMEOUT, + ) + except asyncio.TimeoutError: + return "Error: UI server is not ready (orchestrator unavailable)" + + orchestrator = _orchestrator + if orchestrator is None: + return "Error: UI server orchestrator reference is missing" + + try: + await orchestrator._notify_session( + session_id, + { + "type": "ui_component", + "component": component_name, + "payload": payload, + }, + ) + except Exception as exc: + logger.warning("render_component failed for session %s: %s", session_id, exc) + return f"Error: failed to send component to session: {exc}" + + return f"Component {component_name!r} rendered for session {session_id}" + + +def set_orchestrator(orchestrator: Any) -> None: + """Wire the active orchestrator so the tool can reach WebSocket clients.""" + global _orchestrator + _orchestrator = orchestrator + _orchestrator_ready.set() + logger.info("navi_ui orchestrator wired") + + +def clear_orchestrator() -> None: + """Clear the orchestrator reference, e.g. during graceful shutdown.""" + global _orchestrator + _orchestrator = None + _orchestrator_ready.clear() + + +async def start_ui_server(host: str, port: int) -> None: + """Start the StreamableHTTP MCP server and run forever. + + This coroutine is intended to be scheduled as a background task from the + main application lifespan. It only returns when the server is stopped. + """ + log = structlog.get_logger() + log.info("navi_ui_mcp_server_starting", host=host, port=port) + # FastMCP copies constructor args into its settings object, so update them + # explicitly to honour runtime configuration. + mcp.settings.host = host + mcp.settings.port = port + await mcp.run_streamable_http_async() diff --git a/tests/unit/mcp/test_ui_server.py b/tests/unit/mcp/test_ui_server.py new file mode 100644 index 0000000..59b2717 --- /dev/null +++ b/tests/unit/mcp/test_ui_server.py @@ -0,0 +1,81 @@ +"""Unit tests for the internal navi_ui MCP server.""" + +import pytest +from unittest.mock import AsyncMock + +from navi.mcp import ui_server + + +@pytest.fixture(autouse=True) +def reset_ui_server_state(): + """Reset global orchestrator holder before/after every test.""" + ui_server.clear_orchestrator() + yield + ui_server.clear_orchestrator() + + +class TestRenderComponent: + async def test_requires_component_name(self): + result = await ui_server.render_component("", {"x": 1}, "s1") + assert "component_name" in result.lower() + assert "Error" in result + + async def test_requires_dict_payload(self): + result = await ui_server.render_component("table", [1, 2, 3], "s1") + assert "payload must be a JSON object" in result + + async def test_requires_session_id(self): + result = await ui_server.render_component("table", {"x": 1}, None) + assert "session_id" in result.lower() + assert "Error" in result + + async def test_sends_event_when_orchestrator_ready(self): + mock_orchestrator = AsyncMock() + mock_orchestrator._notify_session = AsyncMock() + ui_server.set_orchestrator(mock_orchestrator) + + result = await ui_server.render_component("table", {"rows": 3}, "sess-123") + + assert "table" in result + assert "sess-123" in result + assert "Error" not in result + mock_orchestrator._notify_session.assert_awaited_once_with( + "sess-123", + { + "type": "ui_component", + "component": "table", + "payload": {"rows": 3}, + }, + ) + + async def test_times_out_when_orchestrator_never_set(self): + original_timeout = ui_server._ORCHESTRATOR_TIMEOUT + ui_server._ORCHESTRATOR_TIMEOUT = 0.05 + try: + result = await ui_server.render_component("table", {"x": 1}, "s1") + assert "not ready" in result.lower() or "unavailable" in result.lower() + assert "Error" in result + finally: + ui_server._ORCHESTRATOR_TIMEOUT = original_timeout + + async def test_returns_error_when_notify_session_fails(self): + mock_orchestrator = AsyncMock() + mock_orchestrator._notify_session.side_effect = RuntimeError("boom") + ui_server.set_orchestrator(mock_orchestrator) + + result = await ui_server.render_component("table", {"x": 1}, "s1") + + assert "failed to send" in result.lower() + assert "Error" in result + + +class TestOrchestratorHolder: + def test_set_and_clear(self): + mock_orchestrator = AsyncMock() + ui_server.set_orchestrator(mock_orchestrator) + assert ui_server._orchestrator is mock_orchestrator + assert ui_server._orchestrator_ready.is_set() + + ui_server.clear_orchestrator() + assert ui_server._orchestrator is None + assert not ui_server._orchestrator_ready.is_set() diff --git a/webclient/src/components/messages/AssistantMessage.vue b/webclient/src/components/messages/AssistantMessage.vue index 1687154..446046e 100644 --- a/webclient/src/components/messages/AssistantMessage.vue +++ b/webclient/src/components/messages/AssistantMessage.vue @@ -27,6 +27,7 @@ :default-open="contentToolKey(msg, entry.i) === latestContentToolKey" :collapse-token="latestContentToolKey" /> +
@@ -105,6 +106,7 @@ import ThinkingCard from './ThinkingCard.vue' import ToolCard from './ToolCard.vue' import ContentCard from './ContentCard.vue' +import UiComponentCard from './UiComponentCard.vue' import { GnIconButton } from 'gnexus-ui-kit/vue' import { renderMarkdown, attachCopyButtons, attachImageLightbox } from '@/composables/useMarkdown.js' import { useTimeLabel } from '@/composables/useTime.js' @@ -200,6 +202,7 @@ } function isCountableTool(item) { + if (item.kind === 'ui_component') return false return item.kind === 'tool' && item.name !== 'content_publish' } diff --git a/webclient/src/components/messages/UiComponentCard.vue b/webclient/src/components/messages/UiComponentCard.vue new file mode 100644 index 0000000..89da18b --- /dev/null +++ b/webclient/src/components/messages/UiComponentCard.vue @@ -0,0 +1,98 @@ + + + + + diff --git a/webclient/src/composables/useWebSocket.js b/webclient/src/composables/useWebSocket.js index 7ef3964..56fc83e 100644 --- a/webclient/src/composables/useWebSocket.js +++ b/webclient/src/composables/useWebSocket.js @@ -154,6 +154,7 @@ case 'stream_delta': chat.onStreamDelta(event.delta ?? ''); break case 'stream_end': chat.onStreamEnd(event); break case 'stream_stopped': chat.onStreamStopped(); break + case 'ui_component': chat.onUiComponent(event); break case 'profile_switched': chat.onProfileSwitched(event); break case 'compression_started':chat.onCompressionStarted(event); break case 'context_compressed':chat.onContextCompressed(event); break diff --git a/webclient/src/stores/chat.js b/webclient/src/stores/chat.js index 1f708ca..939fa38 100644 --- a/webclient/src/stores/chat.js +++ b/webclient/src/stores/chat.js @@ -507,6 +507,29 @@ msg.text += delta } + function onUiComponent(data) { + const msg = streamingMsg.value + if (!msg) { + // Component arrived outside an active stream — create a standalone message. + messages.value.push({ + id: `ui_${Date.now()}`, + role: 'assistant', + type: 'ui_component', + component: data.component ?? '', + payload: data.payload ?? {}, + text: '', + tools: [], + time: new Date().toISOString(), + }) + return + } + msg.tools.push({ + kind: 'ui_component', + component: data.component ?? '', + payload: data.payload ?? {}, + }) + } + function onStreamEnd(data) { const msg = streamingMsg.value if (msg) { @@ -836,6 +859,7 @@ onStreamDelta, onStreamEnd, onStreamStopped, + onUiComponent, onProfileSwitched, onCompressionStarted, onContextCompressed, diff --git a/webclient/tests/unit/stores/chat.test.js b/webclient/tests/unit/stores/chat.test.js index 9a5f756..4eb6f1e 100644 --- a/webclient/tests/unit/stores/chat.test.js +++ b/webclient/tests/unit/stores/chat.test.js @@ -198,6 +198,22 @@ expect(card.success).toBe(true) }) + it('onUiComponent appends to streaming message', () => { + const store = useChatStore() + store.onStreamStart() + store.onUiComponent({ component: 'table', payload: { rows: 2 } }) + const msg = store.streamingMsg + expect(msg.tools).toHaveLength(1) + expect(msg.tools[0]).toMatchObject({ kind: 'ui_component', component: 'table', payload: { rows: 2 } }) + }) + + it('onUiComponent creates standalone message when not streaming', () => { + const store = useChatStore() + store.onUiComponent({ component: 'chart', payload: { value: 42 } }) + expect(store.messages).toHaveLength(1) + expect(store.messages[0]).toMatchObject({ role: 'assistant', type: 'ui_component', component: 'chart', payload: { value: 42 } }) + }) + it('onError pushes error message', () => { const store = useChatStore() store.onError({ message: 'boom' })