"""Context compression worker."""
import structlog
from navi.config import settings
from navi.core.compressor import compress_context, should_compress
from navi.core.events import ContextCompressed
from .base import Worker, WorkerContext, WorkerResult
# Module-level structlog logger; the worker below emits its events through it.
log = structlog.get_logger()
class CompressionWorker(Worker):
    """Worker that shrinks ``session.context`` as it nears the token limit.

    Only ``session.context`` is rewritten here; ``session.messages`` (the
    full display history) is never modified by this worker.
    """

    async def run(self, session, ctx: WorkerContext) -> WorkerResult:
        """Compress the session context when enabled and deemed necessary.

        An empty ``WorkerResult`` is returned when compression is disabled
        in settings, when ``ctx.context_tokens`` is ``None``, when
        ``should_compress`` declines, when ``compress_context`` raises, or
        when it returns ``None``.

        Otherwise the compressed context replaces ``session.context``, the
        session is saved through ``ctx.session_store``, and the result
        carries a ``ContextCompressed`` event with the context length
        before and after compression.
        """
        # Short-circuit order matters: settings flag first, then the
        # missing-token-count check, and only then the threshold helper.
        nothing_to_do = (
            not settings.context_compression_enabled
            or ctx.context_tokens is None
            or not should_compress(
                ctx.context_tokens,
                ctx.max_context_tokens,
                settings.context_compression_threshold,
            )
        )
        if nothing_to_do:
            return WorkerResult()

        size_before = len(session.context)

        try:
            compressed = await compress_context(
                context=session.context,
                llm=ctx.llm,
                model=ctx.model,
                temperature=settings.context_summary_temperature,
                keep_recent=settings.context_keep_recent,
            )
        except Exception:
            # Best effort: a failed compression call is logged with its
            # traceback and the session is left exactly as it was.
            log.warning(
                "compression_worker.llm_failed",
                session_id=ctx.session_id,
                exc_info=True,
            )
            return WorkerResult()

        if compressed is None:
            return WorkerResult()

        session.context = compressed
        # Zero the stored token count so the next turn does not trigger
        # another compression before its own call.
        session.context_token_count = 0
        await ctx.session_store.save(session)

        size_after = len(session.context)
        log.info(
            "compression_worker.done",
            session_id=ctx.session_id,
            before=size_before,
            after=size_after,
        )

        compressed_event = ContextCompressed(
            messages_before=size_before,
            messages_after=size_after,
        )
        return WorkerResult(events=[compressed_event])