Newer
Older
navi-1 / docs / plan_01_god_object_agent.md

План: Разбиение God object navi/core/agent.py

Цель: Agent превращается из 1349-строчного класса, содержащего бизнес-логику, в тонкий координатор (~200–300 строк), который только связывает сервисы и передаёт события в поток.

Принцип: Каждый шаг — отдельная итерация. После каждого шага тесты должны проходить. Не трогаем публичный API (run, run_stream, run_ephemeral) до финального шага.


Текущее состояние Agent

Agent
├── run()              — blocking complete()
├── run_stream()       — streaming loop + tool calling + compression + planning + stall
├── run_ephemeral()    — subagent loop + timeout + thinking stall
├── _compress_session_context()  — retry + hard-truncate
├── _check_context_size()
├── _estimate_context_tokens()
├── _run_workers()
├── _tool_list()
├── _get_backend()

Шаг 1 — Выделить ContextCompressor

Что уносим: Всё, что связано с компрессией контекста.

  • _compress_session_context() (строки ~700–850) — retry-логику, hard-truncate fallback
  • _check_context_size() — можно оставить как guard, но проверку threshold перенести
  • _estimate_context_tokens() — utility, остаётся в Agent или переезжает в compressor.py

Граница сервиса:

class ContextCompressor:
    async def compress_session(
        self,
        session: Session,
        llm: LLMBackend,
        model: list[str],
        reason: Literal["preturn", "midturn"],
        keep_recent_messages: int | None = None,
    ) -> ContextCompressed | None:
        ...  # retry + hard-truncate logic here

Почему первый: Уже есть navi/core/compressor.py — логика компрессии частично отделена. Добавляем туда retry и hard-truncate, удаляем из Agent.

Риск: Минимальный. Уже есть тесты на компрессор и на agent context size.


Шаг 2 — Выделить AgentTurnContext

Что уносим: Состояние одного пользовательского turn'а (на одну run_stream()).

Текущие локальные переменные в run_stream():

  • _turn_start: float
  • _tool_call_count: int
  • _subagent_tokens: int
  • _turn_tokens: int
  • _stall_no_todo: int
  • _stall_repeat_tools: int
  • _prev_tool_sigs: frozenset
  • _known_failed: frozenset
  • _replan_msg: str | None
  • _injected_fact_ids: set[str]
  • ctx_task / mem_facts_task — async prefetch

Граница:

@dataclass
class AgentTurnContext:
    turn_start: float
    tool_call_count: int = 0
    turn_tokens: int = 0
    subagent_tokens: int = 0
    stall_no_todo: int = 0
    stall_repeat_tools: int = 0
    prev_tool_sigs: frozenset = field(default_factory=frozenset)
    known_failed: frozenset = field(default_factory=frozenset)
    replan_msg: str | None = None
    injected_fact_ids: set = field(default_factory=set)

Зачем: Убираем 12 локальных переменных из run_stream(), делая метод читаемым. Контекст передаётся в выделенные сервисы (AntiStallMonitor, Compressor) вместо возврата tuple.


Шаг 3 — Выделить AntiStallMonitor

Что уносим: Логику stall detection.

  • Проверка _stall_no_todo >= threshold и _stall_repeat_tools >= threshold
  • Построение анти-сталл сообщения
  • Отслеживание _prev_tool_sigs (идентичные tool calls)
  • Отслеживание _known_failed (адаптивный реплан)

Граница:

class AntiStallMonitor:
    def __init__(self, profile: AgentProfile):
        self.profile = profile
        self.stall_no_todo = 0
        self.stall_repeat_tools = 0
        self.prev_tool_sigs: frozenset = frozenset()
        self.known_failed: frozenset = frozenset()
        self.replan_msg: str | None = None

    async def check(
        self, session_id: str, iteration: int, tool_calls: list[ToolCallRequest]
    ) -> str | None:
        """Returns system message to inject, or None."""

Зачем: Stall detection — самостоятельная политика. Её можно тестировать отдельно от всего агента.


Шаг 4 — Выделить SubAgentRunner

Что уносим: Всё из run_ephemeral().

  • Tool-calling loop для subagent
  • Timeout guard (elapsed >= timeout_seconds)
  • Thinking stall detection (_SUBAGENT_THINKING_STALL_SECONDS)
  • Context building для subagent (без persona)
  • Subagent system prompt assembly

