"""
Canonical types and abstract base class for LLM backends.
All backends translate their native wire format into these types.
Message format follows the OpenAI convention (compatible with Ollama and Anthropic adapters).
"""
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal
from pydantic import BaseModel, Field
class ToolCallRequest(BaseModel):
    """One tool invocation requested by the LLM."""

    id: str  # identifier of this particular call
    name: str  # name of the tool to invoke
    arguments: dict  # arguments to pass to the tool
class ToolSchema(BaseModel):
    """Definition of a tool, in the form it is sent to the LLM."""

    type: str = "function"
    # expected layout: {name: str, description: str, parameters: JSON Schema}
    function: dict
class Message(BaseModel):
    """Canonical chat message, laid out the OpenAI-compatible way.

    Only ``role`` is required; every other field has a default.
    """

    role: Literal["system", "user", "assistant", "tool"]
    content: str | None = None
    # multimodal input: base64-encoded images (user and assistant roles only)
    images: list[str] | None = None

    # --- tool calling ---
    # filled in by the assistant when it requests tool calls
    tool_calls: list[ToolCallRequest] | None = None
    # filled in on tool result messages
    tool_call_id: str | None = None
    name: str | None = None  # tool name, on tool result messages

    created_at: datetime | None = None

    # --- markers and display-only data ---
    # True for a compressed history block injected by the context compressor
    is_summary: bool = False
    # reasoning text produced during this turn; display only, not sent to the LLM
    thinking: str | None = None
    # True for a planning block: kept in messages for UI rendering, not as LLM context
    is_plan: bool = False
    # True for a context compression event: kept in messages for UI rendering,
    # not as LLM context
    is_compression: bool = False

    # --- response metrics (display only, not sent to the LLM) ---
    elapsed_seconds: float | None = None
    tool_call_count: int | None = None
    token_count: int | None = None

    # free-form metadata for tool result messages (e.g. url, content_type, returncode);
    # default_factory gives each message its own dict
    metadata: dict = Field(default_factory=dict)
class LLMResponse(BaseModel):
    """Complete (non-streaming) response returned by an LLM backend."""

    # content and tool_calls carry no default value, unlike the fields below
    content: str | None
    tool_calls: list[ToolCallRequest] | None
    # one of "stop" | "tool_calls" | "length"
    finish_reason: str
    thinking: str | None = None

    # token usage
    prompt_tokens: int | None = None
    completion_tokens: int | None = None
class LLMChunk(BaseModel):
    """One chunk of a streaming LLM response; every field is optional."""

    delta: str | None = None
    thinking: str | None = None
    # "stop" | "tool_calls" | "length"; stays None while the stream is in progress
    finish_reason: str | None = None

    # --- final-chunk-only fields ---
    # tool calls: only present on the final chunk when finish_reason == "tool_calls"
    tool_calls: list[ToolCallRequest] | None = None
    # token counts: only present on the final chunk
    prompt_tokens: int | None = None
    completion_tokens: int | None = None
class LLMBackend(ABC):
    """Abstract base class for LLM backends.

    Subclasses must implement ``complete``, ``stream`` and ``stream_complete``.
    ``embed`` is optional: the default implementation raises
    ``NotImplementedError``.
    """

    @abstractmethod
    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
        think: bool | None = None,
        max_tokens: int | None = None,
    ) -> LLMResponse:
        """Single-shot completion. Used in the agent tool-calling loop.

        Args:
            messages: Conversation in canonical ``Message`` form.
            tools: Tool definitions offered to the model, if any.
            temperature: Sampling temperature.
            model: Model identifier; ``None`` presumably selects the
                backend's default (confirm per backend).
            think: Presumably toggles reasoning output where the backend
                supports it (confirm per backend).
            max_tokens: Presumably an upper bound on generated tokens
                (confirm per backend).

        Returns:
            The full response as an ``LLMResponse``.
        """

    @abstractmethod
    async def stream(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Streaming text completion (no tool calling). Used for final response streaming.

        Args:
            messages: Conversation in canonical ``Message`` form.
            temperature: Sampling temperature.
            model: Model identifier; ``None`` presumably selects the
                backend's default (confirm per backend).

        Yields:
            ``LLMChunk`` objects as the response streams.
        """
        # Unreachable ``yield``: it makes this declaration an async generator
        # function, so the abstract signature matches overrides that are
        # consumed with ``async for`` instead of being typed as a coroutine
        # that returns a generator.
        if False:
            yield LLMChunk()

    @abstractmethod
    async def stream_complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
        think: bool | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Streaming completion with optional tool support.

        Yields LLMChunk objects in real-time. Thinking and text deltas arrive as
        they stream. The final chunk (finish_reason is not None) carries tool_calls
        when the model requested tools, or is a plain stop otherwise.

        Args:
            messages: Conversation in canonical ``Message`` form.
            tools: Tool definitions offered to the model, if any.
            temperature: Sampling temperature.
            model: Model identifier; ``None`` presumably selects the
                backend's default (confirm per backend).
            think: Presumably toggles reasoning output where the backend
                supports it (confirm per backend).
        """
        # Unreachable ``yield``: see ``stream`` -- keeps this an async
        # generator function rather than a coroutine function.
        if False:
            yield LLMChunk()

    async def embed(
        self,
        texts: list[str],
        model: str | None = None,
    ) -> list[list[float]]:
        """Generate embeddings for a batch of texts.

        Default implementation raises NotImplementedError.
        Backends that support embeddings must override this method.

        Args:
            texts: Batch of input strings.
            model: Embedding model identifier; ``None`` presumably selects
                the backend's default (confirm per backend).

        Returns:
            A list of float vectors (presumably one per input text; this
            default never returns).

        Raises:
            NotImplementedError: Always, unless overridden by the backend.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support embeddings")