Newer
Older
navi-1 / navi / core / container.py
"""Application container — holds all shared singletons with explicit lifecycle."""

from __future__ import annotations

import structlog
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from navi.config import settings

logger = structlog.get_logger()
from navi.context_providers._loader import ContextProviderRegistry
from navi.llm.ollama import OllamaBackend

if TYPE_CHECKING:
    from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry
    from navi.core.orchestrator import AgentSessionOrchestrator
    from navi.core.scheduler import RecallScheduler
    from navi.db import Database
    from navi.memory import MemoryStore
    from navi.mcp import McpManager
    from navi.store import KvStore
    from navi.workers.base import Worker


@dataclass
class AppContainer:
    """Holds all application-level singletons created at startup."""

    database: "Database | None" = None
    memory_store: "MemoryStore" = None  # type: ignore[assignment]
    session_store: "SessionStore" = None  # type: ignore[assignment]
    kv_store: "KvStore" = None  # type: ignore[assignment]
    scheduler: "RecallScheduler" = None  # type: ignore[assignment]
    tool_registry: "ToolRegistry" = None  # type: ignore[assignment]
    profile_registry: "ProfileRegistry" = None  # type: ignore[assignment]
    backend_registry: "BackendRegistry" = None  # type: ignore[assignment]
    cp_registry: "ContextProviderRegistry" = None  # type: ignore[assignment]
    workers: list["Worker"] = field(default_factory=list)
    mcp_manager: "McpManager | None" = None
    terminal_manager: "TerminalManager | None" = None
    orchestrator: "AgentSessionOrchestrator | None" = None

    _agent: "Agent | None" = field(default=None, repr=False)

    def get_agent(self) -> "Agent":
        from navi.core import Agent

        if self._agent is None:
            self._agent = Agent(
                session_store=self.session_store,
                profile_registry=self.profile_registry,
                tool_registry=self.tool_registry,
                backend_registry=self.backend_registry,
                workers=self.workers,
                memory_store=self.memory_store,
                cp_registry=self.cp_registry,
                mcp_manager=self.mcp_manager,
            )
        return self._agent

    async def shutdown(self) -> None:
        """Close all resources that need explicit cleanup."""
        if self.mcp_manager is not None:
            try:
                await self.mcp_manager.disconnect_all()
            except BaseException:
                pass
        if self.database is not None:
            try:
                await self.database.close()
            except BaseException:
                pass


async def create_container() -> AppContainer:
    """Build the full application container."""
    from navi.core.registry import build_default_registries
    from navi.core.scheduler import RecallScheduler
    from navi.core.pg_session_store import PgSessionStore
    from navi.db import Database
    from navi.memory import MemoryStore
    from navi.mcp import McpManager, load_mcp_servers
    from navi.mcp.tools import McpTool
    from navi.store import KvStore
    from navi.workers import build_default_workers

    if not settings.database_url:
        raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.")

    database = Database(settings.database_url)
    pool = await database.pool()

    session_store = PgSessionStore(pool)
    memory_store = MemoryStore(pool)
    kv_store = KvStore(pool)
    scheduler = RecallScheduler(pool)

    # Create MCP manager early so build_default_registries can wire it in
    mcp_manager = McpManager()
    await mcp_manager.load_all()

    from navi.tools.terminal_manager import TerminalManager
    terminal_manager = TerminalManager()
    terminal_manager.start()

    tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries(
        memory_store=memory_store,
        session_store=session_store,
        scheduler=scheduler,
        kv_store=kv_store,
        mcp_manager=mcp_manager,
        terminal_manager=terminal_manager,
    )

    # Wire embedding backend into memory store
    try:
        if settings.embedding_ollama_host:
            emb_backend = OllamaBackend(
                model=settings.embedding_model,
                host=settings.embedding_ollama_host,
                api_key=settings.embedding_ollama_api_key,
                timeout=settings.ollama_request_timeout,
            )
        else:
            emb_backend = backend_registry.get("ollama")
        if hasattr(memory_store, "set_embedding_backend"):
            memory_store.set_embedding_backend(emb_backend)
    except Exception:
        pass

    workers = build_default_workers()

    # Register MCP tools for servers that connected during startup.
    # This must happen AFTER build_default_registries creates tool_registry.
    for srv_name, client in mcp_manager.clients.items():
        if not client.connected:
            continue
        try:
            tools = await client.list_tools()
            logger.info(
                "Registering startup MCP tools for %r -> %d tools",
                srv_name, len(tools),
            )
            for tool in tools:
                mcp_tool = McpTool(
                    server_name=srv_name,
                    tool_name=tool.name,
                    description=tool.description or "",
                    parameters=tool.inputSchema,
                    manager=mcp_manager,
                )
                tool_registry.register_external(mcp_tool)
        except Exception as exc:
            logger.warning("Failed to register MCP tools for %r: %s", srv_name, exc)

    # Callback for health-check reconnects (server comes back online later)
    async def _on_mcp_server_connected(server_name: str) -> None:
        from navi.core.event_bus import get_event_bus
        from navi.core.events import McpStatusUpdate
        client = mcp_manager.clients.get(server_name)
        if not client:
            logger.warning("_on_mcp_server_connected: no client for %r", server_name)
            return
        try:
            tools = await client.list_tools()
            logger.info(
                "_on_mcp_server_connected: %r -> %d tools",
                server_name, len(tools),
            )
            for tool in tools:
                mcp_tool = McpTool(
                    server_name=server_name,
                    tool_name=tool.name,
                    description=tool.description or "",
                    parameters=tool.inputSchema,
                    manager=mcp_manager,
                )
                tool_registry.register_external(mcp_tool)
                logger.debug(
                    "Registered MCP tool: %s", mcp_tool.name,
                )
            await get_event_bus().publish(
                McpStatusUpdate(
                    server_name=server_name,
                    status="connected",
                    tool_count=len(tools),
                )
            )
        except Exception as exc:
            logger.warning("Failed to register MCP tools for %r: %s", server_name, exc)

    mcp_manager.set_on_server_connected(_on_mcp_server_connected)
    mcp_manager.start_health_check()

    container = AppContainer(
        database=database,
        memory_store=memory_store,
        session_store=session_store,
        kv_store=kv_store,
        scheduler=scheduler,
        tool_registry=tool_registry,
        profile_registry=profile_registry,
        backend_registry=backend_registry,
        cp_registry=cp_registry,
        workers=workers,
        mcp_manager=mcp_manager,
        terminal_manager=terminal_manager,
    )
    from navi.core.orchestrator import AgentSessionOrchestrator

    container.orchestrator = AgentSessionOrchestrator(container)
    return container