navi/core/agent.pyЦель: Agent превращается из 1349-строчного класса, содержащего бизнес-логику, в тонкий координатор (~200–300 строк), который только связывает сервисы и передаёт события в поток.
Принцип: Каждый шаг — отдельная итерация. После каждого шага тесты должны проходить. Не трогаем публичный API (run, run_stream, run_ephemeral) до финального шага.
AgentAgent ├── run() — blocking complete() ├── run_stream() — streaming loop + tool calling + compression + planning + stall ├── run_ephemeral() — subagent loop + timeout + thinking stall ├── _compress_session_context() — retry + hard-truncate ├── _check_context_size() ├── _estimate_context_tokens() ├── _run_workers() ├── _tool_list() ├── _get_backend()
ContextCompressorЧто уносим: Всё, что связано с компрессией контекста.
_compress_session_context() (строки ~700–850) — retry-логику, hard-truncate fallback_check_context_size() — можно оставить как guard, но проверку threshold перенести_estimate_context_tokens() — utility, остаётся в Agent или переезжает в compressor.pyГраница сервиса:
class ContextCompressor:
async def compress_session(
self,
session: Session,
llm: LLMBackend,
model: list[str],
reason: Literal["preturn", "midturn"],
keep_recent_messages: int | None = None,
) -> ContextCompressed | None:
... # retry + hard-truncate logic here
Почему первый: Уже есть navi/core/compressor.py — логика компрессии частично отделена. Добавляем туда retry и hard-truncate, удаляем из Agent.
Риск: Минимальный. Уже есть тесты на компрессор и на agent context size.
AgentTurnContextЧто уносим: Состояние одного пользовательского turn'а (на одну run_stream()).
Текущие локальные переменные в run_stream():
_turn_start: float_tool_call_count: int_subagent_tokens: int_turn_tokens: int_stall_no_todo: int_stall_repeat_tools: int_prev_tool_sigs: frozenset_known_failed: frozenset_replan_msg: str | None_injected_fact_ids: set[str]ctx_task / mem_facts_task — async prefetchГраница:
@dataclass
class AgentTurnContext:
turn_start: float
tool_call_count: int = 0
turn_tokens: int = 0
subagent_tokens: int = 0
stall_no_todo: int = 0
stall_repeat_tools: int = 0
prev_tool_sigs: frozenset = field(default_factory=frozenset)
known_failed: frozenset = field(default_factory=frozenset)
replan_msg: str | None = None
injected_fact_ids: set = field(default_factory=set)
Зачем: Убираем 12 локальных переменных из run_stream(), делая метод читаемым. Контекст передаётся в выделенные сервисы (AntiStallMonitor, Compressor) вместо возврата tuple.
AntiStallMonitorЧто уносим: Логику stall detection.
_stall_no_todo >= threshold и _stall_repeat_tools >= threshold_prev_tool_sigs (идентичные tool calls)_known_failed (адаптивный реплан)Граница:
class AntiStallMonitor:
def __init__(self, profile: AgentProfile):
self.profile = profile
self.stall_no_todo = 0
self.stall_repeat_tools = 0
self.prev_tool_sigs: frozenset = frozenset()
self.known_failed: frozenset = frozenset()
self.replan_msg: str | None = None
async def check(
self, session_id: str, iteration: int, tool_calls: list[ToolCallRequest]
) -> str | None:
"""Returns system message to inject, or None."""
Зачем: Stall detection — самостоятельная политика. Её можно тестировать отдельно от всего агента.
SubAgentRunnerЧто уносим: Всё из run_ephemeral().
elapsed >= timeout_seconds)_SUBAGENT_THINKING_STALL_SECONDS)Граница:
class SubAgentRunner:
def __init__(
self,
agent: Agent, # или ToolExecutor + BackendRegistry
profile_registry: ProfileRegistry,
tool_registry: ToolRegistry,
backend_registry: BackendRegistry,
ctx_builder: ContextBuilder,
compressor: ContextCompressor,
):
...
async def run(
self,
user_message: str,
profile_id: str,
max_iterations: int = 40,
timeout_seconds: float = 300.0,
context_transfer: str | None = None,
briefing: str | None = None,
# ... остальные параметры
) -> tuple[str, bool]:
"""Returns (result_text, success)."""
Зачем: run_ephemeral — это почти полная копия run_stream(), но с другими правилами (таймаут, thinking stall, нет streaming). Вынос позволяет:
Что делаем: После шагов 1–4 run_stream() должен уменьшиться с ~300 строк до ~80–100.
Текущая структура run_stream:
1. Проверка сессии 2. Pre-turn compression 3. Добавление user message 4. Planning 5. Context injections (providers, memory facts) 6. FOR iteration: a. Check stop_event b. Mid-turn compression c. Goal anchor d. Todo progress e. Adaptive replan f. Anti-stall g. Check context size h. stream_complete() i. Accumulate tokens/thinking j. Tool calls / save final 7. Workers
После рефакторинга:
1. Проверка сессии 2. turn = AgentTurnContext() 3. compressor.maybe_compress_preturn(session) 4. Добавление user message 5. planning.run(...) 6. context = ctx_builder.build(...) 7. FOR iteration: a. compressor.maybe_compress_midturn(session, iteration) b. anti_stall.check(...) c. goal_anchor d. todo_progress e. stream_complete() f. turn.accumulate(chunk) g. tool calls / save final 8. workers.run(...)
Все внутренние переменные (token count, stall counters) живут в turn. Логика каждой фичи — в своём сервисе.
Agent.run()Что делаем: run() сейчас содержит копию tool-calling loop, отличную от run_stream(). После выноса SubAgentRunner можно переиспользовать его логику для run():
async def run(self, session_id, user_message):
"""Blocking variant — collects stream into string."""
result = ""
async for event in self.run_stream(session_id, user_message):
if isinstance(event, StreamEnd):
result = event.full_content
return result
Убираем полную копию loop'а. Единственная разница — run() не стримит, но внутри вызывает run_stream() и дропает промежуточные события.
pytest tests/ -x --tb=short --ignore=tests/integration/test_websocket.pydocs/architecture_weak_spots.md — отметить пункт 1 как выполненныйnavi/core/ ├── agent.py ~250 строк (координатор) ├── agent_run_context.py ~50 строк (dataclass + helpers) ├── compressor.py ~200 строк (уже существует, + retry/hard-truncate) ├── anti_stall.py ~80 строк (новый) ├── subagent_runner.py ~180 строк (новый) ├── planning.py ~150 строк (уже существует) ├── context_builder.py ~120 строк (уже существует) ├── tool_executor.py ~80 строк (после DRY-фикса)
Каждый шаг — отдельный коммит. Тесты должны проходить после каждого.