# navi-1 / navi / core / container.py
"""Application container — holds all shared singletons with explicit lifecycle."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from navi.config import settings
from navi.context_providers._loader import ContextProviderRegistry
from navi.llm.ollama import OllamaBackend

if TYPE_CHECKING:
    from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry
    from navi.core.orchestrator import AgentSessionOrchestrator
    from navi.core.scheduler import RecallScheduler
    from navi.db import Database
    from navi.memory import MemoryStore
    from navi.mcp import McpManager
    from navi.store import KvStore
    from navi.workers.base import Worker


@dataclass
class AppContainer:
    """Holds all application-level singletons created at startup.

    Every field has a default, so a bare ``AppContainer()`` can be built, but
    the fields annotated as non-optional are then still ``None`` (hence the
    ``type: ignore`` markers) until they are assigned.
    """

    # Resources released by ``shutdown()``; either may be absent.
    database: "Database | None" = None
    mcp_manager: "McpManager | None" = None

    # Shared stores and registries; ``get_agent()`` passes several of these
    # straight into the ``Agent`` constructor.
    memory_store: "MemoryStore" = None  # type: ignore[assignment]
    session_store: "SessionStore" = None  # type: ignore[assignment]
    kv_store: "KvStore" = None  # type: ignore[assignment]
    scheduler: "RecallScheduler" = None  # type: ignore[assignment]
    tool_registry: "ToolRegistry" = None  # type: ignore[assignment]
    profile_registry: "ProfileRegistry" = None  # type: ignore[assignment]
    backend_registry: "BackendRegistry" = None  # type: ignore[assignment]
    cp_registry: "ContextProviderRegistry" = None  # type: ignore[assignment]
    workers: list["Worker"] = field(default_factory=list)
    orchestrator: "AgentSessionOrchestrator | None" = None

    # Cache for ``get_agent()``; excluded from ``repr``.
    _agent: "Agent | None" = field(default=None, repr=False)

    def get_agent(self) -> "Agent":
        """Return the container's ``Agent``, building it on first use.

        The instance is cached, so later calls return the same object even if
        container fields are reassigned afterwards.
        """
        if self._agent is None:
            # Imported at call time rather than module level; only needed
            # when the agent actually has to be constructed.
            from navi.core import Agent

            self._agent = Agent(
                session_store=self.session_store,
                profile_registry=self.profile_registry,
                tool_registry=self.tool_registry,
                backend_registry=self.backend_registry,
                workers=self.workers,
                memory_store=self.memory_store,
                cp_registry=self.cp_registry,
                mcp_manager=self.mcp_manager,
            )
        return self._agent

    async def shutdown(self) -> None:
        """Close all resources that need explicit cleanup.

        Best-effort: the MCP manager is disconnected first, then the database
        is closed. A failure in either step is logged and does not stop the
        other step or propagate to the caller.
        """
        import logging

        log = logging.getLogger(__name__)

        if self.mcp_manager is not None:
            try:
                await self.mcp_manager.disconnect_all()
            except Exception:
                log.exception("Failed to disconnect MCP servers during shutdown")
        if self.database is not None:
            try:
                await self.database.close()
            except Exception:
                log.exception("Failed to close database during shutdown")


async def create_container() -> AppContainer:
    """Build the full application container.

    Creates the database and its pool, the pool-backed stores and scheduler,
    the default registries and workers, and the MCP manager, then wraps them
    in an ``AppContainer`` with an ``AgentSessionOrchestrator`` attached.

    Embedding-backend wiring and MCP tool registration are best-effort:
    failures there are logged and startup continues. Any other failure after
    the database object exists releases the database (and the MCP manager, if
    already created) before the exception is re-raised.

    Returns:
        The populated ``AppContainer``.

    Raises:
        RuntimeError: If ``settings.database_url`` is empty.
    """
    import logging

    from navi.core.registry import build_default_registries
    from navi.core.scheduler import RecallScheduler
    from navi.core.pg_session_store import PgSessionStore
    from navi.db import Database
    from navi.memory import MemoryStore
    from navi.mcp import McpManager
    from navi.mcp.tools import McpTool
    from navi.store import KvStore
    from navi.workers import build_default_workers

    log = logging.getLogger(__name__)

    if not settings.database_url:
        raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.")

    database = Database(settings.database_url)
    mcp_manager = None
    try:
        pool = await database.pool()

        session_store = PgSessionStore(pool)
        memory_store = MemoryStore(pool)
        kv_store = KvStore(pool)
        scheduler = RecallScheduler(pool)

        tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries(
            memory_store=memory_store,
            session_store=session_store,
            scheduler=scheduler,
            kv_store=kv_store,
        )

        # Wire embedding backend into memory store. A dedicated Ollama host
        # takes precedence; otherwise the registry's "ollama" backend is used.
        try:
            if settings.embedding_ollama_host:
                emb_backend = OllamaBackend(
                    model=settings.embedding_model,
                    host=settings.embedding_ollama_host,
                    api_key=settings.embedding_ollama_api_key,
                    timeout=settings.ollama_request_timeout,
                )
            else:
                emb_backend = backend_registry.get("ollama")
            if hasattr(memory_store, "set_embedding_backend"):
                memory_store.set_embedding_backend(emb_backend)
        except Exception:
            log.exception("Failed to wire embedding backend into memory store")

        workers = build_default_workers()

        mcp_manager = McpManager()
        await mcp_manager.load_all()

        # Register MCP tools as external.
        try:
            for server_name, remote_tool in await mcp_manager.get_all_tools():
                tool_registry.register_external(
                    McpTool(
                        server_name=server_name,
                        tool_name=remote_tool.name,
                        description=remote_tool.description or "",
                        parameters=remote_tool.inputSchema,
                        manager=mcp_manager,
                    )
                )
        except Exception:
            log.exception("Failed to register MCP tools as external tools")

        # Hand the manager to the named tools that expose a ``_mcp_manager``
        # attribute. Done separately from registration, and guarded per name,
        # so one failure does not leave the remaining tools without a manager.
        for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"):
            try:
                registered = tool_registry.get(tool_name)
                if hasattr(registered, "_mcp_manager"):
                    registered._mcp_manager = mcp_manager
            except Exception:
                log.exception("Failed to attach MCP manager to tool %r", tool_name)

        container = AppContainer(
            database=database,
            memory_store=memory_store,
            session_store=session_store,
            kv_store=kv_store,
            scheduler=scheduler,
            tool_registry=tool_registry,
            profile_registry=profile_registry,
            backend_registry=backend_registry,
            cp_registry=cp_registry,
            workers=workers,
            mcp_manager=mcp_manager,
        )
        from navi.core.orchestrator import AgentSessionOrchestrator

        container.orchestrator = AgentSessionOrchestrator(container)
    except BaseException:
        # The caller never receives a container on failure, so nothing else
        # could release these resources; reuse the container's own shutdown
        # path, which skips fields that are still None.
        await AppContainer(database=database, mcp_manager=mcp_manager).shutdown()
        raise
    return container