"""
Context compressor — summarizes old messages to stay within the token limit.
Flow:
1. Partition session messages into "to_summarize" (old turns) and "to_keep" (recent turns).
2. Call the LLM to produce a structured, sectioned summary of the old turns.
3. Replace the old turns with a single summary message (role=user, is_summary=True).
A "turn" is one user message plus all following assistant/tool messages up to the
next user message. Tool call groups (assistant + tool results) are never split.
Existing summary messages are always folded into the next compression pass.
"""
import json
from datetime import datetime, timezone
from navi.llm.base import LLMBackend, Message
# System prompt for the summarization call. It fixes the section layout of the
# summary (current goal, work state, key facts, outputs, errors, user
# preferences) and tells the model that the result is historical context, not
# a new user request. The adjacent string literals are concatenated into one.
_SUMMARIZE_SYSTEM = (
    "You are summarizing a conversation history to free up context space. "
    "The assistant will continue working using ONLY this summary — it will have no access "
    "to the original messages. Be thorough and precise. Prefer specifics over generalities. "
    "This summary is historical context, not a new user request.\n\n"
    "## Current goal\n"
    "What the user is trying to accomplish in this session. Include any stated deadlines, "
    "constraints, or acceptance criteria.\n\n"
    "## Work state\n"
    "What has been completed (be specific — name files, functions, endpoints, steps). "
    "What is still in progress or pending. Any blockers or open questions.\n\n"
    "## Key facts\n"
    "Everything the user told the assistant: preferences, system details, environment info, "
    "decisions made, constraints discovered, explicit instructions. Include exact values "
    "(port numbers, file paths, config keys, IDs) — do not paraphrase if precision matters.\n\n"
    "## Outputs\n"
    "Every file created or modified (full paths). Every config value set. "
    "Commands run and their outcome. Preserve exact command output only when it is short "
    "and needed to prove a result or diagnose an error. "
    "Do not preserve tool-call-like examples with parenthesized arguments; describe tool "
    "usage in words or as key/value facts instead.\n\n"
    "## Errors\n"
    "Failures, exceptions, or unexpected results encountered. "
    "How each was resolved — or that it remains unresolved.\n\n"
    "## User preferences and feedback\n"
    "Corrections the user made to the assistant's approach. Explicit style or behavior "
    "preferences stated during this session. Things the user said not to do.\n\n"
    "Do not include greetings, filler, transitions, or meta-commentary about the summary itself. "
    "Do not use Markdown code fences or inline-code backticks unless preserving a user-authored "
    "literal value is essential. "
    "Write in tight prose or bullet points — whatever preserves more information per token."
)
def should_compress(context_tokens: int, max_context_tokens: int, threshold: float) -> bool:
    """Report whether the context is large enough to warrant compression.

    The trigger point is ``max_context_tokens * threshold`` truncated to an
    integer; a context that sits exactly on the trigger point already counts.
    """
    trigger_at = int(max_context_tokens * threshold)
    return trigger_at <= context_tokens
