"""Context compression worker."""
import structlog
from navi.config import settings
from navi.core.compressor import compress_context, should_compress, ContextCompressor
from navi.core.events import ContextCompressed
from navi.llm.base import Message
from .base import Worker, WorkerContext, WorkerResult
# Module-level structlog logger used by CompressionWorker for its
# "compression_worker.*" events (each logged with the session_id).
log = structlog.get_logger()
class CompressionWorker(Worker):
    """
    Compresses session.context when it approaches the token limit.

    session.messages (full display history) is never truncated: messages that
    fall out of the LLM context are only flagged ``is_context=False``, and the
    compressor's summary plus a UI compression marker are appended to it.
    """

    async def run(self, session, ctx: WorkerContext) -> WorkerResult:
        """Compress ``session.context`` when compression is enabled and due.

        Returns an empty ``WorkerResult`` (session untouched) when:

        - compression is disabled in settings;
        - the current context token count is unknown;
        - ``should_compress`` says the threshold is not reached;
        - the compressor raises;
        - the compressor returns ``None``.

        Otherwise it does the following, then returns a result carrying a
        single ``ContextCompressed`` event:

        - updates ``session.messages``;
        - replaces ``session.context`` with the compressed context;
        - re-estimates ``session.context_token_count``;
        - persists the session via ``ctx.session_store.save``.
        """
        if not settings.context_compression_enabled:
            return WorkerResult()
        if ctx.context_tokens is None:
            return WorkerResult()
        if not should_compress(ctx.context_tokens, ctx.max_context_tokens,
                               settings.context_compression_threshold):
            return WorkerResult()

        count_before = len(session.context)
        try:
            result = await compress_context(
                context=session.context,
                llm=ctx.llm,
                model=ctx.model,
                temperature=settings.context_summary_temperature,
                keep_recent=settings.context_keep_recent,
                max_tokens=settings.context_summary_max_tokens,
            )
        except Exception:
            # Best-effort: a failed summarisation is logged and the session is
            # left exactly as it was.
            log.warning("compression_worker.llm_failed", session_id=ctx.session_id, exc_info=True)
            return WorkerResult()
        if result is None:
            return WorkerResult()

        new_context, summary_text = result
        self._sync_display_history(session, new_context, summary_text)

        session.context = new_context
        session.context_token_count = ContextCompressor.estimate_context_tokens(new_context)
        await ctx.session_store.save(session)

        count_after = len(session.context)
        log.info(
            "compression_worker.done",
            session_id=ctx.session_id,
            before=count_before,
            after=count_after,
        )
        return WorkerResult(events=[ContextCompressed(
            messages_before=count_before,
            messages_after=count_after,
            summary=summary_text,
            context_tokens=session.context_token_count,
            max_context_tokens=ctx.max_context_tokens,
        )])

    @staticmethod
    def _sync_display_history(session, new_context, summary_text) -> None:
        """Reflect a freshly compressed context in ``session.messages``."""
        # Membership is decided by object identity throughout: Message objects
        # may compare equal by value, and a value match must not be mistaken
        # for "this exact message is already stored".
        in_context = {id(m) for m in new_context}

        # Mark messages that are no longer part of the LLM context.
        # System messages are left alone.
        for msg in session.messages:
            if id(msg) not in in_context and msg.role != "system":
                msg.is_context = False

        # The summary returned by the compressor must also live in messages so
        # save() writes it to the normalized table, but it is not displayed.
        summary_msg = next((m for m in new_context if m.is_summary), None)
        if summary_msg is not None and all(m is not summary_msg for m in session.messages):
            summary_msg.is_display = False
            session.messages.append(summary_msg)

        # UI marker showing that compression happened; excluded from LLM context.
        session.messages.append(Message(
            role="system",
            is_compression=True,
            is_context=False,
            content=summary_text,
        ))