Newer
Older
navi-1 / navi / llm / base.py
"""
Canonical types and abstract base class for LLM backends.

All backends translate their native wire format into these types.
Message format follows the OpenAI convention (compatible with Ollama and Anthropic adapters).
"""

from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal

from pydantic import BaseModel, Field


class ToolCallRequest(BaseModel):
    """A tool call requested by the LLM."""

    id: str
    name: str
    arguments: dict


class ToolSchema(BaseModel):
    """Tool definition sent to the LLM."""

    type: str = "function"
    function: dict  # {name: str, description: str, parameters: JSON Schema}


class Message(BaseModel):
    """Canonical message format (OpenAI-compatible)."""

    role: Literal["system", "user", "assistant", "tool"]
    content: str | None = None
    # base64-encoded images (multimodal); user and assistant roles only
    images: list[str] | None = None
    # set by assistant when requesting tool calls
    tool_calls: list[ToolCallRequest] | None = None
    # set on tool result messages
    tool_call_id: str | None = None
    name: str | None = None  # tool name on tool result messages
    created_at: datetime | None = None
    # marks a compressed history block injected by the context compressor
    is_summary: bool = False
    # reasoning text produced during this turn (not sent to LLM, display only)
    thinking: str | None = None
    # marks a planning block — stored in messages for UI rendering, not as LLM context
    is_plan: bool = False
    # marks a context compression event — stored in messages for UI rendering, not as LLM context
    is_compression: bool = False
    # response metrics (display only, not sent to LLM)
    elapsed_seconds: float | None = None
    tool_call_count: int | None = None
    token_count: int | None = None
    # arbitrary metadata for tool result messages (e.g. url, content_type, returncode)
    metadata: dict = Field(default_factory=dict)


class LLMResponse(BaseModel):
    """Non-streaming response from an LLM backend."""

    content: str | None
    tool_calls: list[ToolCallRequest] | None
    finish_reason: str  # "stop" | "tool_calls" | "length"
    thinking: str | None = None
    prompt_tokens: int | None = None
    completion_tokens: int | None = None


class LLMChunk(BaseModel):
    """A single chunk from a streaming LLM response."""

    delta: str | None = None
    thinking: str | None = None
    finish_reason: str | None = None  # "stop" | "tool_calls" | "length"; None while streaming
    # Tool calls — only present on the final chunk when finish_reason == "tool_calls"
    tool_calls: list[ToolCallRequest] | None = None
    # Token counts — only present on the final chunk
    prompt_tokens: int | None = None
    completion_tokens: int | None = None


class LLMBackend(ABC):
    """Abstract base class for LLM backends."""

    @abstractmethod
    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
        think: bool | None = None,
        max_tokens: int | None = None,
    ) -> LLMResponse:
        """Single-shot completion. Used in the agent tool-calling loop."""

    @abstractmethod
    async def stream(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Streaming text completion (no tool calling). Used for final response streaming."""

    @abstractmethod
    async def stream_complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
        think: bool | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Streaming completion with optional tool support.

        Yields LLMChunk objects in real-time. Thinking and text deltas arrive as
        they stream. The final chunk (finish_reason is not None) carries tool_calls
        when the model requested tools, or is a plain stop otherwise.
        """

    async def embed(
        self,
        texts: list[str],
        model: str | None = None,
    ) -> list[list[float]]:
        """Generate embeddings for a batch of texts.

        Default implementation raises NotImplementedError.
        Backends that support embeddings must override this method.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support embeddings")