def partition_messages(
messages: list[Message],
keep_recent: int,
keep_recent_messages: int | None = None,
) -> tuple[list[Message], list[Message]]:
"""
Returns (to_summarize, to_keep).
Keeps the system message and the last `keep_recent` conversational turns verbatim.
Everything older goes into to_summarize.
Tool call groups (assistant + tool results) always stay together.
"""
non_system = [m for m in messages if m.role != "system"]
# Group into turns: each turn starts with a user message
turns: list[list[Message]] = []
current: list[Message] = []
for msg in non_system:
if msg.role == "user" and current:
turns.append(current)
current = [msg]
else:
current.append(msg)
if current:
turns.append(current)
if len(turns) <= keep_recent:
if keep_recent_messages is not None:
intra_turn = partition_current_turn_messages(turns, keep_recent_messages)
if intra_turn is not None:
return intra_turn
return [], non_system # nothing old enough to compress
old_turns = turns[:-keep_recent]
recent_turns = turns[-keep_recent:]
to_summarize = [m for turn in old_turns for m in turn]
to_keep = [m for turn in recent_turns for m in turn]
return to_summarize, to_keep
def partition_current_turn_messages(
turns: list[list[Message]],
keep_recent_messages: int,
) -> tuple[list[Message], list[Message]] | None:
"""
Mid-turn fallback for long autonomous tool loops.
A long chain of assistant/tool iterations after one user message is one
conversational turn, so turn-based compression may have nothing to compress.
Keep the current user request and the newest messages verbatim, then summarize
older messages from the same in-flight turn.
"""
if not turns:
return None
current_turn = turns[-1]
if len(current_turn) <= keep_recent_messages + 1:
return None
head = [current_turn[0]] if current_turn and current_turn[0].role == "user" else []
tail_start = max(len(head), len(current_turn) - keep_recent_messages)
to_summarize = [m for turn in turns[:-1] for m in turn] + current_turn[len(head):tail_start]
to_keep = head + current_turn[tail_start:]
if len(to_summarize) < 2:
return None
return to_summarize, to_keep
def _format_for_summary(messages: list[Message]) -> tuple[str, list[str]]:
    """
    Flatten messages into a plain-text transcript for the summarization prompt.

    Returns ``(text, images)``. ``text`` has one line per rendered item, joined
    with newlines; ``images`` is the flat list of image payloads gathered from
    the non-summary user messages, in order, so the caller can forward them
    alongside the text.

    Rendering rules:
    - existing summary messages are emitted verbatim;
    - user messages become ``User: ...`` with an image-count note if any;
    - an assistant message with tool calls becomes one line per call (arguments
      cut to 120 characters), and the tool messages directly after it become
      result lines (content cut to 300 characters);
    - an assistant message with only text becomes ``Assistant: ...``;
    - everything else, including tool messages that do not directly follow a
      tool call, is dropped.
    """
    rendered: list[str] = []
    collected_images: list[str] = []
    # True only while consuming the tool results that immediately follow an
    # assistant tool-call message.
    in_tool_group = False

    for msg in messages:
        if in_tool_group and msg.role == "tool":
            body = msg.content or ""
            ellipsis = "…" if len(body) > 300 else ""
            rendered.append(f"[Tool result: {msg.name}; preview: {body[:300]}{ellipsis}]")
            continue
        in_tool_group = False

        if msg.is_summary:
            # Already compressed on an earlier pass: carry over untouched.
            rendered.append(msg.content or "")
            continue

        if msg.role == "user":
            note = ""
            if msg.images:
                collected_images.extend(msg.images)
                note = f" [+ {len(msg.images)} image(s)]"
            if msg.content:
                rendered.append(f"User: {msg.content}{note}")
            elif note:
                rendered.append(f"User:{note}")
            continue

        if msg.role == "assistant":
            if msg.tool_calls:
                for call in msg.tool_calls:
                    args_preview = json.dumps(call.arguments)[:120]
                    rendered.append(
                        f"[Tool call: {call.name}; arguments preview: {args_preview}]"
                    )
                in_tool_group = True
            elif msg.content:
                rendered.append(f"Assistant: {msg.content}")
        # Any other message (e.g. an orphan tool result) is skipped.

    return "\n".join(rendered), collected_images
