"""Application container — holds all shared singletons with explicit lifecycle."""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import structlog

from navi.config import settings
from navi.context_providers._loader import ContextProviderRegistry
from navi.llm.ollama import OllamaBackend

if TYPE_CHECKING:
    from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry
    from navi.core.orchestrator import AgentSessionOrchestrator
    from navi.core.scheduler import RecallScheduler
    from navi.db import Database
    from navi.mcp import McpManager
    from navi.memory import MemoryStore
    from navi.store import KvStore
    from navi.tools.terminal_manager import TerminalManager
    from navi.workers.base import Worker

logger = structlog.get_logger()
@dataclass
class AppContainer:
    """Holds all application-level singletons created at startup.

    Every field has a ``None`` (or empty-list) default, so an instance can be
    constructed with only the pieces that are available. The non-optional
    annotations on fields that default to ``None`` are deliberate (hence the
    ``type: ignore`` markers): callers are expected to populate them before use.
    """

    # Database handle; closed by ``shutdown()`` when present.
    database: "Database | None" = None
    memory_store: "MemoryStore" = None  # type: ignore[assignment]
    session_store: "SessionStore" = None  # type: ignore[assignment]
    kv_store: "KvStore" = None  # type: ignore[assignment]
    scheduler: "RecallScheduler" = None  # type: ignore[assignment]
    tool_registry: "ToolRegistry" = None  # type: ignore[assignment]
    profile_registry: "ProfileRegistry" = None  # type: ignore[assignment]
    backend_registry: "BackendRegistry" = None  # type: ignore[assignment]
    cp_registry: "ContextProviderRegistry" = None  # type: ignore[assignment]
    workers: list["Worker"] = field(default_factory=list)
    # MCP manager; disconnected by ``shutdown()`` when present.
    mcp_manager: "McpManager | None" = None
    terminal_manager: "TerminalManager | None" = None
    orchestrator: "AgentSessionOrchestrator | None" = None
    # Lazily created by ``get_agent()``; hidden from ``repr``.
    _agent: "Agent | None" = field(default=None, repr=False)

    def get_agent(self) -> "Agent":
        """Return the shared ``Agent``, constructing it on first call.

        The agent is built from the stores/registries currently held by this
        container and cached in ``_agent``; later calls return the same object.
        """
        if self._agent is not None:
            return self._agent

        # Imported at call time rather than module import time
        # (presumably to avoid an import cycle with navi.core — confirm).
        from navi.core import Agent

        self._agent = Agent(
            session_store=self.session_store,
            profile_registry=self.profile_registry,
            tool_registry=self.tool_registry,
            backend_registry=self.backend_registry,
            workers=self.workers,
            memory_store=self.memory_store,
            cp_registry=self.cp_registry,
            mcp_manager=self.mcp_manager,
        )
        return self._agent

    async def shutdown(self) -> None:
        """Close all resources that need explicit cleanup.

        Cleanup is best-effort: an ordinary ``Exception`` from one resource is
        logged and does not prevent the next resource from being closed.
        ``BaseException`` subclasses that are not ``Exception`` (task
        cancellation, ``KeyboardInterrupt``, ``SystemExit``) are no longer
        swallowed; they propagate after the database close has been attempted.
        """
        # NOTE(review): ``terminal_manager`` is not touched here — confirm
        # whether TerminalManager needs an explicit stop on shutdown.
        try:
            if self.mcp_manager is not None:
                try:
                    await self.mcp_manager.disconnect_all()
                except Exception as exc:
                    logger.warning(
                        "Failed to disconnect MCP servers during shutdown: %s", exc
                    )
        finally:
            # Runs even if the MCP disconnect was interrupted, so the database
            # handle is not leaked.
            if self.database is not None:
                try:
                    await self.database.close()
                except Exception as exc:
                    logger.warning(
                        "Failed to close database during shutdown: %s", exc
                    )
