"""FastAPI dependency injection — thin wrappers around AppContainer."""
from typing import Annotated
from fastapi import Depends, Request
from navi.config import settings
from navi.core import (
Agent,
AppContainer,
BackendRegistry,
ProfileRegistry,
SessionStore,
ToolRegistry,
)
from navi.core.container import create_container
from navi.context_providers._loader import ContextProviderRegistry
from navi.memory import MemoryStore
from navi.mcp import McpManager
from navi.store import KvStore
from navi.core.scheduler import RecallScheduler
from navi.workers.base import Worker
from navi.auth.deps import (
get_current_user,
get_current_user_ws,
require_admin,
require_permission,
require_user,
)
# Global fallback — populated during lifespan startup so non-request code
# (tests, websocket direct calls, etc.) can still access services.
_container: AppContainer | None = None
def set_container(container: AppContainer) -> None:
global _container
_container = container
def _get_container(request: Request) -> AppContainer:
if not hasattr(request.app.state, "container") or request.app.state.container is None:
raise RuntimeError("AppContainer not initialized. Did lifespan startup run?")
return request.app.state.container
def _resolve_container() -> AppContainer:
if _container is not None:
return _container
raise RuntimeError("AppContainer not initialized. Did lifespan startup run?")
def get_memory_store() -> MemoryStore:
return _resolve_container().memory_store
def get_session_store() -> SessionStore:
return _resolve_container().session_store
def get_kv_store() -> KvStore:
return _resolve_container().kv_store
def get_tool_registry() -> ToolRegistry:
return _resolve_container().tool_registry
def get_profile_registry() -> ProfileRegistry:
return _resolve_container().profile_registry
def get_backend_registry() -> BackendRegistry:
return _resolve_container().backend_registry
def get_cp_registry() -> ContextProviderRegistry:
return _resolve_container().cp_registry
def get_workers() -> list[Worker]:
return _resolve_container().workers
def get_mcp_manager() -> McpManager | None:
return _resolve_container().mcp_manager
def get_scheduler() -> RecallScheduler:
return _resolve_container().scheduler
def get_agent() -> Agent:
return _resolve_container().get_agent()
def get_orchestrator():
return _resolve_container().orchestrator
async def register_mcp_tools(registry: ToolRegistry, manager: McpManager) -> None:
"""Register all currently connected MCP tools into the tool registry."""
from navi.mcp.tools import McpTool
for name, client in manager.clients.items():
if not client.connected:
continue
try:
tools = await client.list_tools()
for tool in tools:
mcp_tool = McpTool(
server_name=name,
tool_name=tool.name,
description=tool.description or "",
parameters=tool.inputSchema,
manager=manager,
)
registry.register_external(mcp_tool)
except Exception as exc:
log.warning("Failed to register MCP tools for %r: %s", name, exc)
# Backward-compat re-exports so nothing outside breaks
__all__ = [
"create_container",
"get_memory_store",
"get_session_store",
"get_kv_store",
"get_tool_registry",
"get_profile_registry",
"get_backend_registry",
"get_cp_registry",
"get_workers",
"get_mcp_manager",
"get_agent",
"register_mcp_tools",
"get_current_user",
"get_current_user_ws",
"require_admin",
"require_permission",
"require_user",
]