Newer
Older
navi-1 / navi / llm / ollama.py
"""Ollama LLM backend."""

import uuid
from typing import AsyncGenerator

import ollama as ollama_client

from navi.config import settings
from navi.exceptions import LLMBackendError

from .base import LLMBackend, LLMChunk, LLMResponse, Message, ToolCallRequest, ToolSchema


def _to_ollama_messages(messages: list[Message]) -> list[dict]:
    result = []
    for m in messages:
        msg: dict = {"role": m.role, "content": m.content or ""}
        if m.images:
            msg["images"] = m.images  # list of base64 strings, Ollama format
        if m.tool_calls:
            msg["tool_calls"] = [
                {"function": {"name": tc.name, "arguments": tc.arguments}}
                for tc in m.tool_calls
            ]
        result.append(msg)
    return result


def _to_ollama_tools(tools: list[ToolSchema]) -> list[dict]:
    return [t.model_dump() for t in tools]


def _base_options(temperature: float) -> dict:
    opts: dict = {"temperature": temperature, "num_ctx": settings.ollama_num_ctx}
    if settings.ollama_think:
        opts["think"] = True
    return opts


class OllamaBackend(LLMBackend):
    def __init__(self, model: str, host: str = "http://localhost:11434"):
        self.model = model
        self._client = ollama_client.AsyncClient(host=host)

    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
    ) -> LLMResponse:
        try:
            kwargs: dict = {
                "model": model or self.model,
                "messages": _to_ollama_messages(messages),
                "options": _base_options(temperature),
                "stream": False,
            }
            if tools:
                kwargs["tools"] = _to_ollama_tools(tools)

            response = await self._client.chat(**kwargs)
            msg = response.message

            tool_calls = None
            if msg.tool_calls:
                tool_calls = [
                    ToolCallRequest(
                        id=str(uuid.uuid4()),
                        name=tc.function.name,
                        arguments=dict(tc.function.arguments),
                    )
                    for tc in msg.tool_calls
                ]

            finish_reason = "tool_calls" if tool_calls else "stop"
            return LLMResponse(
                content=msg.content or None,
                tool_calls=tool_calls,
                finish_reason=finish_reason,
                thinking=getattr(msg, "thinking", None) or None,
            )
        except Exception as e:
            raise LLMBackendError(str(e)) from e

    async def stream(
        self,
        messages: list[Message],
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        try:
            async for chunk in await self._client.chat(
                model=model or self.model,
                messages=_to_ollama_messages(messages),
                options=_base_options(temperature),
                stream=True,
            ):
                thinking = getattr(chunk.message, "thinking", None) or None
                delta = chunk.message.content or None
                finish_reason = "stop" if chunk.done else None
                yield LLMChunk(
                    delta=delta,
                    thinking=thinking,
                    finish_reason=finish_reason,
                    prompt_tokens=chunk.prompt_eval_count if chunk.done else None,
                    completion_tokens=chunk.eval_count if chunk.done else None,
                )
        except Exception as e:
            raise LLMBackendError(str(e)) from e

    async def stream_complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        temperature: float = 0.7,
        model: str | None = None,
    ) -> AsyncGenerator[LLMChunk, None]:
        try:
            kwargs: dict = {
                "model": model or self.model,
                "messages": _to_ollama_messages(messages),
                "options": _base_options(temperature),
                "stream": True,
            }
            if tools:
                kwargs["tools"] = _to_ollama_tools(tools)

            async for chunk in await self._client.chat(**kwargs):
                thinking = getattr(chunk.message, "thinking", None) or None
                delta = chunk.message.content or None

                tool_calls = None
                if chunk.message.tool_calls:
                    tool_calls = [
                        ToolCallRequest(
                            id=str(uuid.uuid4()),
                            name=tc.function.name,
                            arguments=dict(tc.function.arguments),
                        )
                        for tc in chunk.message.tool_calls
                    ]

                finish_reason = None
                if chunk.done:
                    finish_reason = "tool_calls" if tool_calls else "stop"

                yield LLMChunk(
                    delta=delta,
                    thinking=thinking,
                    finish_reason=finish_reason,
                    tool_calls=tool_calls,
                    prompt_tokens=chunk.prompt_eval_count if chunk.done else None,
                    completion_tokens=chunk.eval_count if chunk.done else None,
                )
        except Exception as e:
            raise LLMBackendError(str(e)) from e