# navi-1 / navi / core / compressor.py
"""
Context compressor — summarizes old messages to stay within the token limit.

Flow:
1. Partition session messages into "to_summarize" (old turns) and "to_keep" (recent turns).
2. Call the LLM to produce a concise bullet-point summary of the old turns.
3. Replace the old turns with a single summary message (role=user, is_summary=True).

A "turn" is one user message plus all following assistant/tool messages up to the
next user message. Tool call groups (assistant + tool results) are never split.
Existing summary messages are always folded into the next compression pass.
"""

import json
from datetime import datetime, timezone

from navi.llm.base import LLMBackend, Message

# System prompt for the summarization call. It tells the model that its output
# will be the only record of the summarized messages, lays out the section
# headings the summary should use (goal, work state, key facts, outputs, errors,
# user preferences and feedback), and ends with formatting restrictions.
_SUMMARIZE_SYSTEM = (
    "You are summarizing a conversation history to free up context space. "
    "The assistant will continue working using ONLY this summary — it will have no access "
    "to the original messages. Be thorough and precise. Prefer specifics over generalities. "
    "This summary is historical context, not a new user request.\n\n"
    "## Current goal\n"
    "What the user is trying to accomplish in this session. Include any stated deadlines, "
    "constraints, or acceptance criteria.\n\n"
    "## Work state\n"
    "What has been completed (be specific — name files, functions, endpoints, steps). "
    "What is still in progress or pending. Any blockers or open questions.\n\n"
    "## Key facts\n"
    "Everything the user told the assistant: preferences, system details, environment info, "
    "decisions made, constraints discovered, explicit instructions. Include exact values "
    "(port numbers, file paths, config keys, IDs) — do not paraphrase if precision matters.\n\n"
    "## Outputs\n"
    "Every file created or modified (full paths). Every config value set. "
    "Commands run and their outcome. Preserve exact command output only when it is short "
    "and needed to prove a result or diagnose an error. "
    "Do not preserve tool-call-like examples with parenthesized arguments; describe tool "
    "usage in words or as key/value facts instead.\n\n"
    "## Errors\n"
    "Failures, exceptions, or unexpected results encountered. "
    "How each was resolved — or that it remains unresolved.\n\n"
    "## User preferences and feedback\n"
    "Corrections the user made to the assistant's approach. Explicit style or behavior "
    "preferences stated during this session. Things the user said not to do.\n\n"
    "Do not include greetings, filler, transitions, or meta-commentary about the summary itself. "
    "Do not use Markdown code fences or inline-code backticks unless preserving a user-authored "
    "literal value is essential. "
    "Write in tight prose or bullet points — whatever preserves more information per token."
)


def should_compress(context_tokens: int, max_context_tokens: int, threshold: float) -> bool:
    """
    Report whether the context has reached the compression trigger point.

    The trigger is `max_context_tokens * threshold`, truncated to an integer.
    Reaching the trigger exactly counts as "should compress".
    """
    trigger = int(max_context_tokens * threshold)
    return trigger <= context_tokens


def partition_messages(
    messages: list[Message],
    keep_recent: int,
    keep_recent_messages: int | None = None,
) -> tuple[list[Message], list[Message]]:
    """
    Split the conversation into old messages to summarize and recent ones to keep.

    Returns (to_summarize, to_keep). System messages are excluded from both
    lists; the caller re-attaches them.

    The last `keep_recent` conversational turns are kept verbatim and everything
    older goes into to_summarize. A turn starts at a user message and runs up to
    (not including) the next user message, so an assistant message and the tool
    results that follow it always land on the same side of the split.

    `keep_recent` values below zero are treated as zero (keep no turns).

    When there are not more turns than `keep_recent` and `keep_recent_messages`
    is given, the mid-turn fallback `partition_current_turn_messages` is tried
    before giving up with ([], all non-system messages).
    """
    non_system = [m for m in messages if m.role != "system"]

    # Group into turns: each turn starts with a user message. Messages that
    # precede the first user message form a leading turn of their own.
    turns: list[list[Message]] = []
    current: list[Message] = []
    for msg in non_system:
        if msg.role == "user" and current:
            turns.append(current)
            current = [msg]
        else:
            current.append(msg)
    if current:
        turns.append(current)

    keep_recent = max(keep_recent, 0)

    if len(turns) <= keep_recent:
        if keep_recent_messages is not None:
            intra_turn = partition_current_turn_messages(turns, keep_recent_messages)
            if intra_turn is not None:
                return intra_turn
        return [], non_system  # nothing old enough to compress

    # Use a positive split index: turns[:-keep_recent] / turns[-keep_recent:]
    # silently inverts when keep_recent == 0 (turns[:-0] is empty, turns[-0:] is all).
    split = len(turns) - keep_recent
    to_summarize = [m for turn in turns[:split] for m in turn]
    to_keep = [m for turn in turns[split:] for m in turn]
    return to_summarize, to_keep


