diff --git a/docs/architecture_weak_spots.md b/docs/architecture_weak_spots.md index 7caa850..9e0477b 100644 --- a/docs/architecture_weak_spots.md +++ b/docs/architecture_weak_spots.md @@ -5,14 +5,25 @@ --- -## 1. God object `navi/core/agent.py` +## 1. God object `navi/core/agent.py` ✅ **Severity:** Critical -**Файл:** `navi/core/agent.py` (1349 строк) +**Файл:** `navi/core/agent.py` (1349 → ~410 строк) **Проблема:** Класс `Agent` одновременно управляет тремя режимами (`run`, `run_stream`, `run_ephemeral`), компрессией контекста (3 уровня fallback), планированием, анти-столлингом, адаптивным репланингом, подсчётом токенов, stall-детекцией, обработкой изображений и под-агентами. **Почему блокер:** Любое изменение в одной подсистеме требует правки одного файла. Параллельная работа нескольких разработчиков невозможна без конфликтов. Unit-тесты вынуждены инициализировать весь агент даже для проверки одного метода. **Направление:** Выделить `PlanningOrchestrator`, `ContextCompressor`, `SubAgentRunner`, `AntiStallMonitor` в отдельные сервисы. `Agent` должен остаться только координатором. +**Решение 2026-05-16:** +- `ContextCompressor` → `navi/core/compressor.py` +- `AntiStallMonitor` → `navi/core/anti_stall.py` +- `SubAgentRunner` → `navi/core/subagent_runner.py` +- `AgentTurnContext` / `StreamState` → `navi/core/agent_run_context.py` +- `_iter_stream_guarded` → `navi/core/stream_guard.py` +- `build_tool_list` / `load_user_enabled_tools` → `navi/core/tool_utils.py` +- `run()` — тонкая обёртка вокруг `run_stream()` +- `run_stream()` делегирует `_compression_events_preturn`, `_compression_events_midturn`, `_consume_stream`, `_execute_tools_with_sink` +- `run_ephemeral()` делегирует `SubAgentRunner.run()` + --- ## 2. 
Глобальные ленивые синглтоны в `navi/api/deps.py` diff --git a/navi/core/agent.py b/navi/core/agent.py index 49e597a..5b40650 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -16,7 +16,6 @@ """ import asyncio -import json import time from datetime import datetime, timezone from typing import TYPE_CHECKING, AsyncGenerator @@ -25,10 +24,10 @@ from navi.config import settings from navi.exceptions import ContextTooLargeError, LLMBackendError, LLMConnectionError, MaxIterationsReached, SessionNotFound -from navi.llm.base import LLMBackend, LLMChunk, Message, ToolCallRequest +from navi.llm.base import LLMBackend, Message, ToolCallRequest from navi.tools._internal.base import Tool, current_event_sink, current_stop_event -from .agent_run_context import AgentTurnContext +from .agent_run_context import AgentTurnContext, StreamState from .anti_stall import AntiStallMonitor from .compressor import ContextCompressor, should_compress from .context_builder import ContextBuilder @@ -124,100 +123,11 @@ async def run(self, session_id: str, user_message: str, images: list[str] | None = None, files: list[dict] | None = None, is_recall: bool = False) -> str: """Non-streaming: run the full tool-calling loop and return the final text.""" - session = await self._sessions.get(session_id) - if session is None: - raise SessionNotFound(session_id) - - profile = self._profiles.get(session.profile_id) - tools = self._tool_list(profile.enabled_tools, profile.mcp_servers) - tool_schemas = [t.schema() for t in tools] - llm = self._get_backend(profile.llm_backend) - - mem = await self._ctx_builder._memory_msg(user_id=session.user_id) - - # Expose session_id to tools (e.g. 
SSH connection pool) via ContextVar - from navi.tools._internal.base import current_session_id as _sid_var - _sid_var.set(session_id) - - # current_user_id and current_user_role are set by the caller - # (websocket_session or messages endpoint) before run()/run_stream() - - if user_message: - content = user_message - if is_recall: - content = f"[Scheduled recall — execute this task]\n\n{content}" - user_msg = Message(role="user", content=content, images=images or None, - files=files or None, created_at=datetime.now(timezone.utc), - is_recall=is_recall) - session.messages.append(user_msg) - session.context.append(user_msg) - await self._sessions.save(session) - - _injected_fact_ids: set[str] = set() - ctx_task = asyncio.create_task(self._ctx_builder._collect_context_injections(profile)) - mem_facts_task = asyncio.create_task( - self._ctx_builder._memory_facts_msg( - user_message, user_id=session.user_id, injected_ids=_injected_fact_ids - ) - ) - ctx_injections = await ctx_task - mem_facts = await mem_facts_task - if mem_facts: - ctx_injections.append(mem_facts) - log.debug("agent.memory_facts_injected", session_id=session_id, facts_msg_length=len(mem_facts.content or "")) - else: - log.debug("agent.memory_facts_none", session_id=session_id) - - _turn_tokens = 0 - for iteration in range(profile.max_iterations): - log.debug("agent.iteration", session_id=session_id, iteration=iteration) - response = await llm.complete( - self._ctx_builder.build(session.context, profile, mem, - iteration=iteration, max_iterations=profile.max_iterations, - extra_system=ctx_injections, - session_id=session_id), - tools=tool_schemas if tools else None, - temperature=profile.temperature, - model=profile.model, - think=profile.think_enabled, - top_k=profile.top_k, - top_p=profile.top_p, - num_thread=profile.num_thread, - ) - _turn_tokens += (response.prompt_tokens or 0) + (response.completion_tokens or 0) - - if response.finish_reason == "stop" or not response.tool_calls: - content = 
response.content or "" - assistant_msg = Message( - role="assistant", - content=content, - thinking=response.thinking, - created_at=datetime.now(timezone.utc), - token_count=_turn_tokens if _turn_tokens else None, - ) - session.messages.append(assistant_msg) - session.context.append(assistant_msg) - await self._sessions.save(session) - return content - - # Tool calls turn — append to both messages and context - assistant_msg = Message( - role="assistant", - content=response.content, - thinking=response.thinking, - tool_calls=response.tool_calls, - ) - session.messages.append(assistant_msg) - session.context.append(assistant_msg) - - tool_results, image_injections = await self._tool_executor._execute_tool_calls(response.tool_calls, tools) - session.messages.extend(tool_results) - session.context.extend(tool_results) - # Image injections are synthetic LLM helpers — context only - session.context.extend(image_injections) - - await self._sessions.save(session) - raise MaxIterationsReached(profile.max_iterations) + full_content = "" + async for event in self.run_stream(session_id, user_message, images=images, files=files, is_recall=is_recall): + if isinstance(event, StreamEnd): + full_content = event.full_content or "" + return full_content async def run_ephemeral( self, @@ -287,28 +197,8 @@ # threshold, compress NOW before calling the LLM. This prevents the # model from seeing an over-full context and generating gibberish # (e.g. a "summary of the conversation" instead of a real answer). 
- if ( - settings.context_compression_enabled - and len(session.context) > 2 - and should_compress( - self._compressor.estimate_context_tokens(session.context), - settings.ollama_num_ctx, - settings.context_compression_threshold, - ) - ): - yield CompressionStarted( - context_tokens=self._compressor.estimate_context_tokens(session.context), - max_context_tokens=settings.ollama_num_ctx, - ) - event = await self._do_compress_and_save( - session=session, - llm=llm, - model=profile.model, - session_id=session_id, - reason="preturn", - ) - if event: - yield event + async for _ev in self._compression_events_preturn(session, llm, profile, session_id): + yield _ev display_text = display_message if display_message is not None else user_message user_msg_display = Message(role="user", content=display_text, images=images or None, @@ -372,40 +262,10 @@ yield StreamStopped() return - if settings.context_compression_enabled and iteration > 0: - preflight_ctx = self._ctx_builder.build( - session.context, - profile, - mem, - extra_system=ctx_injections, - session_id=session_id, - ) - estimated_tokens = self._compressor.estimate_context_tokens(preflight_ctx) - if should_compress( - estimated_tokens, - settings.ollama_num_ctx, - settings.context_compression_threshold, - ): - yield CompressionStarted( - context_tokens=estimated_tokens, - max_context_tokens=settings.ollama_num_ctx, - ) - event = await self._do_compress_and_save( - session=session, - llm=llm, - model=profile.model, - session_id=session_id, - reason="midturn", - keep_recent_messages=max(12, settings.context_keep_recent * 2), - ) - if event: - yield event + async for _ev in self._compression_events_midturn(session, llm, profile, session_id, iteration, ctx_injections, mem): + yield _ev - accumulated_text = "" - accumulated_thinking = "" - turn_tool_calls: list[ToolCallRequest] | None = None - thinking_active = False - context_tokens: int | None = None + state = StreamState() built_ctx = 
self._ctx_builder.build(session.context, profile, mem, iteration=iteration, max_iterations=profile.max_iterations, @@ -442,59 +302,41 @@ yield StreamEnd(content=error_text) return - async for chunk in _iter_stream_guarded( - llm.stream_complete( - built_ctx, - tools=tool_schemas if tools else None, - temperature=profile.temperature, - model=profile.model, - think=profile.think_enabled, - top_k=profile.top_k, - top_p=profile.top_p, - num_thread=profile.num_thread, + async for _ev in self._consume_stream( + _iter_stream_guarded( + llm.stream_complete( + built_ctx, + tools=tool_schemas if tools else None, + temperature=profile.temperature, + model=profile.model, + think=profile.think_enabled, + top_k=profile.top_k, + top_p=profile.top_p, + num_thread=profile.num_thread, + ), + stop_event=stop_event, + first_chunk_timeout=settings.llm_stream_first_chunk_timeout, + chunk_timeout=settings.llm_stream_chunk_timeout, ), - stop_event=stop_event, - first_chunk_timeout=settings.llm_stream_first_chunk_timeout, - chunk_timeout=settings.llm_stream_chunk_timeout, + stop_event, + turn_ctx, + state, ): - # Cooperative stop: break cleanly so aclose() is called on the - # generator → Ollama stream closes gracefully, model stays in VRAM. 
- if stop_event and stop_event.is_set(): - if thinking_active: - yield ThinkingEnd() - break - if chunk.prompt_tokens is not None or chunk.completion_tokens is not None: - _iter_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0) - turn_ctx.turn_tokens += _iter_tokens - context_tokens = _iter_tokens - if chunk.thinking: - accumulated_thinking += chunk.thinking - if not thinking_active: - thinking_active = True - yield ThinkingDelta(delta=chunk.thinking) - elif chunk.delta: - if thinking_active: - thinking_active = False - yield ThinkingEnd() - accumulated_text += chunk.delta - yield TextDelta(delta=chunk.delta) - if chunk.tool_calls: - turn_tool_calls = chunk.tool_calls - if chunk.finish_reason and thinking_active: - thinking_active = False - yield ThinkingEnd() + yield _ev # Stopped mid-stream — save partial response and exit if stop_event and stop_event.is_set(): - if accumulated_text: + if state.accumulated_text: session.messages.append(Message( - role="assistant", content=accumulated_text, + role="assistant", content=state.accumulated_text, created_at=datetime.now(timezone.utc), )) await self._sessions.save(session) yield StreamStopped() return + turn_tool_calls = state.turn_tool_calls + if not turn_tool_calls: # Final response — text already streamed above _elapsed = round(time.monotonic() - turn_ctx.turn_start, 1) @@ -502,8 +344,8 @@ _net_tokens = turn_ctx.turn_tokens + turn_ctx.subagent_tokens assistant_msg = Message( role="assistant", - content=accumulated_text or None, - thinking=accumulated_thinking or None, + content=state.accumulated_text or None, + thinking=state.accumulated_thinking or None, created_at=datetime.now(timezone.utc), elapsed_seconds=_elapsed, tool_call_count=turn_ctx.tool_call_count if turn_ctx.tool_call_count else None, @@ -511,101 +353,34 @@ ) session.messages.append(assistant_msg) session.context.append(assistant_msg) - session.context_token_count = context_tokens or 0 + session.context_token_count = 
state.context_tokens or 0 await self._sessions.save(session) yield StreamEnd( - full_content=accumulated_text, - context_tokens=context_tokens, + full_content=state.accumulated_text, + context_tokens=state.context_tokens, max_context_tokens=settings.ollama_num_ctx, elapsed_seconds=_elapsed, tool_call_count=turn_ctx.tool_call_count, token_count=_net_tokens if _net_tokens else None, ) - for event in await self._run_workers(session, llm, profile.model, context_tokens): + for event in await self._run_workers(session, llm, profile.model, state.context_tokens): yield event return # Tool calls turn — record to session and execute assistant_msg = Message( role="assistant", - content=accumulated_text or None, - thinking=accumulated_thinking or None, + content=state.accumulated_text or None, + thinking=state.accumulated_thinking or None, tool_calls=turn_tool_calls, ) session.messages.append(assistant_msg) session.context.append(assistant_msg) - tool_map = {t.name: t for t in tools} - for tc in turn_tool_calls: - # 1. Announce immediately so the UI shows a pending card - yield ToolStarted(tool_name=tc.name, arguments=tc.arguments) - - # 2. Create a sink queue for sub-agent events from this tool call. - # create_task() snapshots the current ContextVar values, so the - # task will inherit current_event_sink = sink. - sink: asyncio.Queue = asyncio.Queue() - sink_token = current_event_sink.set(sink) - result_holder: list = [] - - async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink): - try: - _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map)) - except Exception as exc: - _holder.append(exc) - finally: - await _sink.put(_TOOL_DONE) - - tool_task = asyncio.create_task(_run_with_sentinel()) - current_event_sink.reset(sink_token) # outer ctx restored; task has its own copy - - try: - # 3. Block on the sink until the sentinel arrives. - # Sub-agent ToolStarted/ToolEvent objects come through here in real time. 
- while True: - item = await sink.get() - if item is _TOOL_DONE: - break - if isinstance(item, SubagentComplete): - turn_ctx.subagent_tokens += item.token_count - turn_ctx.tool_call_count += item.tool_call_count - elif isinstance(item, AIHelperTokensUsed): - turn_ctx.subagent_tokens += item.total - else: - yield item - - # 4. Unpack result or handle exception - r = result_holder[0] if result_holder else RuntimeError("tool task produced no result") - if isinstance(r, Exception): - # Infrastructure errors (LLMBackendError / LLMConnectionError) abort the turn; - # everything else becomes a failed tool result so the loop can continue. - if isinstance(r, (LLMBackendError, LLMConnectionError)): - raise r - log.warning("agent.tool_exception", tool=tc.name, error=str(r)) - tool_event = ToolEvent( - tool_name=tc.name, arguments=tc.arguments, - result=f"Error: {r}", success=False, - ) - msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={}) - image_msg = None - else: - tool_event, msg, image_msg = r - - # 5. Yield the completed ToolEvent and record in session - turn_ctx.tool_call_count += 1 - yield tool_event - session.messages.append(msg) - session.context.append(msg) - if image_msg: - session.context.append(image_msg) - finally: - if not tool_task.done(): - tool_task.cancel() - try: - await tool_task - except Exception: - pass + async for _ev in self._execute_tools_with_sink(turn_tool_calls, tools, turn_ctx, session): + yield _ev # 6. 
# Helper methods of ``Agent`` (navi/core/agent.py) that ``run_stream()``
# delegates to, shown at column 0.

async def _compression_events_preturn(self, session, llm, profile, session_id):
    """Yield compression events for the pre-turn context check.

    Produces nothing unless compression is enabled in ``settings``, the
    session context holds more than two messages, and ``should_compress()``
    accepts the estimated token count of ``session.context``.

    Yields:
        ``CompressionStarted`` first, then the event returned by
        ``_do_compress_and_save(..., reason="preturn")`` when it is truthy.
    """
    if not settings.context_compression_enabled or len(session.context) <= 2:
        return
    # Estimate once: the same figure drives the threshold check and is
    # reported in the CompressionStarted event.
    estimated_tokens = self._compressor.estimate_context_tokens(session.context)
    if not should_compress(
        estimated_tokens,
        settings.ollama_num_ctx,
        settings.context_compression_threshold,
    ):
        return
    yield CompressionStarted(
        context_tokens=estimated_tokens,
        max_context_tokens=settings.ollama_num_ctx,
    )
    event = await self._do_compress_and_save(
        session=session,
        llm=llm,
        model=profile.model,
        session_id=session_id,
        reason="preturn",
    )
    if event:
        yield event


async def _compression_events_midturn(self, session, llm, profile, session_id, iteration, ctx_injections, mem):
    """Yield compression events for the check made between loop iterations.

    Only active when compression is enabled and ``iteration > 0``. The size
    is estimated on the context assembled by ``self._ctx_builder.build()``
    (with ``mem`` and ``ctx_injections``), not on the raw session context.

    Yields:
        ``CompressionStarted`` first, then the event returned by
        ``_do_compress_and_save(..., reason="midturn")`` when it is truthy.
    """
    if not settings.context_compression_enabled or iteration <= 0:
        return
    preflight_ctx = self._ctx_builder.build(
        session.context,
        profile,
        mem,
        extra_system=ctx_injections,
        session_id=session_id,
    )
    estimated_tokens = self._compressor.estimate_context_tokens(preflight_ctx)
    if not should_compress(
        estimated_tokens,
        settings.ollama_num_ctx,
        settings.context_compression_threshold,
    ):
        return
    yield CompressionStarted(
        context_tokens=estimated_tokens,
        max_context_tokens=settings.ollama_num_ctx,
    )
    event = await self._do_compress_and_save(
        session=session,
        llm=llm,
        model=profile.model,
        session_id=session_id,
        reason="midturn",
        keep_recent_messages=max(12, settings.context_keep_recent * 2),
    )
    if event:
        yield event


async def _consume_stream(self, stream_gen, stop_event, turn_ctx: "AgentTurnContext", state: "StreamState"):
    """Translate LLM stream chunks into UI events while filling ``state``.

    Args:
        stream_gen: Async iterable of chunks exposing ``prompt_tokens``,
            ``completion_tokens``, ``thinking``, ``delta``, ``tool_calls``
            and ``finish_reason``.
        stop_event: Optional object with ``is_set()``; checked before each
            chunk is processed.
        turn_ctx: Its ``turn_tokens`` counter is increased by every chunk
            that reports token usage.
        state: Receives accumulated text/thinking, the latest tool calls,
            the thinking flag and the latest per-chunk token total.

    Yields:
        ``ThinkingDelta`` / ``TextDelta`` per chunk, and ``ThinkingEnd``
        whenever an open thinking phase is closed.
    """
    async for chunk in stream_gen:
        # Cooperative stop: leave the loop without processing this chunk.
        if stop_event and stop_event.is_set():
            if state.thinking_active:
                # Clear the flag like the other ThinkingEnd sites do.
                state.thinking_active = False
                yield ThinkingEnd()
            break
        if chunk.prompt_tokens is not None or chunk.completion_tokens is not None:
            _iter_tokens = (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0)
            turn_ctx.turn_tokens += _iter_tokens
            state.context_tokens = _iter_tokens
        if chunk.thinking:
            state.accumulated_thinking += chunk.thinking
            if not state.thinking_active:
                state.thinking_active = True
            yield ThinkingDelta(delta=chunk.thinking)
        elif chunk.delta:
            # The first text delta after thinking closes the thinking phase.
            if state.thinking_active:
                state.thinking_active = False
                yield ThinkingEnd()
            state.accumulated_text += chunk.delta
            yield TextDelta(delta=chunk.delta)
        if chunk.tool_calls:
            state.turn_tool_calls = chunk.tool_calls
        if chunk.finish_reason and state.thinking_active:
            state.thinking_active = False
            yield ThinkingEnd()


async def _execute_tools_with_sink(self, turn_tool_calls, tools, turn_ctx: "AgentTurnContext", session):
    """Run the requested tool calls one by one, relaying their events.

    Args:
        turn_tool_calls: Tool call requests (``name``, ``arguments``, ``id``).
        tools: Available tools; indexed by ``name`` for the executor.
        turn_ctx: ``tool_call_count`` and ``subagent_tokens`` are updated.
        session: Tool result messages are appended to ``session.messages``
            and ``session.context``; image messages to the context only.

    Yields:
        ``ToolStarted`` per call, any items the tool pushes into the sink
        (except the accounting events consumed here), then the final
        ``ToolEvent``.

    Raises:
        LLMBackendError, LLMConnectionError: Re-raised when a tool fails
            with one of them; any other tool exception becomes a failed
            tool result so the loop can continue.
    """
    tool_map = {t.name: t for t in tools}
    for tc in turn_tool_calls:
        # Announce immediately so the UI shows a pending card.
        yield ToolStarted(tool_name=tc.name, arguments=tc.arguments)

        sink: asyncio.Queue = asyncio.Queue()
        result_holder: list = []

        async def _run_with_sentinel(_tc=tc, _holder=result_holder, _sink=sink):
            try:
                _holder.append(await self._tool_executor._run_single_tool(_tc, tool_map))
            except Exception as exc:
                _holder.append(exc)
            finally:
                # Always wake the consumer loop below, even on failure.
                await _sink.put(_TOOL_DONE)

        # create_task() snapshots the current ContextVar values, so the task
        # sees current_event_sink = sink; the outer value is restored right
        # after, even if task creation fails.
        sink_token = current_event_sink.set(sink)
        try:
            tool_task = asyncio.create_task(_run_with_sentinel())
        finally:
            current_event_sink.reset(sink_token)

        try:
            # Block on the sink until the sentinel arrives; events emitted
            # during the tool run are forwarded as they come in.
            while True:
                item = await sink.get()
                if item is _TOOL_DONE:
                    break
                if isinstance(item, SubagentComplete):
                    turn_ctx.subagent_tokens += item.token_count
                    turn_ctx.tool_call_count += item.tool_call_count
                elif isinstance(item, AIHelperTokensUsed):
                    turn_ctx.subagent_tokens += item.total
                else:
                    yield item

            r = result_holder[0] if result_holder else RuntimeError("tool task produced no result")
            if isinstance(r, Exception):
                # Infrastructure errors abort the turn; everything else is
                # reported as a failed tool result.
                if isinstance(r, (LLMBackendError, LLMConnectionError)):
                    raise r
                log.warning("agent.tool_exception", tool=tc.name, error=str(r))
                tool_event = ToolEvent(
                    tool_name=tc.name, arguments=tc.arguments,
                    result=f"Error: {r}", success=False,
                )
                msg = Message(role="tool", content=f"Error: {r}", tool_call_id=tc.id, name=tc.name, metadata={})
                image_msg = None
            else:
                tool_event, msg, image_msg = r

            turn_ctx.tool_call_count += 1
            yield tool_event
            session.messages.append(msg)
            session.context.append(msg)
            if image_msg:
                session.context.append(image_msg)
        finally:
            if not tool_task.done():
                tool_task.cancel()
                try:
                    await tool_task
                except asyncio.CancelledError:
                    # Expected result of the cancel() above. CancelledError
                    # derives from BaseException, so ``except Exception``
                    # alone would let it escape this cleanup.
                    pass
                except Exception:
                    pass


# Defined in navi/core/agent_run_context.py.
@dataclass
class StreamState:
    """Mutable accumulator for a single LLM stream within one iteration."""

    # Text deltas concatenated in arrival order.
    accumulated_text: str = ""
    # Thinking deltas concatenated in arrival order.
    accumulated_thinking: str = ""
    # Tool calls from the most recent chunk that carried any.
    # Quoted so the project type is not needed at class-definition time.
    turn_tool_calls: "list[ToolCallRequest] | None" = None
    # True while a thinking phase is open (ThinkingEnd not yet emitted).
    thinking_active: bool = False
    # prompt + completion tokens of the last chunk that reported usage.
    context_tokens: "int | None" = None