"""Application container — holds all shared singletons with explicit lifecycle."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from navi.config import settings
from navi.context_providers._loader import ContextProviderRegistry
from navi.llm.ollama import OllamaBackend
if TYPE_CHECKING:
from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry
from navi.core.scheduler import RecallScheduler
from navi.memory import MemoryStore
from navi.mcp import McpManager
from navi.store import KvStore
from navi.workers.base import Worker
@dataclass
class AppContainer:
"""Holds all application-level singletons created at startup."""
memory_store: "MemoryStore"
session_store: "SessionStore"
kv_store: "KvStore"
scheduler: "RecallScheduler"
tool_registry: "ToolRegistry"
profile_registry: "ProfileRegistry"
backend_registry: "BackendRegistry"
cp_registry: "ContextProviderRegistry"
workers: list["Worker"]
mcp_manager: "McpManager | None" = None
_agent: "Agent | None" = field(default=None, repr=False)
def get_agent(self) -> "Agent":
from navi.core import Agent
if self._agent is None:
self._agent = Agent(
session_store=self.session_store,
profile_registry=self.profile_registry,
tool_registry=self.tool_registry,
backend_registry=self.backend_registry,
workers=self.workers,
memory_store=self.memory_store,
cp_registry=self.cp_registry,
mcp_manager=self.mcp_manager,
)
return self._agent
async def shutdown(self) -> None:
"""Close all resources that need explicit cleanup."""
# MCP
if self.mcp_manager is not None:
try:
await self.mcp_manager.disconnect_all()
except Exception:
pass
# Session store pool
if hasattr(self.session_store, "_pool") and self.session_store._pool is not None:
try:
await self.session_store._pool.close()
except Exception:
pass
# Memory store pool
if hasattr(self.memory_store, "_pool") and self.memory_store._pool is not None:
try:
await self.memory_store._pool.close()
except Exception:
pass
# KV store pool
if hasattr(self.kv_store, "_pool") and self.kv_store._pool is not None:
try:
await self.kv_store._pool.close()
except Exception:
pass
# Scheduler pool
if hasattr(self.scheduler, "_pool") and self.scheduler._pool is not None:
try:
await self.scheduler._pool.close()
except Exception:
pass
async def create_container() -> AppContainer:
"""Build the full application container."""
from navi.core.registry import build_default_registries
from navi.core.scheduler import RecallScheduler
from navi.core.session import PgSessionStore
from navi.memory import MemoryStore
from navi.mcp import McpManager, load_mcp_servers
from navi.mcp.tools import McpTool
from navi.store import KvStore
from navi.workers import build_default_workers
if not settings.database_url:
raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.")
session_store = PgSessionStore(settings.database_url)
memory_store = MemoryStore(settings.database_url)
kv_store = KvStore(settings.database_url)
scheduler = RecallScheduler(settings.database_url)
tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries(
memory_store=memory_store,
session_store=session_store,
scheduler=scheduler,
kv_store=kv_store,
)
# Wire embedding backend into memory store
try:
if settings.embedding_ollama_host:
emb_backend = OllamaBackend(
model=settings.embedding_model,
host=settings.embedding_ollama_host,
api_key=settings.embedding_ollama_api_key,
timeout=settings.ollama_request_timeout,
)
else:
emb_backend = backend_registry.get("ollama")
if hasattr(memory_store, "set_embedding_backend"):
memory_store.set_embedding_backend(emb_backend)
except Exception:
pass
workers = build_default_workers()
mcp_manager = McpManager()
await mcp_manager.load_all()
# Register MCP tools as external
try:
tools = await mcp_manager.get_all_tools()
for server_name, tool in tools:
mcp_tool = McpTool(
server_name=server_name,
tool_name=tool.name,
description=tool.description or "",
parameters=tool.inputSchema,
manager=mcp_manager,
)
tool_registry.register_external(mcp_tool)
for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"):
tool = tool_registry.get(tool_name)
if hasattr(tool, "_mcp_manager"):
tool._mcp_manager = mcp_manager
except Exception:
pass
return AppContainer(
memory_store=memory_store,
session_store=session_store,
kv_store=kv_store,
scheduler=scheduler,
tool_registry=tool_registry,
profile_registry=profile_registry,
backend_registry=backend_registry,
cp_registry=cp_registry,
workers=workers,
mcp_manager=mcp_manager,
)