# Safety limit: the formatted summarization input is cut to this many
# characters (not tokens) before it is sent to the LLM; the head of the text is
# kept and a truncation marker is appended. This prevents the summarizer from
# receiving near-context-sized input it can't fit alongside its output.
_MAX_SUMMARY_INPUT_CHARS = 24_000
async def compress_context(
    context: list[Message],
    llm: LLMBackend,
    model: "list[str] | str | None",
    temperature: float,
    keep_recent: int,
    max_tokens: int | None = None,
    keep_recent_messages: int | None = None,
) -> tuple[list[Message], str] | None:
    """
    Replace the old part of ``context`` with a single LLM-written summary.

    The input list is not mutated; a new list is built and returned.

    Steps:
    1. Partition the context with ``partition_messages``. If that yields fewer
       than two messages to summarize and ``keep_recent_messages`` is above 2,
       retry the partition with ``keep_recent_messages=2``.
    2. Render the old messages to text, cut the text to
       ``_MAX_SUMMARY_INPUT_CHARS`` characters, and send it (plus any images
       collected from the old user messages) to ``llm.complete`` with
       ``think=False`` and no tools.
    3. Wrap the reply in a ``role="user"``, ``is_summary=True`` message.

    Returns:
        ``(system messages + [summary message] + kept messages, summary_text)``,
        or ``None`` when fewer than two messages would be summarized. An empty
        model reply yields the placeholder text ``"(summary unavailable)"``.

    Raises:
        Nothing is caught here; any exception from ``llm.complete`` reaches the
        caller.
    """
    # Snapshot the system messages before the awaited LLM call.
    system_msgs = [m for m in context if m.role == "system"]

    old_messages, recent_messages = partition_messages(
        context,
        keep_recent,
        keep_recent_messages=keep_recent_messages,
    )

    # Mid-turn mode only: when the first split found too little, try again
    # keeping just the 2 newest messages of the current turn.
    can_tighten = keep_recent_messages is not None and keep_recent_messages > 2
    if len(old_messages) < 2 and can_tighten:
        old_messages, recent_messages = partition_messages(
            context,
            keep_recent,
            keep_recent_messages=2,
        )

    if len(old_messages) < 2:
        return None  # nothing substantial to compress

    transcript, images = _format_for_summary(old_messages)
    if len(transcript) > _MAX_SUMMARY_INPUT_CHARS:
        # Leave the summarizer room to generate its output.
        transcript = transcript[:_MAX_SUMMARY_INPUT_CHARS] + "\n…[truncated]"

    system_prompt = Message(role="system", content=_SUMMARIZE_SYSTEM)
    user_prompt = Message(role="user", content=transcript, images=images or None)

    # think=False: compression must be fast; extended reasoning is not wanted here.
    response = await llm.complete(
        [system_prompt, user_prompt],
        tools=None,
        temperature=temperature,
        model=model,
        think=False,
        max_tokens=max_tokens,
    )

    summary_text = (response.content or "").strip()
    if not summary_text:
        summary_text = "(summary unavailable)"

    summary_msg = Message(
        role="user",
        content=f"[Context Summary - historical context only, not a new user request]\n{summary_text}",
        is_summary=True,
        created_at=datetime.now(timezone.utc),
    )
    return [*system_msgs, summary_msg, *recent_messages], summary_text
class ContextCompressor:
"""High-level context compression with retry strategy and hard-truncate fallback.
Thin wrapper around `compress_context` that adds:
1. Retry with keep_recent + 4 on LLM failure.
2. Hard-truncate fallback (drop oldest messages without summarizing).
"""
@staticmethod
def estimate_context_tokens(context: list[Message]) -> int:
"""Conservative local estimate used before the next LLM call returns real token counts.
Uses ~3 chars per token (more conservative than the naive 4) because code and
punctuation are often 1 token per character. Images counted at 500 tokens each
(rough vision-model estimate).
"""
chars = sum(len(m.content or "") for m in context)
imgs = sum(500 for m in context if m.images)
return chars // 3 + imgs
async def compress_session(
self,
context: list[Message],
llm: LLMBackend,
model: "list[str] | str | None",
temperature: float,
keep_recent: int,
max_tokens: int | None = None,
keep_recent_messages: int | None = None,
) -> tuple[list[Message], str] | None:
"""Compress context with retry + hard-truncate fallback.
Returns (new_context, summary_text) or None if nothing changed.
Does NOT mutate the session — the caller is responsible for updating
session.context, session.context_token_count, and persisting.
"""
# Attempt 1: normal compression
try:
result = await compress_context(
context=context,
llm=llm,
model=model,
temperature=temperature,
keep_recent=keep_recent,
max_tokens=max_tokens,
keep_recent_messages=keep_recent_messages,
)
except Exception:
# Attempt 2: keep more recent turns verbatim
try:
result = await compress_context(
context=context,
llm=llm,
model=model,
temperature=temperature,
keep_recent=keep_recent + 4,
max_tokens=max_tokens,
keep_recent_messages=(keep_recent_messages + 4)
if keep_recent_messages is not None
else None,
)
except Exception:
# Attempt 3: hard-truncate fallback
return self._hard_truncate(context)
if result is None:
return None
return result
def _hard_truncate(
self, context: list[Message]
) -> tuple[list[Message], str] | None:
"""Last-resort fallback: drop oldest non-system messages without summarizing."""
system_msgs = [m for m in context if m.role == "system"]
non_system = [m for m in context if m.role != "system"]
_HARD_TRUNCATE_KEEP = 6
if len(non_system) <= _HARD_TRUNCATE_KEEP:
return None
to_keep = non_system[-_HARD_TRUNCATE_KEEP:]
new_context = system_msgs + to_keep
summary_text = (
"[Context was too large to summarize. Old messages were truncated to prevent "
"the model from exceeding its context window. Some earlier details may have been lost.]"
)
return new_context, summary_text