Newer
Older
navi-1 / navi / core / container.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 18 May 5 KB Fix PgSessionStore import in container.py
"""Application container — holds all shared singletons with explicit lifecycle."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from navi.config import settings
from navi.context_providers._loader import ContextProviderRegistry
from navi.llm.ollama import OllamaBackend

if TYPE_CHECKING:
    from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry
    from navi.core.scheduler import RecallScheduler
    from navi.memory import MemoryStore
    from navi.mcp import McpManager
    from navi.store import KvStore
    from navi.workers.base import Worker


@dataclass
class AppContainer:
    """Holds all application-level singletons created at startup."""

    memory_store: "MemoryStore"
    session_store: "SessionStore"
    kv_store: "KvStore"
    scheduler: "RecallScheduler"
    tool_registry: "ToolRegistry"
    profile_registry: "ProfileRegistry"
    backend_registry: "BackendRegistry"
    cp_registry: "ContextProviderRegistry"
    workers: list["Worker"]
    mcp_manager: "McpManager | None" = None

    _agent: "Agent | None" = field(default=None, repr=False)

    def get_agent(self) -> "Agent":
        from navi.core import Agent

        if self._agent is None:
            self._agent = Agent(
                session_store=self.session_store,
                profile_registry=self.profile_registry,
                tool_registry=self.tool_registry,
                backend_registry=self.backend_registry,
                workers=self.workers,
                memory_store=self.memory_store,
                cp_registry=self.cp_registry,
                mcp_manager=self.mcp_manager,
            )
        return self._agent

    async def shutdown(self) -> None:
        """Close all resources that need explicit cleanup."""
        # MCP
        if self.mcp_manager is not None:
            try:
                await self.mcp_manager.disconnect_all()
            except Exception:
                pass
        # Session store pool
        if hasattr(self.session_store, "_pool") and self.session_store._pool is not None:
            try:
                await self.session_store._pool.close()
            except Exception:
                pass
        # Memory store pool
        if hasattr(self.memory_store, "_pool") and self.memory_store._pool is not None:
            try:
                await self.memory_store._pool.close()
            except Exception:
                pass
        # KV store pool
        if hasattr(self.kv_store, "_pool") and self.kv_store._pool is not None:
            try:
                await self.kv_store._pool.close()
            except Exception:
                pass
        # Scheduler pool
        if hasattr(self.scheduler, "_pool") and self.scheduler._pool is not None:
            try:
                await self.scheduler._pool.close()
            except Exception:
                pass


async def create_container() -> AppContainer:
    """Build the full application container."""
    from navi.core.registry import build_default_registries
    from navi.core.scheduler import RecallScheduler
    from navi.core.pg_session_store import PgSessionStore
    from navi.memory import MemoryStore
    from navi.mcp import McpManager, load_mcp_servers
    from navi.mcp.tools import McpTool
    from navi.store import KvStore
    from navi.workers import build_default_workers

    if not settings.database_url:
        raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.")

    session_store = PgSessionStore(settings.database_url)
    memory_store = MemoryStore(settings.database_url)
    kv_store = KvStore(settings.database_url)
    scheduler = RecallScheduler(settings.database_url)

    tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries(
        memory_store=memory_store,
        session_store=session_store,
        scheduler=scheduler,
        kv_store=kv_store,
    )

    # Wire embedding backend into memory store
    try:
        if settings.embedding_ollama_host:
            emb_backend = OllamaBackend(
                model=settings.embedding_model,
                host=settings.embedding_ollama_host,
                api_key=settings.embedding_ollama_api_key,
                timeout=settings.ollama_request_timeout,
            )
        else:
            emb_backend = backend_registry.get("ollama")
        if hasattr(memory_store, "set_embedding_backend"):
            memory_store.set_embedding_backend(emb_backend)
    except Exception:
        pass

    workers = build_default_workers()

    mcp_manager = McpManager()
    await mcp_manager.load_all()

    # Register MCP tools as external
    try:
        tools = await mcp_manager.get_all_tools()
        for server_name, tool in tools:
            mcp_tool = McpTool(
                server_name=server_name,
                tool_name=tool.name,
                description=tool.description or "",
                parameters=tool.inputSchema,
                manager=mcp_manager,
            )
            tool_registry.register_external(mcp_tool)
        for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"):
            tool = tool_registry.get(tool_name)
            if hasattr(tool, "_mcp_manager"):
                tool._mcp_manager = mcp_manager
    except Exception:
        pass

    return AppContainer(
        memory_store=memory_store,
        session_store=session_store,
        kv_store=kv_store,
        scheduler=scheduler,
        tool_registry=tool_registry,
        profile_registry=profile_registry,
        backend_registry=backend_registry,
        cp_registry=cp_registry,
        workers=workers,
        mcp_manager=mcp_manager,
    )