async def create_container() -> AppContainer:
    """Build the full application container.

    In order: opens the database pool, creates the stores and scheduler,
    loads the MCP manager, starts the terminal manager, builds the default
    registries, wires an embedding backend into the memory store
    (best-effort), builds the default workers, registers tools of MCP servers
    that are already connected, installs the reconnect callback, starts the
    MCP health check, and finally attaches the session orchestrator.

    Returns:
        The populated ``AppContainer`` with ``orchestrator`` set.

    Raises:
        RuntimeError: If ``settings.database_url`` is empty.
    """
    # Project imports are done at call time rather than module import time.
    from navi.core.orchestrator import AgentSessionOrchestrator
    from navi.core.pg_session_store import PgSessionStore
    from navi.core.registry import build_default_registries
    from navi.core.scheduler import RecallScheduler
    from navi.db import Database
    from navi.mcp import McpManager
    from navi.mcp.tools import McpTool
    from navi.memory import MemoryStore
    from navi.store import KvStore
    from navi.tools.terminal_manager import TerminalManager
    from navi.workers import build_default_workers

    if not settings.database_url:
        raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.")

    # NOTE(review): if any later step raises, the pool opened here is not
    # closed by this function — confirm whether callers handle that.
    database = Database(settings.database_url)
    pool = await database.pool()

    session_store = PgSessionStore(pool)
    memory_store = MemoryStore(pool)
    kv_store = KvStore(pool)
    scheduler = RecallScheduler(pool)

    # Create MCP manager early so build_default_registries can wire it in
    mcp_manager = McpManager()
    await mcp_manager.load_all()

    terminal_manager = TerminalManager()
    terminal_manager.start()

    tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries(
        memory_store=memory_store,
        session_store=session_store,
        scheduler=scheduler,
        kv_store=kv_store,
        mcp_manager=mcp_manager,
        terminal_manager=terminal_manager,
    )

    # Wire embedding backend into memory store. This is best-effort: a failure
    # is logged and startup continues without an embedding backend.
    try:
        if settings.embedding_ollama_host:
            # Dedicated embedding host configured: use a separate backend.
            emb_backend = OllamaBackend(
                model=settings.embedding_model,
                host=settings.embedding_ollama_host,
                api_key=settings.embedding_ollama_api_key,
                timeout=settings.ollama_request_timeout,
            )
        else:
            emb_backend = backend_registry.get("ollama")
        if hasattr(memory_store, "set_embedding_backend"):
            memory_store.set_embedding_backend(emb_backend)
    except Exception as exc:
        logger.warning("Failed to wire embedding backend into memory store: %s", exc)

    workers = build_default_workers()

    def _register_mcp_tools(server_name: str, tools) -> None:
        """Wrap each MCP tool descriptor in an McpTool and register it."""
        for tool in tools:
            mcp_tool = McpTool(
                server_name=server_name,
                tool_name=tool.name,
                description=tool.description or "",
                parameters=tool.inputSchema,
                manager=mcp_manager,
            )
            tool_registry.register_external(mcp_tool)
            logger.debug(
                "Registered MCP tool: %s", mcp_tool.name,
            )

    # Register MCP tools for servers that connected during startup.
    # This must happen AFTER build_default_registries creates tool_registry.
    for srv_name, client in mcp_manager.clients.items():
        if not client.connected:
            continue
        try:
            tools = await client.list_tools()
            logger.info(
                "Registering startup MCP tools for %r -> %d tools",
                srv_name, len(tools),
            )
            _register_mcp_tools(srv_name, tools)
        except Exception as exc:
            # One failing server must not prevent the others from registering.
            logger.warning("Failed to register MCP tools for %r: %s", srv_name, exc)

    # Callback for health-check reconnects (server comes back online later)
    async def _on_mcp_server_connected(server_name: str) -> None:
        """Register a reconnected server's tools and publish a status event."""
        from navi.core.event_bus import get_event_bus
        from navi.core.events import McpStatusUpdate

        client = mcp_manager.clients.get(server_name)
        if client is None:
            logger.warning("_on_mcp_server_connected: no client for %r", server_name)
            return
        try:
            tools = await client.list_tools()
            logger.info(
                "_on_mcp_server_connected: %r -> %d tools",
                server_name, len(tools),
            )
            _register_mcp_tools(server_name, tools)
            await get_event_bus().publish(
                McpStatusUpdate(
                    server_name=server_name,
                    status="connected",
                    tool_count=len(tools),
                )
            )
        except Exception as exc:
            logger.warning("Failed to register MCP tools for %r: %s", server_name, exc)

    mcp_manager.set_on_server_connected(_on_mcp_server_connected)
    mcp_manager.start_health_check()

    container = AppContainer(
        database=database,
        memory_store=memory_store,
        session_store=session_store,
        kv_store=kv_store,
        scheduler=scheduler,
        tool_registry=tool_registry,
        profile_registry=profile_registry,
        backend_registry=backend_registry,
        cp_registry=cp_registry,
        workers=workers,
        mcp_manager=mcp_manager,
        terminal_manager=terminal_manager,
    )

    # The orchestrator takes the container itself, so it is attached after
    # the container has been constructed.
    container.orchestrator = AgentSessionOrchestrator(container)
    return container