diff --git a/navi/api/deps.py b/navi/api/deps.py index e2ad61d..855ae70 100644 --- a/navi/api/deps.py +++ b/navi/api/deps.py @@ -1,22 +1,25 @@ -"""FastAPI dependency injection — provides shared singletons to route handlers.""" +"""FastAPI dependency injection — thin wrappers around AppContainer.""" from typing import Annotated -from fastapi import Depends +from fastapi import Depends, Request from navi.config import settings -from navi.context_providers._loader import ContextProviderRegistry from navi.core import ( Agent, + AppContainer, BackendRegistry, - PgSessionStore, ProfileRegistry, SessionStore, ToolRegistry, - build_default_registries, ) +from navi.core.container import create_container +from navi.context_providers._loader import ContextProviderRegistry +from navi.memory import MemoryStore +from navi.mcp import McpManager +from navi.store import KvStore from navi.core.scheduler import RecallScheduler -from navi.llm.ollama import OllamaBackend +from navi.workers.base import Worker from navi.auth.deps import ( get_current_user, get_current_user_ws, @@ -24,160 +27,95 @@ require_permission, require_user, ) -from navi.memory import MemoryStore -from navi.store import KvStore -from navi.workers import Worker, build_default_workers -from navi.mcp import McpManager, load_mcp_servers -from navi.mcp.tools import McpTool + +# Global fallback — populated during lifespan startup so non-request code +# (tests, websocket direct calls, etc.) can still access services. +_container: AppContainer | None = None -def _make_session_store() -> SessionStore: - if not settings.database_url: - raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.") - return PgSessionStore(settings.database_url) +def set_container(container: AppContainer) -> None: + global _container + _container = container -def _make_memory_store() -> MemoryStore: - if not settings.database_url: - raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.") - return MemoryStore(settings.database_url) +def _get_container(request: Request) -> AppContainer: + if not hasattr(request.app.state, "container") or request.app.state.container is None: + raise RuntimeError("AppContainer not initialized. Did lifespan startup run?") + return request.app.state.container -_memory_store: MemoryStore | None = None -_registries: tuple[ToolRegistry, ProfileRegistry, BackendRegistry, ContextProviderRegistry] | None = None -_mcp_manager: McpManager | None = None -_scheduler: RecallScheduler | None = None -_kv_store: KvStore | None = None +def _resolve_container() -> AppContainer: + if _container is not None: + return _container + raise RuntimeError("AppContainer not initialized. Did lifespan startup run?") def get_memory_store() -> MemoryStore: - global _memory_store - if _memory_store is None: - _memory_store = _make_memory_store() - return _memory_store - - -def _make_kv_store() -> KvStore: - if not settings.database_url: - raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.") - return KvStore(settings.database_url) - - -def get_kv_store() -> KvStore: - global _kv_store - if _kv_store is None: - _kv_store = _make_kv_store() - return _kv_store - - -def get_scheduler() -> RecallScheduler: - global _scheduler - if _scheduler is None: - if not settings.database_url: - raise RuntimeError("DATABASE_URL is required for RecallScheduler.") - _scheduler = RecallScheduler(settings.database_url) - return _scheduler - - -def get_registries() -> tuple[ToolRegistry, ProfileRegistry, BackendRegistry, ContextProviderRegistry]: - global _registries - if _registries is None: - _registries = build_default_registries( - memory_store=get_memory_store(), - session_store=get_session_store(), - scheduler=get_scheduler(), - kv_store=get_kv_store(), - ) - # Wire embedding backend into memory store for vector search. - # Uses a dedicated Ollama endpoint when configured, otherwise falls back - # to the main chat backend. - try: - if settings.embedding_ollama_host: - emb_backend = OllamaBackend( - model=settings.embedding_model, - host=settings.embedding_ollama_host, - api_key=settings.embedding_ollama_api_key, - timeout=settings.ollama_request_timeout, - ) - else: - emb_backend = _registries[2].get("ollama") - if hasattr(_memory_store, "set_embedding_backend"): - _memory_store.set_embedding_backend(emb_backend) - except Exception: - pass - return _registries - - -def get_tool_registry() -> ToolRegistry: - return get_registries()[0] - - -def get_profile_registry() -> ProfileRegistry: - return get_registries()[1] - - -def get_backend_registry() -> BackendRegistry: - return get_registries()[2] - - -def get_cp_registry() -> ContextProviderRegistry: - return get_registries()[3] - - -async def get_mcp_manager() -> McpManager: - global _mcp_manager - if _mcp_manager is None: - _mcp_manager = McpManager() - await _mcp_manager.load_all() - return _mcp_manager - - -async def register_mcp_tools(registry: ToolRegistry, manager: McpManager) -> None: - """Discover tools from all connected MCP servers and register them as external.""" - # clear previous external MCP tools - for name in list(registry._external_names): - if name.startswith("mcp:"): - registry.unregister_external(name) - - tools = await manager.get_all_tools() - for server_name, tool in tools: - mcp_tool = McpTool( - server_name=server_name, - tool_name=tool.name, - description=tool.description or "", - parameters=tool.inputSchema, - manager=manager, - ) - registry.register_external(mcp_tool) - - -_session_store: SessionStore | None = None -_workers: list[Worker] | None = None + return _resolve_container().memory_store def get_session_store() -> SessionStore: - global _session_store - if _session_store is None: - _session_store = _make_session_store() - return _session_store + return _resolve_container().session_store + + +def get_kv_store() -> KvStore: + return _resolve_container().kv_store + + +def get_tool_registry() -> ToolRegistry: + return _resolve_container().tool_registry + + +def get_profile_registry() -> ProfileRegistry: + return _resolve_container().profile_registry + + +def get_backend_registry() -> BackendRegistry: + return _resolve_container().backend_registry + + +def get_cp_registry() -> ContextProviderRegistry: + return _resolve_container().cp_registry def get_workers() -> list[Worker]: - global _workers - if _workers is None: - _workers = build_default_workers() - return _workers + return _resolve_container().workers -def get_agent( - session_store: Annotated[SessionStore, Depends(get_session_store)], - profile_registry: Annotated[ProfileRegistry, Depends(get_profile_registry)], - tool_registry: Annotated[ToolRegistry, Depends(get_tool_registry)], - backend_registry: Annotated[BackendRegistry, Depends(get_backend_registry)], - cp_registry: Annotated[ContextProviderRegistry, Depends(get_cp_registry)], -) -> Agent: - return Agent( - session_store, profile_registry, tool_registry, backend_registry, - workers=get_workers(), memory_store=get_memory_store(), - cp_registry=cp_registry, mcp_manager=_mcp_manager, - ) +def get_mcp_manager() -> McpManager | None: + return _resolve_container().mcp_manager + + +def get_scheduler() -> RecallScheduler: + return _resolve_container().scheduler + + +def get_agent() -> Agent: + return _resolve_container().get_agent() + + +async def register_mcp_tools(registry: ToolRegistry, manager: McpManager) -> None: + """(kept for backward compat; MCP tools are already registered at startup).""" + pass + + +# Backward-compat re-exports so nothing outside breaks +__all__ = [ + "create_container", + "get_memory_store", + "get_session_store", + "get_kv_store", + "get_tool_registry", + "get_profile_registry", + "get_backend_registry", + "get_cp_registry", + "get_workers", + "get_mcp_manager", + "get_agent", + "register_mcp_tools", + "get_current_user", + "get_current_user_ws", + "require_admin", + "require_permission", + "require_user", +] diff --git a/navi/api/routes/admin.py b/navi/api/routes/admin.py index 721ed18..a70b7dc 100644 --- a/navi/api/routes/admin.py +++ b/navi/api/routes/admin.py @@ -6,6 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException from navi.api.deps import ( + get_scheduler, get_session_store, require_admin, require_permission, @@ -13,6 +14,7 @@ from navi.auth import User from navi.config import settings from navi.core import SessionStore +from navi.core.scheduler import RecallScheduler log = structlog.get_logger() router = APIRouter(prefix="/admin", tags=["admin"]) @@ -695,15 +697,13 @@ @router.get("/recalls") async def admin_list_recalls( user: Annotated[User, Depends(require_admin)], + scheduler: Annotated[RecallScheduler, Depends(get_scheduler)], limit: int = 50, offset: int = 0, session_id: str | None = None, user_id: str | None = None, ): """Return all scheduled recalls with pagination and filtering.""" - from navi.api.deps import get_scheduler - - scheduler = get_scheduler() recalls = await scheduler.list_recalls( session_id=session_id, user_id=user_id, diff --git a/navi/api/routes/sessions.py b/navi/api/routes/sessions.py index af206b9..02650b7 100644 --- a/navi/api/routes/sessions.py +++ b/navi/api/routes/sessions.py @@ -26,6 +26,7 @@ from navi.content_store import list_for_session from navi.config import settings from navi.core import BackendRegistry, ProfileRegistry, SessionStore +from navi.core.scheduler import RecallScheduler from navi.core.name_generator import generate_session_name from navi.exceptions import ProfileNotFound from navi.memory import MemoryStore @@ -50,18 +51,18 @@ @router.post("", status_code=201) async def create_session( - body: CreateSessionRequest, + payload: CreateSessionRequest, store: Annotated[SessionStore, Depends(get_session_store)], profiles: Annotated[ProfileRegistry, Depends(get_profile_registry)], memory: Annotated[MemoryStore, Depends(get_memory_store)], user: Annotated[User, Depends(require_user)], ) -> dict: try: - profiles.get(body.profile_id) + profiles.get(payload.profile_id) except ProfileNotFound: - raise HTTPException(status_code=404, detail=f"Profile '{body.profile_id}' not found") + raise HTTPException(status_code=404, detail=f"Profile '{payload.profile_id}' not found") - session = await store.create(body.profile_id, user_id=user.id) + session = await store.create(payload.profile_id, user_id=user.id) # Fire-and-forget: extract memory from any stale sessions (last active > 30 min ago) async def _deduped_extract(): @@ -112,13 +113,13 @@ async def list_sessions( store: Annotated[SessionStore, Depends(get_session_store)], user: Annotated[User, Depends(require_user)], + scheduler: Annotated[RecallScheduler, Depends(get_scheduler)], limit: Annotated[int | None, Query(ge=1, le=100)] = None, offset: Annotated[int | None, Query(ge=0)] = None, profile_id: str | None = None, search: str | None = Query(None), ) -> dict | list[dict]: is_admin = user.role == "admin" or user.has_permission("navi.sessions.read_all") - scheduler = get_scheduler() if limit is None and offset is None: sessions = await store.list_all(user_id=user.id, is_admin=is_admin) @@ -455,12 +456,12 @@ session_id: str, store: Annotated[SessionStore, Depends(get_session_store)], user: Annotated[User, Depends(require_user)], + scheduler: Annotated[RecallScheduler, Depends(get_scheduler)], ) -> dict: session = await store.get(session_id) if session is None: raise HTTPException(status_code=404, detail="Session not found") check_session_access(session, user) - scheduler = get_scheduler() recalls = await scheduler.list_recalls( session_id=session_id, user_id=user.id, @@ -486,12 +487,12 @@ session_id: str, store: Annotated[SessionStore, Depends(get_session_store)], user: Annotated[User, Depends(require_user)], + scheduler: Annotated[RecallScheduler, Depends(get_scheduler)], ) -> dict: session = await store.get(session_id) if session is None: raise HTTPException(status_code=404, detail="Session not found") check_session_access(session, user) - scheduler = get_scheduler() ok = await scheduler.cancel_recall(session_id) if ok: from navi.core.event_bus import get_event_bus @@ -507,12 +508,12 @@ session_id: str, store: Annotated[SessionStore, Depends(get_session_store)], user: Annotated[User, Depends(require_user)], + scheduler: Annotated[RecallScheduler, Depends(get_scheduler)], ) -> dict: session = await store.get(session_id) if session is None: raise HTTPException(status_code=404, detail="Session not found") check_session_access(session, user) - scheduler = get_scheduler() ok = await scheduler.skip_next_recall(session_id) if ok: from navi.core.event_bus import get_event_bus diff --git a/navi/api/websocket.py b/navi/api/websocket.py index c8f078a..ad17a3b 100644 --- a/navi/api/websocket.py +++ b/navi/api/websocket.py @@ -273,15 +273,20 @@ await websocket.close(code=4003, reason="Access denied") return - from navi.api.deps import get_memory_store, get_mcp_manager, get_registries, get_workers - tools, profiles, backends, cp_registry = get_registries() + from navi.api.deps import _resolve_container, get_memory_store, get_mcp_manager, get_workers + container = _resolve_container() try: mcp_manager = await get_mcp_manager() except Exception: mcp_manager = None agent = Agent( - session_store, profiles, tools, backends, - workers=get_workers(), memory_store=get_memory_store(), cp_registry=cp_registry, + session_store, + container.profile_registry, + container.tool_registry, + container.backend_registry, + workers=get_workers(), + memory_store=get_memory_store(), + cp_registry=container.cp_registry, mcp_manager=mcp_manager, ) diff --git a/navi/core/__init__.py b/navi/core/__init__.py index 5d5a2b8..ec001c1 100644 --- a/navi/core/__init__.py +++ b/navi/core/__init__.py @@ -1,4 +1,5 @@ from .agent import Agent +from .container import AppContainer from .events import AgentEvent, ContextCompressed, StreamEnd, TextDelta, ThinkingDelta, ThinkingEnd, ToolEvent from .registry import BackendRegistry, ProfileRegistry, ToolRegistry, build_default_registries from navi.context_providers._loader import ContextProviderRegistry @@ -7,6 +8,7 @@ __all__ = [ "Agent", + "AppContainer", "AgentEvent", "StreamEnd", "TextDelta", diff --git a/navi/core/container.py b/navi/core/container.py new file mode 100644 index 0000000..83de1f7 --- /dev/null +++ b/navi/core/container.py @@ -0,0 +1,165 @@ +"""Application container — holds all shared singletons with explicit lifecycle.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from navi.config import settings +from navi.context_providers._loader import ContextProviderRegistry +from navi.llm.ollama import OllamaBackend + +if TYPE_CHECKING: + from navi.core import Agent, BackendRegistry, ProfileRegistry, SessionStore, ToolRegistry + from navi.core.scheduler import RecallScheduler + from navi.memory import MemoryStore + from navi.mcp import McpManager + from navi.store import KvStore + from navi.workers.base import Worker + + +@dataclass +class AppContainer: + """Holds all application-level singletons created at startup.""" + + memory_store: "MemoryStore" + session_store: "SessionStore" + kv_store: "KvStore" + scheduler: "RecallScheduler" + tool_registry: "ToolRegistry" + profile_registry: "ProfileRegistry" + backend_registry: "BackendRegistry" + cp_registry: "ContextProviderRegistry" + workers: list["Worker"] + mcp_manager: "McpManager | None" = None + + _agent: "Agent | None" = field(default=None, repr=False) + + def get_agent(self) -> "Agent": + from navi.core import Agent + + if self._agent is None: + self._agent = Agent( + session_store=self.session_store, + profile_registry=self.profile_registry, + tool_registry=self.tool_registry, + backend_registry=self.backend_registry, + workers=self.workers, + memory_store=self.memory_store, + cp_registry=self.cp_registry, + mcp_manager=self.mcp_manager, + ) + return self._agent + + async def shutdown(self) -> None: + """Close all resources that need explicit cleanup.""" + # MCP + if self.mcp_manager is not None: + try: + await self.mcp_manager.disconnect_all() + except Exception: + pass + # Session store pool + if hasattr(self.session_store, "_pool") and self.session_store._pool is not None: + try: + await self.session_store._pool.close() + except Exception: + pass + # Memory store pool + if hasattr(self.memory_store, "_pool") and self.memory_store._pool is not None: + try: + await self.memory_store._pool.close() + except Exception: + pass + # KV store pool + if hasattr(self.kv_store, "_pool") and self.kv_store._pool is not None: + try: + await self.kv_store._pool.close() + except Exception: + pass + # Scheduler pool + if hasattr(self.scheduler, "_pool") and self.scheduler._pool is not None: + try: + await self.scheduler._pool.close() + except Exception: + pass + + +async def create_container() -> AppContainer: + """Build the full application container.""" + from navi.core.registry import build_default_registries + from navi.core.scheduler import RecallScheduler + from navi.core.session import PgSessionStore + from navi.memory import MemoryStore + from navi.mcp import McpManager, load_mcp_servers + from navi.mcp.tools import McpTool + from navi.store import KvStore + from navi.workers import build_default_workers + + if not settings.database_url: + raise RuntimeError("DATABASE_URL is required. SQLite support has been removed.") + + session_store = PgSessionStore(settings.database_url) + memory_store = MemoryStore(settings.database_url) + kv_store = KvStore(settings.database_url) + scheduler = RecallScheduler(settings.database_url) + + tool_registry, profile_registry, backend_registry, cp_registry = build_default_registries( + memory_store=memory_store, + session_store=session_store, + scheduler=scheduler, + kv_store=kv_store, + ) + + # Wire embedding backend into memory store + try: + if settings.embedding_ollama_host: + emb_backend = OllamaBackend( + model=settings.embedding_model, + host=settings.embedding_ollama_host, + api_key=settings.embedding_ollama_api_key, + timeout=settings.ollama_request_timeout, + ) + else: + emb_backend = backend_registry.get("ollama") + if hasattr(memory_store, "set_embedding_backend"): + memory_store.set_embedding_backend(emb_backend) + except Exception: + pass + + workers = build_default_workers() + + mcp_manager = McpManager() + await mcp_manager.load_all() + + # Register MCP tools as external + try: + tools = await mcp_manager.get_all_tools() + for server_name, tool in tools: + mcp_tool = McpTool( + server_name=server_name, + tool_name=tool.name, + description=tool.description or "", + parameters=tool.inputSchema, + manager=mcp_manager, + ) + tool_registry.register_external(mcp_tool) + for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"): + tool = tool_registry.get(tool_name) + if hasattr(tool, "_mcp_manager"): + tool._mcp_manager = mcp_manager + except Exception: + pass + + return AppContainer( + memory_store=memory_store, + session_store=session_store, + kv_store=kv_store, + scheduler=scheduler, + tool_registry=tool_registry, + profile_registry=profile_registry, + backend_registry=backend_registry, + cp_registry=cp_registry, + workers=workers, + mcp_manager=mcp_manager, + ) diff --git a/navi/main.py b/navi/main.py index 305c4b6..91742f9 100644 --- a/navi/main.py +++ b/navi/main.py @@ -1,6 +1,7 @@ """FastAPI application entry point.""" import asyncio +from contextlib import asynccontextmanager from pathlib import Path import logging @@ -15,6 +16,7 @@ from navi.api.routes.admin import router as admin_router from navi.api.websocket import router as ws_router from navi.config import settings +from navi.core.container import create_container from debug.eval.api import router as eval_router structlog.configure( @@ -23,50 +25,26 @@ ), ) -app = FastAPI( - title="Navi", - description="Modular agent system — REST API and WebSocket", - version="0.1.0", -) - -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Keep reference to background tasks so unhandled exceptions are not lost -_cleanup_task = None -_scheduler_task = None - -app.include_router(health.router) -app.include_router(auth.router) -app.include_router(agents.router) -app.include_router(sessions.router) -app.include_router(messages.router) -app.include_router(ws_router) -app.include_router(webhooks.router) -app.include_router(admin_router) -app.include_router(eval_router) - _base = Path(__file__).parent.parent -app.mount("/assets", StaticFiles(directory=str(_base / "webclient" / "dist" / "assets")), name="assets") -app.mount("/images", StaticFiles(directory=str(_base / "webclient" / "dist" / "images")), name="images") -app.mount("/content-viewers", StaticFiles(directory=str(_base / "webclient" / "dist" / "content-viewers")), name="content_viewers") -app.mount("/content", StaticFiles(directory=str(_base / "navi" / "content")), name="content") -@app.on_event("startup") -async def _on_startup() -> None: +@asynccontextmanager +async def lifespan(app: FastAPI): log = structlog.get_logger() - from navi.api.deps import get_registries, get_session_store + container = await create_container() + app.state.container = container + + from navi.api.deps import set_container + set_container(container) + from navi.content_store import ensure_tables from navi.session_files import cleanup_loop from navi.auth import _ensure_auth_tables + from navi.profiles._overrides import ensure_table, load_overrides + from navi.api.routes.health import _check_embed + from navi.core.scheduler import recall_scheduler_loop + # Ensure auth tables first (navi_users is referenced by other DDL). - # Retry for race with Docker compose. for attempt in range(1, 6): try: await _ensure_auth_tables() @@ -78,78 +56,84 @@ await asyncio.sleep(2) else: log.error("startup.ensure_tables_failed", error=str(e)) - # Initialize registries before embed health check. The memory store gets its - # embedding backend wired during registry construction. - get_registries() - # Connect MCP servers and register their tools as external. - from navi.api.deps import get_mcp_manager, get_tool_registry, register_mcp_tools - try: - mcp_manager = await get_mcp_manager() - tool_registry = get_tool_registry() - await register_mcp_tools(tool_registry, mcp_manager) - for tool_name in ("reload_tools", "mcp_status", "test_mcp_tool", "spawn_agent", "list_tools"): - tool = tool_registry.get(tool_name) - if hasattr(tool, "_mcp_manager"): - tool._mcp_manager = mcp_manager - except Exception: - log.warning("startup.mcp_connect_failed", exc_info=True) - # Apply persisted profile overrides (e.g. is_admin_only) to in-memory profiles. - from navi.api.deps import get_profile_registry, get_session_store - from navi.profiles._overrides import ensure_table, load_overrides + # Apply persisted profile overrides try: - pool = await get_session_store()._get_pool() + pool = await container.session_store._get_pool() await ensure_table(pool) overrides = await load_overrides(pool) if overrides: - profiles = get_profile_registry() for pid, is_admin_only in overrides.items(): try: - profile = profiles.get(pid) + profile = container.profile_registry.get(pid) profile.is_admin_only = is_admin_only except Exception: - pass # stale override for removed profile + pass log.info("startup.profile_overrides_applied", count=len(overrides)) except Exception: log.warning("startup.profile_overrides_failed", exc_info=True) - # Check embedding backend health and log status - from navi.api.routes.health import _check_embed + # Check embedding backend health embed_status = await _check_embed() if embed_status["ok"]: log.info("startup.embed_ready", backend=embed_status["backend"]) else: log.warning("startup.embed_unavailable", backend=embed_status["backend"], error=embed_status["error"]) - # Start session file cleanup background task - global _cleanup_task - _cleanup_task = asyncio.create_task(cleanup_loop(get_session_store())) - # Start scheduled recall background task - global _scheduler_task - from navi.core.scheduler import recall_scheduler_loop - from navi.api.deps import get_scheduler, get_session_store as _get_store - _scheduler_task = asyncio.create_task(recall_scheduler_loop(get_scheduler(), _get_store())) + # Start background tasks + cleanup_task = asyncio.create_task(cleanup_loop(container.session_store)) + scheduler_task = asyncio.create_task( + recall_scheduler_loop(container.scheduler, container.session_store) + ) -@app.on_event("shutdown") -async def _on_shutdown() -> None: + yield + + # Shutdown + scheduler_task.cancel() + try: + await scheduler_task + except asyncio.CancelledError: + pass + cleanup_task.cancel() + try: + await cleanup_task + except asyncio.CancelledError: + pass + from navi.tools.ssh_exec import close_all_connections - from navi.api.deps import _mcp_manager - close_all_connections() - if _mcp_manager is not None: - try: - await _mcp_manager.disconnect_all() - except (asyncio.CancelledError, RuntimeError): - pass - except Exception: - pass - global _scheduler_task - if _scheduler_task is not None: - _scheduler_task.cancel() - try: - await _scheduler_task - except asyncio.CancelledError: - pass + await container.shutdown() + + +app = FastAPI( + title="Navi", + description="Modular agent system — REST API and WebSocket", + version="0.1.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(health.router) +app.include_router(auth.router) +app.include_router(agents.router) +app.include_router(sessions.router) +app.include_router(messages.router) +app.include_router(ws_router) +app.include_router(webhooks.router) +app.include_router(admin_router) +app.include_router(eval_router) + +app.mount("/assets", StaticFiles(directory=str(_base / "webclient" / "dist" / "assets")), name="assets") +app.mount("/images", StaticFiles(directory=str(_base / "webclient" / "dist" / "images")), name="images") +app.mount("/content-viewers", StaticFiles(directory=str(_base / "webclient" / "dist" / "content-viewers")), name="content_viewers") +app.mount("/content", StaticFiles(directory=str(_base / "navi" / "content")), name="content") from fastapi.responses import RedirectResponse diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ed7aec1..82f0a7f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -34,14 +34,7 @@ @pytest.fixture def mock_deps(monkeypatch): - """Patch navi.api.deps internal caches so FastAPI routes see mocked stores. - - We patch the module-level singletons (_session_store, _registries, etc.) - rather than the getter functions because FastAPI's Depends() captures the - original function object at import time; replacing the attribute on the - module does not affect the reference stored in the route decorator. - """ - import navi.api.deps as deps + """Create an AppContainer with mocked stores and attach it to app.state.""" import navi.config as _config # Ensure database_url is set so _make_memory_store doesn't raise @@ -65,30 +58,41 @@ store._get_pool = _fake_get_pool store._pool = _fake_pool - # Patch internal singletons so original getter functions return our fakes - monkeypatch.setattr(deps, "_session_store", store) - monkeypatch.setattr(deps, "_memory_store", None) - monkeypatch.setattr(deps, "_registries", (tools, profiles, backends, None)) - monkeypatch.setattr(deps, "_workers", []) - # Wire scheduler with FakePool so recall endpoints work in integration tests from tests.conftest_factory import make_scheduler_with_pool _fake_scheduler = make_scheduler_with_pool(_fake_conn) - monkeypatch.setattr(deps, "_scheduler", _fake_scheduler) - # Patch auth dependencies so tests don't need real OAuth. - # Use FastAPI's built-in dependency_overrides mechanism so the replacement - # is respected even though Depends() captured the original callable at import time. - fake_user = User(id="test-user", email="test@example.com", display_name="Test", role="admin", permissions=[]) + # Build container directly — no more module-level singletons + from navi.core.container import AppContainer + container = AppContainer( + memory_store=None, + session_store=store, + kv_store=None, + scheduler=_fake_scheduler, + tool_registry=tools, + profile_registry=profiles, + backend_registry=backends, + cp_registry=None, + workers=[], + mcp_manager=None, + ) + + fake_agent = FakeAgent() + container._agent = fake_agent from navi.main import app + app.state.container = container + + from navi.api.deps import set_container + set_container(container) + + # Patch auth dependencies so tests don't need real OAuth. + fake_user = User(id="test-user", email="test@example.com", display_name="Test", role="admin", permissions=[]) + from navi.api.deps import get_current_user, require_admin, require_permission, require_user from navi.auth.deps import get_current_user_ws app.dependency_overrides[get_current_user] = lambda: fake_user - # get_current_user_ws is called directly inside websocket_session (no Depends), - # so we monkeypatch the source module. Keep dependency_overrides for HTTP - # endpoints that still use Depends(get_current_user_ws). async def _fake_get_user_ws(websocket): return fake_user monkeypatch.setattr("navi.auth.deps.get_current_user_ws", _fake_get_user_ws) @@ -99,14 +103,7 @@ app.dependency_overrides[require_permission] = lambda p: fake_user # Patch get_agent in routes that import it directly (messages.py) - fake_agent = FakeAgent() monkeypatch.setattr("navi.api.routes.messages.get_agent", lambda: fake_agent) - # websocket imports deps lazily inside the handler — no need to patch directly - - # Prevent MCP connection attempts in tests (no real MCP servers available) - async def _fake_get_mcp_manager(): - return None - monkeypatch.setattr(deps, "get_mcp_manager", _fake_get_mcp_manager) return { "session_store": store, diff --git a/tests/integration/test_api_routes.py b/tests/integration/test_api_routes.py index 065ab3d..b0a52f9 100644 --- a/tests/integration/test_api_routes.py +++ b/tests/integration/test_api_routes.py @@ -38,7 +38,7 @@ class TestSessions: - async def test_create_session(self, client): + async def test_create_session(self, client, mock_deps): response = client.post("/sessions", json={"profile_id": "secretary"}) assert response.status_code == 201 data = response.json() @@ -166,9 +166,9 @@ async def run(self, session_id, user_message, images=None): return "Response text" - # Patch the Agent class in deps so the original get_agent() (captured by - # Depends() at import time) instantiates our dummy when called. - monkeypatch.setattr("navi.api.deps.Agent", lambda *a, **kw: DummyAgent()) + # Replace the cached agent on the AppContainer with our dummy. + from navi.main import app + app.state.container._agent = DummyAgent() response = client.post(f"/sessions/{session.id}/messages", json={"content": "hi"}) assert response.status_code == 200 data = response.json() diff --git a/tests/integration/test_scheduler_loop.py b/tests/integration/test_scheduler_loop.py index 0a152c6..e57967c 100644 --- a/tests/integration/test_scheduler_loop.py +++ b/tests/integration/test_scheduler_loop.py @@ -13,7 +13,6 @@ @pytest.fixture(autouse=True) def patch_scheduler_deps(monkeypatch): """Prevent real _fire_recall from triggering heavy dependency initialization.""" - monkeypatch.setattr("navi.api.deps.get_registries", lambda: (None, None, None, None)) monkeypatch.setattr("navi.api.deps.get_workers", lambda: []) monkeypatch.setattr("navi.api.deps.get_memory_store", lambda: None) monkeypatch.setattr("navi.api.deps.get_mcp_manager", AsyncMock(return_value=None)) diff --git a/tests/unit/test_startup.py b/tests/unit/test_startup.py deleted file mode 100644 index 64be317..0000000 --- a/tests/unit/test_startup.py +++ /dev/null @@ -1,64 +0,0 @@ -"""Unit tests for application startup wiring.""" - - -async def test_startup_initializes_registries_before_embed_check(monkeypatch): - import navi.api.deps as deps - import navi.api.routes.health as health_mod - import navi.content_store as content_store - import navi.main as main_mod - import navi.session_files as session_files - - order = [] - - async def fake_ensure_tables(): - order.append("ensure_tables") - - def fake_get_registries(): - order.append("get_registries") - return None - - def fake_get_tool_registry(): - order.append("get_tool_registry") - return None - - async def fake_get_mcp_manager(): - order.append("get_mcp_manager") - return None - - async def fake_register_mcp_tools(registry, manager): - order.append("register_mcp_tools") - - async def fake_check_embed(): - order.append("check_embed") - return {"ok": True, "backend": "fake", "error": None} - - async def fake_cleanup_loop(store): - return None - - def fake_create_task(coro): - order.append("create_task") - coro.close() - return object() - - monkeypatch.setattr(content_store, "ensure_tables", fake_ensure_tables) - monkeypatch.setattr(deps, "get_registries", fake_get_registries) - monkeypatch.setattr(deps, "get_tool_registry", fake_get_tool_registry) - monkeypatch.setattr(deps, "get_mcp_manager", fake_get_mcp_manager) - monkeypatch.setattr(deps, "register_mcp_tools", fake_register_mcp_tools) - monkeypatch.setattr(health_mod, "_check_embed", fake_check_embed) - monkeypatch.setattr(session_files, "cleanup_loop", fake_cleanup_loop) - monkeypatch.setattr(deps, "get_session_store", lambda: object()) - monkeypatch.setattr(main_mod.asyncio, "create_task", fake_create_task) - # Auth tables creation should not break startup test - import navi.auth as auth_mod - async def fake_ensure_auth_tables(): - pass - monkeypatch.setattr(auth_mod, "_ensure_auth_tables", fake_ensure_auth_tables) - - await main_mod._on_startup() - - assert order[:2] == ["ensure_tables", "get_registries"] - assert "get_mcp_manager" in order - assert "register_mcp_tools" in order - assert "check_embed" in order - assert order[-1] == "create_task"