Newer
Older
navi-1 / navi / workers / compressor.py
"""Context compression worker."""

import structlog

from navi.config import settings
from navi.core.compressor import compress_context, should_compress, ContextCompressor
from navi.core.events import ContextCompressed
from navi.llm.base import Message

from .base import Worker, WorkerContext, WorkerResult

log = structlog.get_logger()


class CompressionWorker(Worker):
    """
    Compresses session.context when it approaches the token limit.
    session.messages (full display history) is never modified.
    """

    async def run(self, session, ctx: WorkerContext) -> WorkerResult:
        if not settings.context_compression_enabled:
            return WorkerResult()
        if ctx.context_tokens is None:
            return WorkerResult()
        if not should_compress(ctx.context_tokens, ctx.max_context_tokens,
                               settings.context_compression_threshold):
            return WorkerResult()

        count_before = len(session.context)
        try:
            result = await compress_context(
                context=session.context,
                llm=ctx.llm,
                model=ctx.model,
                temperature=settings.context_summary_temperature,
                keep_recent=settings.context_keep_recent,
                max_tokens=settings.context_summary_max_tokens,
            )
        except Exception:
            log.warning("compression_worker.llm_failed", session_id=ctx.session_id, exc_info=True)
            return WorkerResult()

        if result is None:
            return WorkerResult()

        new_context, summary_text = result

        # Mark messages that are no longer part of the LLM context
        new_context_ids = {id(m) for m in new_context}
        for msg in session.messages:
            if id(msg) not in new_context_ids and msg.role != "system":
                msg.is_context = False

        # The summary returned by the compressor must also live in messages so
        # save() writes it to the normalized table, but it is not displayed.
        summary_msg = next((m for m in new_context if m.is_summary), None)
        if summary_msg and summary_msg not in session.messages:
            summary_msg.is_display = False
            session.messages.append(summary_msg)

        # UI marker showing that compression happened
        session.messages.append(Message(
            role="system",
            is_compression=True,
            is_context=False,
            content=summary_text,
        ))

        session.context = new_context
        session.context_token_count = ContextCompressor.estimate_context_tokens(new_context)
        await ctx.session_store.save(session)

        log.info(
            "compression_worker.done",
            session_id=ctx.session_id,
            before=count_before,
            after=len(session.context),
        )

        return WorkerResult(events=[ContextCompressed(
            messages_before=count_before,
            messages_after=len(session.context),
            summary=summary_text,
            context_tokens=session.context_token_count,
            max_context_tokens=ctx.max_context_tokens,
        )])