def partition_current_turn_messages(
    turns: list[list[Message]],
    keep_recent_messages: int,
) -> tuple[list[Message], list[Message]] | None:
    """
    Mid-turn fallback for long autonomous tool loops.

    A long chain of assistant/tool iterations after one user message is one
    conversational turn, so turn-based compression may have nothing to compress.
    Keep the current user request and the newest messages verbatim, then summarize
    older messages from the same in-flight turn (plus every earlier turn).

    The split never separates a tool result from the assistant message before it.
    If the message-count boundary lands on a `tool` message, the boundary is moved
    back so the whole group is kept verbatim. If that would leave fewer than two
    messages to summarize, it is moved forward instead, so the whole group is
    summarized.

    Returns (to_summarize, to_keep), or None when there are no turns, the current
    turn is too short to split, or fewer than two messages would be summarized.
    """
    if not turns:
        return None

    current_turn = turns[-1]
    size = len(current_turn)
    if size <= keep_recent_messages + 1:
        return None

    # The user request that opened the turn is always kept verbatim.
    head = [current_turn[0]] if current_turn and current_turn[0].role == "user" else []
    first = len(head)
    older = [m for turn in turns[:-1] for m in turn]

    tail_start = min(size, max(first, size - keep_recent_messages))

    # Preferred repair: walk back over tool results to the message that precedes
    # them, keeping the complete group in the verbatim tail.
    back = tail_start
    while first < back < size and current_turn[back].role == "tool":
        back -= 1

    if len(older) + (back - first) >= 2:
        tail_start = back
    else:
        # Keeping the group would leave nothing worth summarizing; fold the
        # orphaned tool results into the summary together with their call.
        while tail_start < size and current_turn[tail_start].role == "tool":
            tail_start += 1

    to_summarize = older + current_turn[first:tail_start]
    to_keep = head + current_turn[tail_start:]

    if len(to_summarize) < 2:
        return None

    return to_summarize, to_keep


def _format_for_summary(messages: list[Message]) -> tuple[str, list[str]]:
    """
    Render messages as plain text for the summarization prompt.

    Returns (text, images) where images is a flat list of base64 strings
    collected from all user messages. Vision-capable models will receive
    the images alongside the text; non-vision models silently ignore them.

    Rendering rules, one output line per item:
    - Existing summary messages are included verbatim.
    - User messages become "User: ..." with an image-count note when images
      are attached; a user message with neither text nor images is omitted.
    - Assistant messages with tool calls render their text (if any), then one
      line per tool call (JSON arguments cut to 120 characters), then one line
      per immediately following tool result (content cut to 300 characters).
    - Plain assistant messages become "Assistant: ...".
    - Anything else (e.g. a tool result with no preceding tool call) is skipped.
    """
    lines: list[str] = []
    images: list[str] = []
    total = len(messages)
    i = 0
    while i < total:
        m = messages[i]
        i += 1

        if m.is_summary:
            # Existing summary — include as-is (already compressed)
            lines.append(m.content or "")

        elif m.role == "user":
            if m.images:
                images.extend(m.images)
                img_note = f" [+ {len(m.images)} image(s)]"
            else:
                img_note = ""
            if m.content:
                lines.append(f"User: {m.content}{img_note}")
            elif img_note:
                lines.append(f"User:{img_note}")

        elif m.role == "assistant" and m.tool_calls:
            # Text sent alongside tool calls (plans, explanations) must reach
            # the summarizer too; otherwise it is lost for good.
            if m.content:
                lines.append(f"Assistant: {m.content}")
            # Render tool calls + their results as a compact block
            for tc in m.tool_calls:
                args_preview = json.dumps(tc.arguments)[:120]
                lines.append(f"[Tool call: {tc.name}; arguments preview: {args_preview}]")
            # Consume the tool results that directly follow this assistant message.
            while i < total and messages[i].role == "tool":
                result = messages[i].content or ""
                preview = result[:300] + ("…" if len(result) > 300 else "")
                lines.append(f"[Tool result: {messages[i].name}; preview: {preview}]")
                i += 1

        elif m.role == "assistant" and m.content:
            lines.append(f"Assistant: {m.content}")

        # Everything else is skipped: orphan tool messages, empty assistant messages.

    return "\n".join(lines), images


