Newer
Older
navi-1 / navi / workers / compressor.py
"""Context compression worker."""

import structlog

from navi.config import settings
from navi.core.compressor import compress_context, should_compress
from navi.core.events import ContextCompressed

from .base import Worker, WorkerContext, WorkerResult

log = structlog.get_logger()


class CompressionWorker(Worker):
    """
    Compresses session.context when it approaches the token limit.
    session.messages (full display history) is never modified.
    """

    async def run(self, session, ctx: WorkerContext) -> WorkerResult:
        if not settings.context_compression_enabled:
            return WorkerResult()
        if ctx.context_tokens is None:
            return WorkerResult()
        if not should_compress(ctx.context_tokens, ctx.max_context_tokens,
                               settings.context_compression_threshold):
            return WorkerResult()

        count_before = len(session.context)
        try:
            new_context = await compress_context(
                context=session.context,
                llm=ctx.llm,
                model=ctx.model,
                temperature=settings.context_summary_temperature,
                keep_recent=settings.context_keep_recent,
            )
        except Exception:
            log.warning("compression_worker.llm_failed", session_id=ctx.session_id, exc_info=True)
            return WorkerResult()

        if new_context is None:
            return WorkerResult()

        session.context = new_context
        session.context_token_count = 0  # reset so next turn doesn't re-compress pre-call
        await ctx.session_store.save(session)

        log.info(
            "compression_worker.done",
            session_id=ctx.session_id,
            before=count_before,
            after=len(session.context),
        )

        return WorkerResult(events=[ContextCompressed(
            messages_before=count_before,
            messages_after=len(session.context),
        )])