# navi-1 / navi / api / deps.py
"""FastAPI dependency injection — thin wrappers around AppContainer."""

from typing import Annotated

from fastapi import Depends, Request

from navi.config import settings
from navi.core import (
    Agent,
    AppContainer,
    BackendRegistry,
    ProfileRegistry,
    SessionStore,
    ToolRegistry,
)
from navi.core.container import create_container
from navi.context_providers._loader import ContextProviderRegistry
from navi.memory import MemoryStore
from navi.mcp import McpManager
from navi.store import KvStore
from navi.core.scheduler import RecallScheduler
from navi.workers.base import Worker
from navi.auth.deps import (
    get_current_user,
    get_current_user_ws,
    require_admin,
    require_permission,
    require_user,
)

# Global fallback — populated during lifespan startup so non-request code
# (tests, websocket direct calls, etc.) can still access services.
_container: AppContainer | None = None


def set_container(container: AppContainer) -> None:
    """Store *container* as the module-level fallback container.

    Rebinds the module global ``_container``; any previously stored
    container is replaced.
    """
    global _container
    _container = container


def _get_container(request: Request) -> AppContainer:
    """Return the container stored on ``request.app.state``.

    Raises:
        RuntimeError: If ``request.app.state`` has no ``container``
            attribute, or that attribute is ``None``.
    """
    # A missing attribute and an explicit None are treated the same way.
    found = getattr(request.app.state, "container", None)
    if found is None:
        raise RuntimeError("AppContainer not initialized. Did lifespan startup run?")
    return found


def _resolve_container() -> AppContainer:
    """Return the module-level fallback container.

    Raises:
        RuntimeError: If the module global ``_container`` is still ``None``.
    """
    if _container is None:
        raise RuntimeError("AppContainer not initialized. Did lifespan startup run?")
    return _container


def get_memory_store() -> MemoryStore:
    """Return the ``memory_store`` attribute of the module-level container."""
    container = _resolve_container()
    return container.memory_store


def get_session_store() -> SessionStore:
    """Return the ``session_store`` attribute of the module-level container."""
    container = _resolve_container()
    return container.session_store


def get_kv_store() -> KvStore:
    """Return the ``kv_store`` attribute of the module-level container."""
    container = _resolve_container()
    return container.kv_store


def get_tool_registry() -> ToolRegistry:
    """Return the ``tool_registry`` attribute of the module-level container."""
    container = _resolve_container()
    return container.tool_registry


def get_profile_registry() -> ProfileRegistry:
    """Return the ``profile_registry`` attribute of the module-level container."""
    container = _resolve_container()
    return container.profile_registry


def get_backend_registry() -> BackendRegistry:
    """Return the ``backend_registry`` attribute of the module-level container."""
    container = _resolve_container()
    return container.backend_registry


def get_cp_registry() -> ContextProviderRegistry:
    """Return the ``cp_registry`` attribute of the module-level container."""
    container = _resolve_container()
    return container.cp_registry


def get_workers() -> list[Worker]:
    """Return the ``workers`` attribute of the module-level container."""
    container = _resolve_container()
    return container.workers


def get_mcp_manager() -> McpManager | None:
    """Return the ``mcp_manager`` attribute of the module-level container.

    The return annotation allows ``None``, so callers should check for it.
    """
    container = _resolve_container()
    return container.mcp_manager


def get_scheduler() -> RecallScheduler:
    """Return the ``scheduler`` attribute of the module-level container."""
    container = _resolve_container()
    return container.scheduler


def get_agent() -> Agent:
    """Return the result of calling ``get_agent()`` on the module-level container."""
    container = _resolve_container()
    return container.get_agent()


def get_orchestrator():
    """Return the ``orchestrator`` attribute of the module-level container."""
    container = _resolve_container()
    return container.orchestrator


async def register_mcp_tools(registry: ToolRegistry, manager: McpManager) -> None:
    """Register all currently connected MCP tools into the tool registry.

    Iterates over ``manager.clients``, skips clients that are not connected,
    lists each remaining client's tools, wraps every tool in an ``McpTool``
    and hands it to ``registry.register_external``.

    Registration is best-effort per server: if listing or registering tools
    for one server raises, a warning is logged and the loop moves on to the
    next server. Tools already registered for that server stay registered.

    Args:
        registry: Tool registry that receives the wrapped MCP tools.
        manager: MCP manager whose ``clients`` mapping is scanned.
    """
    import logging

    from navi.mcp.tools import McpTool

    # This module defines no module-level logger, so bind one here; without
    # it the except branch below would raise NameError instead of logging.
    log = logging.getLogger(__name__)

    for name, client in manager.clients.items():
        if not client.connected:
            continue
        try:
            tools = await client.list_tools()
            for tool in tools:
                mcp_tool = McpTool(
                    server_name=name,
                    tool_name=tool.name,
                    description=tool.description or "",
                    parameters=tool.inputSchema,
                    manager=manager,
                )
                registry.register_external(mcp_tool)
        except Exception as exc:
            # Deliberately broad: one failing server must not block the rest.
            log.warning("Failed to register MCP tools for %r: %s", name, exc)


# Public names of this module, including backward-compat re-exports so
# nothing outside breaks. Every public helper defined above is listed so
# that star-imports see the same set of names as explicit imports.
__all__ = [
    "create_container",
    "set_container",
    "get_memory_store",
    "get_session_store",
    "get_kv_store",
    "get_tool_registry",
    "get_profile_registry",
    "get_backend_registry",
    "get_cp_registry",
    "get_workers",
    "get_mcp_manager",
    "get_scheduler",
    "get_agent",
    "get_orchestrator",
    "register_mcp_tools",
    "get_current_user",
    "get_current_user_ws",
    "require_admin",
    "require_permission",
    "require_user",
]