# Safety limit: the formatted transcript is truncated to this many characters
# (characters, not tokens) before it is sent to the summarizer LLM.
# Prevents the summarizer from receiving near-context-sized input it can't fit alongside output.
_MAX_SUMMARY_INPUT_CHARS = 24_000


async def compress_context(
    context: list[Message],
    llm: LLMBackend,
    model: "list[str] | str | None",
    temperature: float,
    keep_recent: int,
    max_tokens: int | None = None,
    keep_recent_messages: int | None = None,
) -> tuple[list[Message], str] | None:
    """
    Replace the old part of an LLM context with a single LLM-written summary.

    Works purely on the `context` list it is given and builds a new list; the
    input list is not modified. Returns (new_context, summary_text), where
    new_context is: system messages, one summary message (role=user,
    is_summary=True, is_display=False), then the messages kept verbatim.
    Returns None when fewer than two messages would be summarized.

    Steps:
    1. Partition via `partition_messages`. If that yields fewer than two old
       messages and `keep_recent_messages` is greater than 2, partition again
       keeping only the 2 newest messages of the current turn.
    2. Render the old messages with `_format_for_summary` and cut the text to
       `_MAX_SUMMARY_INPUT_CHARS` characters.
    3. Ask `llm` for the summary with `think=False` and no tools, passing any
       images collected from old user messages along with the text.

    Exceptions raised by the LLM call are not caught here.
    """
    preserved_system = [msg for msg in context if msg.role == "system"]

    old_msgs, recent_msgs = partition_messages(
        context, keep_recent, keep_recent_messages=keep_recent_messages
    )

    # Second chance for mid-turn mode: retry with the most aggressive
    # intra-turn split (only the 2 newest messages stay verbatim).
    can_tighten = keep_recent_messages is not None and keep_recent_messages > 2
    if can_tighten and len(old_msgs) < 2:
        old_msgs, recent_msgs = partition_messages(context, keep_recent, keep_recent_messages=2)

    if len(old_msgs) < 2:
        return None  # a single message is not worth a summarization call

    transcript, attached_images = _format_for_summary(old_msgs)

    # Leave the summarizer room for its own output.
    if len(transcript) > _MAX_SUMMARY_INPUT_CHARS:
        transcript = transcript[:_MAX_SUMMARY_INPUT_CHARS] + "\n…[truncated]"

    request = [
        Message(role="system", content=_SUMMARIZE_SYSTEM),
        Message(role="user", content=transcript, images=attached_images or None),
    ]

    # think=False: the summary call should be quick, without extended reasoning.
    reply = await llm.complete(
        request,
        tools=None,
        temperature=temperature,
        model=model,
        think=False,
        max_tokens=max_tokens,
    )

    summary_text = (reply.content or "").strip()
    if not summary_text:
        summary_text = "(summary unavailable)"

    summary_msg = Message(
        role="user",
        content=f"[Context Summary - historical context only, not a new user request]\n{summary_text}",
        is_summary=True,
        is_display=False,
        created_at=datetime.now(timezone.utc),
    )

    new_context = preserved_system + [summary_msg] + recent_msgs
    return new_context, summary_text