Граница:

class SubAgentRunner:
    def __init__(
        self,
        agent: Agent,  # или ToolExecutor + BackendRegistry
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        ctx_builder: ContextBuilder,
        compressor: ContextCompressor,
    ):
        ...

    async def run(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        timeout_seconds: float = 300.0,
        context_transfer: str | None = None,
        briefing: str | None = None,
        # ... остальные параметры
    ) -> tuple[str, bool]:
        """Returns (result_text, success)."""

Зачем: run_ephemeral — это почти полная копия run_stream(), но с другими правилами (таймаут, thinking stall, нет streaming). Вынос позволяет:

  • Тестировать subagent без инициализации всего агента
  • Менять политику subagent независимо от основного loop'а
  • Переиспользовать SubAgentRunner из других мест (например, фоновые задачи)

Шаг 5 — Унифицировать streaming loop

Что делаем: После шагов 1–4 run_stream() должен уменьшиться с ~300 строк до ~80–100.

Текущая структура run_stream:

1. Проверка сессии
2. Pre-turn compression
3. Добавление user message
4. Planning
5. Context injections (providers, memory facts)
6. FOR iteration:
   a. Check stop_event
   b. Mid-turn compression
   c. Goal anchor
   d. Todo progress
   e. Adaptive replan
   f. Anti-stall
   g. Check context size
   h. stream_complete()
   i. Accumulate tokens/thinking
   j. Tool calls / save final
7. Workers

После рефакторинга:

1. Проверка сессии
2. turn = AgentTurnContext()
3. compressor.maybe_compress_preturn(session)
4. Добавление user message
5. planning.run(...)
6. context = ctx_builder.build(...)
7. FOR iteration:
   a. compressor.maybe_compress_midturn(session, iteration)
   b. anti_stall.check(...)
   c. goal_anchor
   d. todo_progress
   e. stream_complete()
   f. turn.accumulate(chunk)
   g. tool calls / save final
8. workers.run(...)

Все внутренние переменные (token count, stall counters) живут в turn. Логика каждой фичи — в своём сервисе.


Шаг 6 — Очистить Agent.run()

Что делаем: run() сейчас содержит копию tool-calling loop, отличную от run_stream(). После выноса SubAgentRunner можно переиспользовать его логику для run():

async def run(self, session_id, user_message):
    """Blocking variant — collects stream into string."""
    result = ""
    async for event in self.run_stream(session_id, user_message):
        if isinstance(event, StreamEnd):
            result = event.full_content
    return result

Убираем полную копию loop'а. Единственная разница — run() не стримит, но внутри вызывает run_stream() и дропает промежуточные события.


Шаг 7 — Финальная очистка и тесты

  • Убрать мёртвый код (оставшиеся приватные методы, которые переехали)
  • Проверить, что публичный API не изменился
  • Запустить полный regression: pytest tests/ -x --tb=short --ignore=tests/integration/test_websocket.py
  • Обновить docs/architecture_weak_spots.md — отметить пункт 1 как выполненный

Ожидаемый результат

navi/core/
├── agent.py              ~250 строк (координатор)
├── agent_run_context.py  ~50 строк  (dataclass + helpers)
├── compressor.py         ~200 строк (уже существует, + retry/hard-truncate)
├── anti_stall.py         ~80 строк  (новый)
├── subagent_runner.py    ~180 строк (новый)
├── planning.py           ~150 строк (уже существует)
├── context_builder.py    ~120 строк (уже существует)
├── tool_executor.py      ~80 строк  (после DRY-фикса)

Порядок работы

  1. Шаг 1 — ContextCompressor (retry + hard-truncate в compressor.py)
  2. Шаг 2 — AgentTurnContext (refactor run_stream locals)
  3. Шаг 3 — AntiStallMonitor
  4. Шаг 4 — SubAgentRunner
  5. Шаг 5 — Унификация run_stream (после готовности сервисов)
  6. Шаг 6 — run() через run_stream()
  7. Шаг 7 — Финальная очистка

Каждый шаг — отдельный коммит. Тесты должны проходить после каждого.