"""Ollama LLM backend."""
import uuid
from typing import AsyncGenerator
import ollama as ollama_client
from navi.config import settings
from navi.exceptions import LLMBackendError
from .base import LLMBackend, LLMChunk, LLMResponse, Message, ToolCallRequest, ToolSchema
def _to_ollama_messages(messages: list[Message]) -> list[dict]:
    """Convert backend messages into the list of dicts sent to Ollama.

    Each message becomes a dict holding ``role`` and ``content`` (an empty
    string when the message content is falsy). The ``images`` and
    ``tool_calls`` keys are only added when the message has truthy values
    for them.

    Args:
        messages: Conversation messages, in the order they should be sent.

    Returns:
        One dict per input message, in the same order.
    """

    def _convert(message: Message) -> dict:
        # Build the payload for a single message.
        payload: dict = {"role": message.role, "content": message.content or ""}
        if message.images:
            # list of base64 strings, Ollama format
            payload["images"] = message.images
        if message.tool_calls:
            payload["tool_calls"] = [
                {"function": {"name": call.name, "arguments": call.arguments}}
                for call in message.tool_calls
            ]
        return payload

    return [_convert(message) for message in messages]
def _to_ollama_tools(tools: list[ToolSchema]) -> list[dict]:
    """Serialise tool schemas for a chat request.

    Args:
        tools: Tool schemas to expose to the model.

    Returns:
        The result of calling ``model_dump()`` on each schema, in input order.
    """
    dumped: list[dict] = []
    for schema in tools:
        dumped.append(schema.model_dump())
    return dumped
def _base_options(temperature: float) -> dict:
    """Build the ``options`` mapping shared by every chat request.

    The mapping always carries ``temperature`` and ``num_ctx`` (read from
    ``settings.ollama_num_ctx``). ``think`` is set to ``True`` only when
    ``settings.ollama_think`` is truthy.

    Args:
        temperature: Sampling temperature to request.

    Returns:
        A new options dict for this request.
    """
    options: dict = {
        "temperature": temperature,
        "num_ctx": settings.ollama_num_ctx,
    }
    # NOTE(review): Ollama's chat API appears to accept `think` as a top-level
    # request field rather than a model option — confirm this key takes effect.
    if not settings.ollama_think:
        return options
    options["think"] = True
    return options
class OllamaBackend(LLMBackend):
    """LLM backend that talks to an Ollama server via ``ollama.AsyncClient``.

    Any exception raised while building a request, calling the client, or
    decoding its response is re-raised as ``LLMBackendError`` with the
    original exception chained as the cause.
    """

    def __init__(self, model: str, host: str = "http://localhost:11434"):
        """Create a backend bound to a default model and server.

        Args:
            model: Model name used when a call does not pass its own ``model``.
            host: Address handed to ``ollama.AsyncClient``.
        """
        self.model = model
        self._client = ollama_client.AsyncClient(host=host)

    def _chat_kwargs(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None,
        temperature: float,
        model: str | None,
        *,
        stream: bool,
    ) -> dict:
        """Assemble the keyword arguments for ``AsyncClient.chat``."""
        kwargs: dict = {
            "model": model or self.model,
            "messages": _to_ollama_messages(messages),
            "options": _base_options(temperature),
            "stream": stream,
        }
        # Only send `tools` when there is at least one schema to offer.
        if tools:
            kwargs["tools"] = _to_ollama_tools(tools)
        return kwargs

    @staticmethod
    def _to_tool_calls(raw_calls) -> list[ToolCallRequest] | None:
        """Convert Ollama tool calls to ``ToolCallRequest`` objects.

        Returns ``None`` when ``raw_calls`` is empty or missing. Each request
        gets a freshly generated UUID as its id, because the id is not read
        from the Ollama response.
        """
        if not raw_calls:
            return None
        return [
            ToolCallRequest(
                id=str(uuid.uuid4()),
                name=call.function.name,
                arguments=dict(call.function.arguments),
            )
            for call in raw_calls
        ]

    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
    ) -> LLMResponse:
        """Run a single non-streaming chat completion.

        Args:
            messages: Conversation to send.
            tools: Optional tool schemas the model may call.
            temperature: Sampling temperature.
            model: Overrides the backend's default model when given.

        Returns:
            The response content (``None`` when empty), any tool calls, the
            thinking text when the message exposes it, and a ``finish_reason``
            of ``"tool_calls"`` if tool calls were returned, else ``"stop"``.

        Raises:
            LLMBackendError: If the request or response handling fails.
        """
        try:
            kwargs = self._chat_kwargs(
                messages, tools, temperature, model, stream=False
            )
            response = await self._client.chat(**kwargs)
            msg = response.message
            tool_calls = self._to_tool_calls(msg.tool_calls)
            return LLMResponse(
                content=msg.content or None,
                tool_calls=tool_calls,
                finish_reason="tool_calls" if tool_calls else "stop",
                # `thinking` may be absent on the message object, hence getattr.
                thinking=getattr(msg, "thinking", None) or None,
            )
        except Exception as e:
            raise LLMBackendError(str(e)) from e

    async def stream(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Stream a chat completion without tools.

        Yields one ``LLMChunk`` per chunk received from the client. The chunk
        with ``done`` set carries ``finish_reason="stop"`` and the token
        counts; those fields are ``None`` on every other chunk.

        Raises:
            LLMBackendError: If the request or chunk handling fails.
        """
        try:
            kwargs = self._chat_kwargs(
                messages, None, temperature, model, stream=True
            )
            async for chunk in await self._client.chat(**kwargs):
                done = chunk.done
                yield LLMChunk(
                    delta=chunk.message.content or None,
                    thinking=getattr(chunk.message, "thinking", None) or None,
                    finish_reason="stop" if done else None,
                    prompt_tokens=chunk.prompt_eval_count if done else None,
                    completion_tokens=chunk.eval_count if done else None,
                )
        except Exception as e:
            raise LLMBackendError(str(e)) from e

    async def stream_complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        """Stream a chat completion that may include tool calls.

        Yields one ``LLMChunk`` per chunk received from the client. Tool calls
        are attached to the chunk they arrived in. The chunk with ``done`` set
        carries the token counts and a ``finish_reason`` of ``"tool_calls"``
        if any chunk of this stream contained tool calls, otherwise ``"stop"``.

        Raises:
            LLMBackendError: If the request or chunk handling fails.
        """
        try:
            kwargs = self._chat_kwargs(
                messages, tools, temperature, model, stream=True
            )
            # Tool calls can arrive in a chunk that precedes the terminal
            # `done` chunk, so remember whether any were seen rather than
            # inspecting only the last chunk.
            saw_tool_calls = False
            async for chunk in await self._client.chat(**kwargs):
                done = chunk.done
                tool_calls = self._to_tool_calls(chunk.message.tool_calls)
                if tool_calls:
                    saw_tool_calls = True
                finish_reason = None
                if done:
                    finish_reason = "tool_calls" if saw_tool_calls else "stop"
                yield LLMChunk(
                    delta=chunk.message.content or None,
                    thinking=getattr(chunk.message, "thinking", None) or None,
                    finish_reason=finish_reason,
                    tool_calls=tool_calls,
                    prompt_tokens=chunk.prompt_eval_count if done else None,
                    completion_tokens=chunk.eval_count if done else None,
                )
        except Exception as e:
            raise LLMBackendError(str(e)) from e