class ContextCompressor:
    """High-level context compression with retry strategy and hard-truncate fallback.

    Thin wrapper around `compress_context` that adds:
    1. Retry with keep_recent + 4 on LLM failure.
    2. Hard-truncate fallback (drop oldest messages without summarizing).

    Also provides a local token estimate and a pre-call size check.
    """

    # Number of newest non-system messages that survive a hard truncate.
    _HARD_TRUNCATE_KEEP: int = 6

    @staticmethod
    def estimate_context_tokens(context: list[Message]) -> int:
        """Conservative local estimate used before the next LLM call returns real token counts.

        Uses ~3 chars per token (more conservative than the naive 4) because code and
        punctuation are often 1 token per character. Images counted at 500 tokens each
        (rough vision-model estimate), so a message carrying three images adds 1500.
        Only `content` and `images` are measured.
        """
        chars = sum(len(m.content or "") for m in context)
        # Count every attached image, not just every message that has images.
        image_count = sum(len(m.images) for m in context if m.images)
        return chars // 3 + 500 * image_count

    async def compress_session(
        self,
        context: list[Message],
        llm: LLMBackend,
        model: "list[str] | str | None",
        temperature: float,
        keep_recent: int,
        max_tokens: int | None = None,
        keep_recent_messages: int | None = None,
    ) -> tuple[list[Message], str] | None:
        """Compress context with retry + hard-truncate fallback.

        Returns (new_context, summary_text) or None if nothing changed.
        Does NOT mutate the session — the caller is responsible for updating
        session.context, session.context_token_count, and persisting.

        Attempts, in order:
        1. `compress_context` with the given settings.
        2. On any exception: `compress_context` again, keeping 4 more recent
           turns (and 4 more recent messages, when `keep_recent_messages` is set).
        3. On a second exception: `_hard_truncate`, which drops old messages
           without summarizing.

        Failures of attempts 1 and 2 are logged as warnings rather than raised.
        """
        import logging  # local import keeps the class self-contained

        log = logging.getLogger(__name__)

        # Attempt 1: normal compression
        try:
            return await compress_context(
                context=context,
                llm=llm,
                model=model,
                temperature=temperature,
                keep_recent=keep_recent,
                max_tokens=max_tokens,
                keep_recent_messages=keep_recent_messages,
            )
        except Exception:
            log.warning(
                "Context compression failed; retrying with more recent turns kept",
                exc_info=True,
            )

        # Attempt 2: keep more recent turns verbatim
        try:
            return await compress_context(
                context=context,
                llm=llm,
                model=model,
                temperature=temperature,
                keep_recent=keep_recent + 4,
                max_tokens=max_tokens,
                keep_recent_messages=(keep_recent_messages + 4)
                if keep_recent_messages is not None
                else None,
            )
        except Exception:
            log.warning(
                "Context compression retry failed; falling back to hard truncation",
                exc_info=True,
            )

        # Attempt 3: hard-truncate fallback
        return self._hard_truncate(context)

    def _hard_truncate(
        self, context: list[Message]
    ) -> tuple[list[Message], str] | None:
        """Last-resort fallback: drop oldest non-system messages without summarizing.

        Keeps every system message plus the newest `_HARD_TRUNCATE_KEEP`
        non-system messages. Returns (new_context, notice_text), or None when
        there are no more non-system messages than would be kept anyway.
        The notice text is returned to the caller; it is not inserted into
        new_context.
        """
        system_msgs = [m for m in context if m.role == "system"]
        non_system = [m for m in context if m.role != "system"]

        keep = self._HARD_TRUNCATE_KEEP
        if len(non_system) <= keep:
            return None

        # Positive start index: a negative slice would misbehave for keep == 0.
        to_keep = non_system[len(non_system) - keep:]
        new_context = system_msgs + to_keep
        summary_text = (
            "[Context was too large to summarize. Old messages were truncated to prevent "
            "the model from exceeding its context window. Some earlier details may have been lost.]"
        )
        return new_context, summary_text

    def check_context_size(self, context: list[Message]) -> None:
        """Raise ContextTooLargeError before an LLM call if the context is dangerously large.

        Uses a conservative character-based estimate (~3 chars per token for text).
        Images are counted at 500 tokens each (rough vision-model estimate).

        Checks against the *remaining* budget, not a fixed percentage of the window:
            available_for_input = ollama_num_ctx - output_reserve
        where output_reserve is a fixed token headroom reserved for the model's response.
        This correctly accounts for sessions where conversation history already consumes
        a large portion of the window.

        An empty context is always accepted. In the error message, the last
        message of `context` is reported as the "new content" and all earlier
        messages as "already used".
        """
        # Imported lazily inside the method rather than at module import time.
        from navi.config import settings
        from navi.exceptions import ContextTooLargeError

        if not context:
            return

        output_reserve = settings.output_reserve_tokens

        total = self.estimate_context_tokens(context)
        available = settings.ollama_num_ctx - output_reserve

        if total > available:
            existing = self.estimate_context_tokens(context[:-1])
            new = self.estimate_context_tokens(context[-1:])
            remaining = available - existing
            raise ContextTooLargeError(
                f"Context too large: new content is ~{new:,} estimated tokens, "
                f"but only ~{max(0, remaining):,} tokens are available "
                f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, "
                f"output_reserve {output_reserve:,}). "
                "Split the file into smaller parts or delegate to a subagent."
            )