diff --git a/navi/core/agent.py b/navi/core/agent.py index 86ec4bd..49e597a 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -19,7 +19,6 @@ import json import time from datetime import datetime, timezone -from pathlib import Path from typing import TYPE_CHECKING, AsyncGenerator import structlog @@ -34,6 +33,9 @@ from .compressor import ContextCompressor, should_compress from .context_builder import ContextBuilder from .planning import PlanningEngine +from .stream_guard import _iter_stream_guarded +from .subagent_runner import SubAgentRunner +from .tool_utils import build_tool_list, load_user_enabled_tools from .events import ( AgentEvent, AIHelperTokensUsed, @@ -61,100 +63,6 @@ from navi.memory.store import MemoryStore from navi.workers.base import Worker, WorkerContext -_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json" - - -async def _iter_stream_guarded( - stream_gen: "AsyncGenerator[LLMChunk, None]", - stop_event: "asyncio.Event | None", - first_chunk_timeout: float, - chunk_timeout: float, -) -> "AsyncGenerator[LLMChunk, None]": - """ - Wraps a streaming LLM generator with two safety mechanisms: - - 1. Stop-event responsiveness during prefill. - Normally, the agent only checks stop_event *between* chunks. During the - prefill phase (processing input tokens) Ollama emits no chunks at all — - the first await can block for minutes on large contexts. This wrapper polls - stop_event every second so the user's Stop button works even then. - - 2. Timeouts as a last-resort safety net. - first_chunk_timeout: how long to wait for the first token (prefill). - chunk_timeout: max gap between subsequent tokens. - On timeout the generator is closed, which terminates the HTTP connection - to Ollama → Ollama halts generation → GPU load drops to idle. - """ - first = True - chunk_task: asyncio.Task | None = None - try: - while True: - timeout = first_chunk_timeout if first else chunk_timeout - # Create one task per chunk; reuse across poll iterations so we - # don't accidentally start multiple concurrent __anext__ calls. - chunk_task = asyncio.ensure_future(stream_gen.__anext__()) - elapsed = 0.0 - - while True: - done, _ = await asyncio.wait({chunk_task}, timeout=1.0) - if done: - break - elapsed += 1.0 - if stop_event and stop_event.is_set(): - chunk_task.cancel() - try: - await chunk_task - except (asyncio.CancelledError, Exception): - pass - chunk_task = None - return - if elapsed >= timeout: - chunk_task.cancel() - try: - await chunk_task - except (asyncio.CancelledError, Exception): - pass - chunk_task = None - label = "first token (context may be too large for this model)" if first else "next token" - raise LLMBackendError( - f"LLM timed out after {elapsed:.0f}s waiting for {label}." - ) - - try: - chunk = chunk_task.result() - except StopAsyncIteration: - chunk_task = None - return - - chunk_task = None - first = False - yield chunk - - if stop_event and stop_event.is_set(): - return - - finally: - # Cancel any in-flight __anext__ task so we don't leave a zombie - # coroutine holding an open HTTP connection to Ollama. - if chunk_task is not None and not chunk_task.done(): - chunk_task.cancel() - try: - await chunk_task - except (asyncio.CancelledError, Exception): - pass - # Closing the generator terminates the HTTP connection to Ollama, - # which signals it to stop generating (GPU returns to idle). - try: - await stream_gen.aclose() - except Exception: - pass - - -def _load_user_enabled_tools() -> list[str]: - try: - return json.loads(_USER_ENABLED_FILE.read_text()) - except Exception: - return [] log = structlog.get_logger() @@ -162,12 +70,6 @@ # Sentinel: placed in the event sink by the tool wrapper to signal completion. _TOOL_DONE = object() -# Sub-agents are execution workers. If a sub-agent produces only thinking for a -# long time without text or tool calls, local models can degenerate into endless -# internal-token loops and keep the GPU busy with no user-visible progress. -_SUBAGENT_THINKING_STALL_SECONDS = 60.0 -_SUBAGENT_THINKING_STALL_CHARS = 12_000 - async def _todo_progress_message(session_id: str, *, first_iteration: bool = False) -> "Message | None": """Build a compact system reminder with current todo state and update discipline.""" @@ -204,6 +106,17 @@ self._tool_executor = ToolExecutor(tool_registry) self._planning = PlanningEngine(self._ctx_builder) self._compressor = ContextCompressor() + self._subagent = SubAgentRunner( + profile_registry=profile_registry, + tool_registry=tool_registry, + backend_registry=backend_registry, + ctx_builder=self._ctx_builder, + compressor=self._compressor, + planning=self._planning, + tool_executor=self._tool_executor, + session_store=session_store, + mcp_manager=mcp_manager, + ) # ------------------------------------------------------------------ # Public interface @@ -319,312 +232,19 @@ parent_session_id: str | None = None, timeout_seconds: float = 300.0, ) -> tuple[str, bool]: - """ - Run a sub-agent loop without a persistent session. - - Returns (result_text, completed_normally). - completed_normally is False if the sub-agent hit the iteration limit or timed out. - - Intended for spawning from tools (e.g. SpawnAgentTool). - No DB reads/writes — uses a temporary in-memory context. - Tools listed in exclude_tools are stripped from the tool list - (use this to prevent recursion: exclude 'spawn_agent'). - - System prompt structure: - Default (inherit_system_prompt=False): only subagent-specific prompts: - 1. profile.subagent_system_prompt — focused executor persona - 2. custom_system_prompt — optional role specialisation for this task - 3. briefing — task context (credentials, paths, instructions) - Inherit mode (inherit_system_prompt=True): parent's system prompt as base layer, - then subagent specialisation on top. - - context_transfer: text from the parent's scratchpad context_transfer section, - injected as a priming exchange before the task message. - parent_session_id: parent chat session id. When provided, sub-agent - tool calls run in that session context so session-aware tools resolve - filenames against the user's session directory, not an ephemeral id. - timeout_seconds: wall-clock timeout for the entire sub-agent run. - """ - import time as _time - import uuid as _uuid - from navi.tools._internal.base import ( - current_session_id as _sid_var, - current_model as _model_var, - current_user_id as _uid_var, - current_user_role as _role_var, - current_user_info as _uinfo_var, + """Delegate to SubAgentRunner.""" + return await self._subagent.run( + user_message=user_message, + profile_id=profile_id, + max_iterations=max_iterations, + exclude_tools=exclude_tools, + briefing=briefing, + custom_system_prompt=custom_system_prompt, + inherit_system_prompt=inherit_system_prompt, + context_transfer=context_transfer, + parent_session_id=parent_session_id, + timeout_seconds=timeout_seconds, ) - _prev_sid = _sid_var.get(None) - _prev_model = _model_var.get(None) - _prev_uid = _uid_var.get(None) - _prev_role = _role_var.get() - _prev_uinfo = _uinfo_var.get(None) - subagent_run_id = f"subagent_{_uuid.uuid4().hex[:12]}" - tool_session_id = parent_session_id or subagent_run_id - _sid_var.set(tool_session_id) - - profile = self._profiles.get(profile_id) - _model_var.set(profile.model) - exclude = set(exclude_tools or []) - - # Use dedicated subagent_tools if configured, else fall back to enabled_tools. - tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools - tools = [t for t in self._tool_list(tool_source, profile.mcp_servers) if t.name not in exclude] - tool_schemas = [t.schema() for t in tools] - llm = self._get_backend(profile.llm_backend) - - user_id = None - if parent_session_id and self._sessions: - parent_session = await self._sessions.get(parent_session_id) - if parent_session: - user_id = parent_session.user_id - if user_id is not None: - _uid_var.set(user_id) - _role_var.set(_prev_role or "user") - _uinfo_var.set(_prev_uinfo) - else: - _uid_var.set(None) - _role_var.set("user") - _uinfo_var.set(None) - mem = await self._ctx_builder._memory_msg(user_id=user_id) - - # Build subagent system prompt. - # Default (inherit=False): completely separate from the parent's system prompt. - # Structure: subagent persona → role specialisation → task context - # Inherit mode (inherit=True): parent's system prompt as base layer, - # then subagent specialisation on top. - sys_parts: list[str] = [] - if inherit_system_prompt: - sys_parts.append(profile.system_prompt) - if profile.subagent_system_prompt: - sys_parts.append(profile.subagent_system_prompt) - if custom_system_prompt: - sys_parts.append(custom_system_prompt) - if briefing: - sys_parts.append(f"## Task context\n\n{briefing}") - if parent_session_id: - sys_parts.append( - "[Parent session context]\n" - f"Parent Session ID: {parent_session_id}\n" - f"Session files directory: {settings.session_files_dir}/{parent_session_id}/\n" - "For files the user should see, write to this exact session directory. " - "Do not use or invent a subagent_* directory." - ) - if not sys_parts: - # Fallback if profile has no subagent_system_prompt defined - sys_parts.append(profile.system_prompt) - subagent_sys_msg = Message(role="system", content="\n\n---\n\n".join(sys_parts)) - - # Build initial context. - # If context_transfer is provided, inject it as a priming exchange so the - # sub-agent has the parent's working state from the start. - context: list[Message] = [] - if context_transfer: - context.append(Message( - role="user", - content=f"## Context from parent agent\n\n{context_transfer}", - )) - context.append(Message( - role="assistant", - content="Understood. I have the context. Ready to begin the task.", - )) - context.append(Message(role="user", content=user_message, created_at=datetime.now(timezone.utc))) - - # Read the event sink set by the parent run_stream() for this tool call. - # If None (e.g. called from run(), not run_stream()), events are silently dropped. - sink = current_event_sink.get() - - log.info("agent.subagent.start", profile_id=profile_id, max_iterations=max_iterations, - tools=len(tools), planning=profile.subagent_planning_enabled) - - stop_event = current_stop_event.get() - tool_map = {t.name: t for t in tools} - subagent_think = ( - profile.subagent_think_enabled - if profile.subagent_think_enabled is not None - else profile.think_enabled - ) - - _turn_tokens: int = 0 # accumulated tokens across all iterations and planning - _sub_tool_count: int = 0 # total tool calls across all iterations - _start_time = _time.monotonic() - accumulated_text = "" - - try: - # ── Optional planning phase ──────────────────────────────────────────── - if profile.subagent_planning_enabled: - async for _ev in self._planning.run( - context, profile, llm, mem, tool_schemas, - system_prompt_override=subagent_sys_msg.content, - is_subagent=True, - ): - if isinstance(_ev, AIHelperTokensUsed): - _turn_tokens += _ev.total - elif sink is not None: - await sink.put(_ev) - - # ── Tool-calling loop ────────────────────────────────────────────────── - for iteration in range(max_iterations): - if stop_event and stop_event.is_set(): - return accumulated_text, False - - elapsed = _time.monotonic() - _start_time - if elapsed >= timeout_seconds: - log.warning("agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds) - if sink is not None: - await sink.put(SubagentComplete(token_count=_turn_tokens, tool_call_count=_sub_tool_count)) - return accumulated_text or "[Sub-agent timed out]", False - - log.debug("agent.subagent.iteration", iteration=iteration) - - accumulated_text = "" - accumulated_thinking = "" - turn_tool_calls: list[ToolCallRequest] | None = None - thinking_started_at: float | None = None - thinking_stalled_reason: str | None = None - - # Build context inline — no persona or profiles block for subagents. - built_ctx: list[Message] = [subagent_sys_msg] - if mem: - built_ctx.append(mem) - mcp_msg = self._ctx_builder._mcp_context_msg(profile) - if mcp_msg: - built_ctx.append(mcp_msg) - built_ctx.extend(m for m in context if m.role != "system") - self._check_context_size(built_ctx) - - async for chunk in _iter_stream_guarded( - llm.stream_complete( - built_ctx, - tools=tool_schemas if tools else None, - temperature=profile.temperature, - model=profile.model, - think=subagent_think, - top_k=profile.top_k, - top_p=profile.top_p, - num_thread=profile.num_thread, - ), - stop_event=stop_event, - first_chunk_timeout=settings.llm_stream_first_chunk_timeout, - chunk_timeout=settings.llm_stream_chunk_timeout, - ): - if chunk.prompt_tokens is not None or chunk.completion_tokens is not None: - _turn_tokens += (chunk.prompt_tokens or 0) + (chunk.completion_tokens or 0) - if chunk.thinking: - if thinking_started_at is None: - thinking_started_at = _time.monotonic() - accumulated_thinking += chunk.thinking - thinking_elapsed = _time.monotonic() - thinking_started_at - if ( - thinking_elapsed >= _SUBAGENT_THINKING_STALL_SECONDS - or len(accumulated_thinking) >= _SUBAGENT_THINKING_STALL_CHARS - ): - thinking_stalled_reason = ( - "Sub-agent produced only thinking output for " - f"{thinking_elapsed:.0f}s / {len(accumulated_thinking)} chars " - "without text or tool calls." - ) - log.warning( - "agent.subagent.thinking_stall", - elapsed=thinking_elapsed, - chars=len(accumulated_thinking), - profile_id=profile_id, - ) - break - if chunk.delta: - accumulated_text += chunk.delta - if chunk.tool_calls: - turn_tool_calls = chunk.tool_calls - - if stop_event and stop_event.is_set(): - return accumulated_text, False - - if thinking_stalled_reason: - if sink is not None: - await sink.put(SubagentComplete( - token_count=_turn_tokens, - tool_call_count=_sub_tool_count, - )) - return f"[{thinking_stalled_reason}]", False - - if not turn_tool_calls: - log.info("agent.subagent.complete", iterations=iteration + 1, - result_len=len(accumulated_text)) - if sink is not None: - await sink.put(SubagentComplete( - token_count=_turn_tokens, - tool_call_count=_sub_tool_count, - )) - return accumulated_text, True - - # Emit accumulated thinking before tool calls - if accumulated_thinking and sink is not None: - log.debug("agent.subagent.turn_thinking", length=len(accumulated_thinking)) - await sink.put(TurnThinking(thinking=accumulated_thinking, is_subagent=True)) - - context.append(Message( - role="assistant", - content=accumulated_text or None, - tool_calls=turn_tool_calls, - )) - - # Execute each tool call sequentially, emitting events to parent sink - for tc in turn_tool_calls: - _sub_tool_count += 1 - if sink is not None: - await sink.put(ToolStarted( - tool_name=tc.name, arguments=tc.arguments, is_subagent=True - )) - - tool = tool_map.get(tc.name) - image_msg = None - metadata: dict = {} - if tool is None: - content = f"Error: tool '{tc.name}' not found." - success = False - else: - log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments) - try: - result = await tool.execute(tc.arguments) - content = result.to_message_content() - success = result.success - metadata = result.metadata or {} - if result.success and result.metadata and result.metadata.get("is_image"): - b64 = result.metadata.get("base64") - if b64: - image_msg = Message( - role="user", - content=f"[Image loaded via {tc.name} — analyse it]", - images=[b64], - ) - except Exception as exc: - log.warning("agent.subagent.tool_exception", tool=tc.name, error=str(exc)) - content = f"Error: {exc}" - success = False - metadata = {} - - if sink is not None: - await sink.put(ToolEvent( - tool_name=tc.name, arguments=tc.arguments, - result=content, success=success, is_subagent=True, - )) - - context.append(Message(role="tool", content=content, - tool_call_id=tc.id, name=tc.name, metadata=metadata)) - if image_msg: - context.append(image_msg) - - log.warning("agent.subagent.max_iterations", max_iterations=max_iterations) - if sink is not None: - await sink.put(SubagentComplete(token_count=_turn_tokens, tool_call_count=_sub_tool_count)) - return accumulated_text or "[Sub-agent reached iteration limit without a final answer]", False - finally: - # Restore parent ContextVar values so background tasks don't inherit stale subagent IDs - _sid_var.set(_prev_sid) - _model_var.set(_prev_model) - _uid_var.set(_prev_uid) - _role_var.set(_prev_role) - _uinfo_var.set(_prev_uinfo) async def run_stream( self, @@ -809,7 +429,7 @@ built_ctx.append(_stall_msg) try: - self._check_context_size(built_ctx) + self._compressor.check_context_size(built_ctx) except ContextTooLargeError as e: # Surface the error as a Navi response (not a raw system error) so the # user sees a coherent message and the exchange is saved to history. @@ -1102,67 +722,8 @@ enabled: list[str], mcp_servers: dict[str, list[str]] | None = None, ) -> list[Tool]: - names = list(enabled) - extra = _load_user_enabled_tools() - for name in extra: - if name not in names: - names.append(name) - - # Expand MCP server groups into concrete tool names - if mcp_servers and self._mcp_manager: - for server_name, groups in mcp_servers.items(): - if "*" in groups: - # All registered tools for this server - prefix = f"mcp:{server_name}:" - for tool in self._tools.all(): - if tool.name.startswith(prefix) and tool.name not in names: - names.append(tool.name) - else: - for group_name in groups: - for tool_name in self._mcp_manager.resolve_group(server_name, group_name): - full_name = f"mcp:{server_name}:{tool_name}" - if full_name not in names: - names.append(full_name) - - result = [] - for name in names: - try: - result.append(self._tools.get(name)) - except Exception: - pass - return result + return build_tool_list(enabled, mcp_servers, self._tools, self._mcp_manager) def _get_backend(self, backend_key: str) -> LLMBackend: return self._backends.get(backend_key) - def _check_context_size(self, context: list[Message]) -> None: - """Raise ContextTooLargeError before an LLM call if the context is dangerously large. - - Uses a conservative character-based estimate (~4 chars per token for text). - Images are counted at 500 tokens each (rough vision-model estimate). - - Checks against the *remaining* budget, not a fixed percentage of the window: - available_for_input = ollama_num_ctx - output_reserve - where output_reserve is a fixed token headroom reserved for the model's response. - This correctly accounts for sessions where conversation history already consumes - a large portion of the window. - """ - if not context: - return - - output_reserve = settings.output_reserve_tokens - - total = self._compressor.estimate_context_tokens(context) - available = settings.ollama_num_ctx - output_reserve - - if total > available: - existing = self._compressor.estimate_context_tokens(context[:-1]) - new = self._compressor.estimate_context_tokens(context[-1:]) - remaining = available - existing - raise ContextTooLargeError( - f"Context too large: new content is ~{new:,} estimated tokens, " - f"but only ~{max(0, remaining):,} tokens are available " - f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, " - f"output_reserve {output_reserve:,}). " - "Split the file into smaller parts or delegate to a subagent." - ) diff --git a/navi/core/compressor.py b/navi/core/compressor.py index f50766b..3cd2869 100644 --- a/navi/core/compressor.py +++ b/navi/core/compressor.py @@ -340,3 +340,38 @@ "the model from exceeding its context window. Some earlier details may have been lost.]" ) return new_context, summary_text + + def check_context_size(self, context: list[Message]) -> None: + """Raise ContextTooLargeError before an LLM call if the context is dangerously large. + + Uses a conservative character-based estimate (~3 chars per token for text). + Images are counted at 500 tokens each (rough vision-model estimate). + + Checks against the *remaining* budget, not a fixed percentage of the window: + available_for_input = ollama_num_ctx - output_reserve + where output_reserve is a fixed token headroom reserved for the model's response. + This correctly accounts for sessions where conversation history already consumes + a large portion of the window. + """ + from navi.config import settings + from navi.exceptions import ContextTooLargeError + + if not context: + return + + output_reserve = settings.output_reserve_tokens + + total = self.estimate_context_tokens(context) + available = settings.ollama_num_ctx - output_reserve + + if total > available: + existing = self.estimate_context_tokens(context[:-1]) + new = self.estimate_context_tokens(context[-1:]) + remaining = available - existing + raise ContextTooLargeError( + f"Context too large: new content is ~{new:,} estimated tokens, " + f"but only ~{max(0, remaining):,} tokens are available " + f"(window {settings.ollama_num_ctx:,}, already used ~{existing:,}, " + f"output_reserve {output_reserve:,}). " + "Split the file into smaller parts or delegate to a subagent." + ) diff --git a/navi/core/stream_guard.py b/navi/core/stream_guard.py new file mode 100644 index 0000000..cd7b92f --- /dev/null +++ b/navi/core/stream_guard.py @@ -0,0 +1,95 @@ +"""Stream safety wrapper for LLM generators.""" + +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +from navi.exceptions import LLMBackendError +from navi.llm.base import LLMChunk + + +async def _iter_stream_guarded( + stream_gen: "AsyncGenerator[LLMChunk, None]", + stop_event: "asyncio.Event | None", + first_chunk_timeout: float, + chunk_timeout: float, +) -> "AsyncGenerator[LLMChunk, None]": + """ + Wraps a streaming LLM generator with two safety mechanisms: + + 1. Stop-event responsiveness during prefill. + Normally, the agent only checks stop_event *between* chunks. During the + prefill phase (processing input tokens) Ollama emits no chunks at all — + the first await can block for minutes on large contexts. This wrapper polls + stop_event every second so the user's Stop button works even then. + + 2. Timeouts as a last-resort safety net. + first_chunk_timeout: how long to wait for the first token (prefill). + chunk_timeout: max gap between subsequent tokens. + On timeout the generator is closed, which terminates the HTTP connection + to Ollama → Ollama halts generation → GPU load drops to idle. + """ + first = True + chunk_task: asyncio.Task | None = None + try: + while True: + timeout = first_chunk_timeout if first else chunk_timeout + # Create one task per chunk; reuse across poll iterations so we + # don't accidentally start multiple concurrent __anext__ calls. + chunk_task = asyncio.ensure_future(stream_gen.__anext__()) + elapsed = 0.0 + + while True: + done, _ = await asyncio.wait({chunk_task}, timeout=1.0) + if done: + break + elapsed += 1.0 + if stop_event and stop_event.is_set(): + chunk_task.cancel() + try: + await chunk_task + except (asyncio.CancelledError, Exception): + pass + chunk_task = None + return + if elapsed >= timeout: + chunk_task.cancel() + try: + await chunk_task + except (asyncio.CancelledError, Exception): + pass + chunk_task = None + label = "first token (context may be too large for this model)" if first else "next token" + raise LLMBackendError( + f"LLM timed out after {elapsed:.0f}s waiting for {label}." + ) + + try: + chunk = chunk_task.result() + except StopAsyncIteration: + chunk_task = None + return + + chunk_task = None + first = False + yield chunk + + if stop_event and stop_event.is_set(): + return + + finally: + # Cancel any in-flight __anext__ task so we don't leave a zombie + # coroutine holding an open HTTP connection to Ollama. + if chunk_task is not None and not chunk_task.done(): + chunk_task.cancel() + try: + await chunk_task + except (asyncio.CancelledError, Exception): + pass + # Closing the generator terminates the HTTP connection to Ollama, + # which signals it to stop generating (GPU returns to idle). + try: + await stream_gen.aclose() + except Exception: + pass diff --git a/navi/core/subagent_runner.py b/navi/core/subagent_runner.py new file mode 100644 index 0000000..4302e65 --- /dev/null +++ b/navi/core/subagent_runner.py @@ -0,0 +1,424 @@ +"""Sub-agent execution — standalone loop without a persistent session.""" + +from __future__ import annotations + +import time +import uuid +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +import structlog + +from navi.config import settings +from navi.exceptions import ContextTooLargeError +from navi.llm.base import LLMBackend, Message, ToolCallRequest +from navi.tools._internal.base import current_event_sink, current_stop_event + +from .events import AIHelperTokensUsed, SubagentComplete, ToolEvent, ToolStarted, TurnThinking +from .stream_guard import _iter_stream_guarded +from .tool_utils import build_tool_list + +if TYPE_CHECKING: + from .compressor import ContextCompressor + from .context_builder import ContextBuilder + from .planning import PlanningEngine + from .registry import BackendRegistry, ProfileRegistry, ToolRegistry + from .session import SessionStore + from .tool_executor import ToolExecutor + +log = structlog.get_logger() + +# Sub-agents are execution workers. If a sub-agent produces only thinking for a +# long time without text or tool calls, local models can degenerate into endless +# internal-token loops and keep the GPU busy with no user-visible progress. +_SUBAGENT_THINKING_STALL_SECONDS = 60.0 +_SUBAGENT_THINKING_STALL_CHARS = 12_000 + + +class SubAgentRunner: + """Runs a tool-calling sub-agent loop with timeout and thinking-stall guards.""" + + def __init__( + self, + profile_registry: ProfileRegistry, + tool_registry: ToolRegistry, + backend_registry: BackendRegistry, + ctx_builder: ContextBuilder, + compressor: ContextCompressor, + planning: PlanningEngine, + tool_executor: ToolExecutor, + session_store: SessionStore | None, + mcp_manager=None, + ) -> None: + self._profiles = profile_registry + self._tools = tool_registry + self._backends = backend_registry + self._ctx_builder = ctx_builder + self._compressor = compressor + self._planning = planning + self._tool_executor = tool_executor + self._sessions = session_store + self._mcp_manager = mcp_manager + + async def run( + self, + user_message: str, + profile_id: str, + max_iterations: int = 40, + exclude_tools: list[str] | None = None, + briefing: str | None = None, + custom_system_prompt: str | None = None, + inherit_system_prompt: bool = False, + context_transfer: str | None = None, + parent_session_id: str | None = None, + timeout_seconds: float = 300.0, + ) -> tuple[str, bool]: + """ + Run a sub-agent loop without a persistent session. + + Returns (result_text, completed_normally). + completed_normally is False if the sub-agent hit the iteration limit or timed out. + """ + from navi.tools._internal.base import ( + current_session_id as _sid_var, + current_model as _model_var, + current_user_id as _uid_var, + current_user_role as _role_var, + current_user_info as _uinfo_var, + ) + + _prev_sid = _sid_var.get(None) + _prev_model = _model_var.get(None) + _prev_uid = _uid_var.get(None) + _prev_role = _role_var.get() + _prev_uinfo = _uinfo_var.get(None) + subagent_run_id = f"subagent_{uuid.uuid4().hex[:12]}" + tool_session_id = parent_session_id or subagent_run_id + _sid_var.set(tool_session_id) + + profile = self._profiles.get(profile_id) + _model_var.set(profile.model) + exclude = set(exclude_tools or []) + + tool_source = profile.subagent_tools if profile.subagent_tools else profile.enabled_tools + tools = [ + t + for t in build_tool_list(tool_source, profile.mcp_servers, self._tools, self._mcp_manager) + if t.name not in exclude + ] + tool_schemas = [t.schema() for t in tools] + llm = self._get_backend(profile.llm_backend) + + user_id = None + if parent_session_id and self._sessions: + parent_session = await self._sessions.get(parent_session_id) + if parent_session: + user_id = parent_session.user_id + if user_id is not None: + _uid_var.set(user_id) + _role_var.set(_prev_role or "user") + _uinfo_var.set(_prev_uinfo) + else: + _uid_var.set(None) + _role_var.set("user") + _uinfo_var.set(None) + mem = await self._ctx_builder._memory_msg(user_id=user_id) + + sys_parts: list[str] = [] + if inherit_system_prompt: + sys_parts.append(profile.system_prompt) + if profile.subagent_system_prompt: + sys_parts.append(profile.subagent_system_prompt) + if custom_system_prompt: + sys_parts.append(custom_system_prompt) + if briefing: + sys_parts.append(f"## Task context\n\n{briefing}") + if parent_session_id: + sys_parts.append( + "[Parent session context]\n" + f"Parent Session ID: {parent_session_id}\n" + f"Session files directory: {settings.session_files_dir}/{parent_session_id}/\n" + "For files the user should see, write to this exact session directory. " + "Do not use or invent a subagent_* directory." + ) + if not sys_parts: + sys_parts.append(profile.system_prompt) + subagent_sys_msg = Message(role="system", content="\n\n---\n\n".join(sys_parts)) + + context: list[Message] = [] + if context_transfer: + context.append( + Message( + role="user", + content=f"## Context from parent agent\n\n{context_transfer}", + ) + ) + context.append( + Message( + role="assistant", + content="Understood. I have the context. Ready to begin the task.", + ) + ) + context.append( + Message(role="user", content=user_message, created_at=datetime.now(timezone.utc)) + ) + + sink = current_event_sink.get() + + log.info( + "agent.subagent.start", + profile_id=profile_id, + max_iterations=max_iterations, + tools=len(tools), + planning=profile.subagent_planning_enabled, + ) + + stop_event = current_stop_event.get() + tool_map = {t.name: t for t in tools} + subagent_think = ( + profile.subagent_think_enabled + if profile.subagent_think_enabled is not None + else profile.think_enabled + ) + + _turn_tokens: int = 0 + _sub_tool_count: int = 0 + _start_time = time.monotonic() + accumulated_text = "" + + try: + if profile.subagent_planning_enabled: + async for _ev in self._planning.run( + context, + profile, + llm, + mem, + tool_schemas, + system_prompt_override=subagent_sys_msg.content, + is_subagent=True, + ): + if isinstance(_ev, AIHelperTokensUsed): + _turn_tokens += _ev.total + elif sink is not None: + await sink.put(_ev) + + for iteration in range(max_iterations): + if stop_event and stop_event.is_set(): + return accumulated_text, False + + elapsed = time.monotonic() - _start_time + if elapsed >= timeout_seconds: + log.warning( + "agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds + ) + if sink is not None: + await sink.put( + SubagentComplete( + token_count=_turn_tokens, tool_call_count=_sub_tool_count + ) + ) + return accumulated_text or "[Sub-agent timed out]", False + + log.debug("agent.subagent.iteration", iteration=iteration) + + accumulated_text = "" + accumulated_thinking = "" + turn_tool_calls: list[ToolCallRequest] | None = None + thinking_started_at: float | None = None + thinking_stalled_reason: str | None = None + + built_ctx: list[Message] = [subagent_sys_msg] + if mem: + built_ctx.append(mem) + mcp_msg = self._ctx_builder._mcp_context_msg(profile) + if mcp_msg: + built_ctx.append(mcp_msg) + built_ctx.extend(m for m in context if m.role != "system") + self._compressor.check_context_size(built_ctx) + + async for chunk in _iter_stream_guarded( + llm.stream_complete( + built_ctx, + tools=tool_schemas if tools else None, + temperature=profile.temperature, + model=profile.model, + think=subagent_think, + top_k=profile.top_k, + top_p=profile.top_p, + num_thread=profile.num_thread, + ), + stop_event=stop_event, + first_chunk_timeout=settings.llm_stream_first_chunk_timeout, + chunk_timeout=settings.llm_stream_chunk_timeout, + ): + if ( + chunk.prompt_tokens is not None + or chunk.completion_tokens is not None + ): + _turn_tokens += (chunk.prompt_tokens or 0) + ( + chunk.completion_tokens or 0 + ) + if chunk.thinking: + if thinking_started_at is None: + thinking_started_at = time.monotonic() + accumulated_thinking += chunk.thinking + thinking_elapsed = time.monotonic() - thinking_started_at + if ( + thinking_elapsed >= _SUBAGENT_THINKING_STALL_SECONDS + or len(accumulated_thinking) >= _SUBAGENT_THINKING_STALL_CHARS + ): + thinking_stalled_reason = ( + "Sub-agent produced only thinking output for " + f"{thinking_elapsed:.0f}s / {len(accumulated_thinking)} chars " + "without text or tool calls." + ) + log.warning( + "agent.subagent.thinking_stall", + elapsed=thinking_elapsed, + chars=len(accumulated_thinking), + profile_id=profile_id, + ) + break + if chunk.delta: + accumulated_text += chunk.delta + if chunk.tool_calls: + turn_tool_calls = chunk.tool_calls + + if stop_event and stop_event.is_set(): + return accumulated_text, False + + if thinking_stalled_reason: + if sink is not None: + await sink.put( + SubagentComplete( + token_count=_turn_tokens, + tool_call_count=_sub_tool_count, + ) + ) + return f"[{thinking_stalled_reason}]", False + + if not turn_tool_calls: + log.info( + "agent.subagent.complete", + iterations=iteration + 1, + result_len=len(accumulated_text), + ) + if sink is not None: + await sink.put( + SubagentComplete( + token_count=_turn_tokens, + tool_call_count=_sub_tool_count, + ) + ) + return accumulated_text, True + + if accumulated_thinking and sink is not None: + log.debug( + "agent.subagent.turn_thinking", length=len(accumulated_thinking) + ) + await sink.put( + TurnThinking(thinking=accumulated_thinking, is_subagent=True) + ) + + context.append( + Message( + role="assistant", + content=accumulated_text or None, + tool_calls=turn_tool_calls, + ) + ) + + for tc in turn_tool_calls: + _sub_tool_count += 1 + if sink is not None: + await sink.put( + ToolStarted( + tool_name=tc.name, + arguments=tc.arguments, + is_subagent=True, + ) + ) + + tool = tool_map.get(tc.name) + image_msg = None + metadata: dict = {} + if tool is None: + content = f"Error: tool '{tc.name}' not found." + success = False + else: + log.info( + "tool.execute.subagent", tool=tc.name, args=tc.arguments + ) + try: + result = await tool.execute(tc.arguments) + content = result.to_message_content() + success = result.success + metadata = result.metadata or {} + if ( + result.success + and result.metadata + and result.metadata.get("is_image") + ): + b64 = result.metadata.get("base64") + if b64: + image_msg = Message( + role="user", + content=f"[Image loaded via {tc.name} — analyse it]", + images=[b64], + ) + except Exception as exc: + log.warning( + "agent.subagent.tool_exception", + tool=tc.name, + error=str(exc), + ) + content = f"Error: {exc}" + success = False + metadata = {} + + if sink is not None: + await sink.put( + ToolEvent( + tool_name=tc.name, + arguments=tc.arguments, + result=content, + success=success, + is_subagent=True, + ) + ) + + context.append( + Message( + role="tool", + content=content, + tool_call_id=tc.id, + name=tc.name, + metadata=metadata, + ) + ) + if image_msg: + context.append(image_msg) + + log.warning( + "agent.subagent.max_iterations", max_iterations=max_iterations + ) + if sink is not None: + await sink.put( + SubagentComplete( + token_count=_turn_tokens, tool_call_count=_sub_tool_count + ) + ) + return ( + accumulated_text + or "[Sub-agent reached iteration limit without a final answer]", + False, + ) + finally: + _sid_var.set(_prev_sid) + _model_var.set(_prev_model) + _uid_var.set(_prev_uid) + _role_var.set(_prev_role) + _uinfo_var.set(_prev_uinfo) + + def _get_backend(self, backend_key: str) -> LLMBackend: + return self._backends.get(backend_key) diff --git a/navi/core/tool_utils.py b/navi/core/tool_utils.py new file mode 100644 index 0000000..bc557c8 --- /dev/null +++ b/navi/core/tool_utils.py @@ -0,0 +1,53 @@ +"""Tool list construction shared between Agent and SubAgentRunner.""" + +from pathlib import Path + +from navi.config import settings +from navi.tools._internal.base import Tool + +_USER_ENABLED_FILE = Path(settings.tools_dir) / "enabled.json" + + +def load_user_enabled_tools() -> list[str]: + try: + import json + return json.loads(_USER_ENABLED_FILE.read_text()) + except Exception: + return [] + + +def build_tool_list( + enabled: list[str], + mcp_servers: dict[str, list[str]] | None, + tool_registry, + mcp_manager, +) -> list[Tool]: + """Resolve enabled tool names + MCP groups into concrete Tool objects.""" + names = list(enabled) + extra = load_user_enabled_tools() + for name in extra: + if name not in names: + names.append(name) + + # Expand MCP server groups into concrete tool names + if mcp_servers and mcp_manager: + for server_name, groups in mcp_servers.items(): + if "*" in groups: + prefix = f"mcp:{server_name}:" + for tool in tool_registry.all(): + if tool.name.startswith(prefix) and tool.name not in names: + names.append(tool.name) + else: + for group_name in groups: + for tool_name in mcp_manager.resolve_group(server_name, group_name): + full_name = f"mcp:{server_name}:{tool_name}" + if full_name not in names: + names.append(full_name) + + result = [] + for name in names: + try: + result.append(tool_registry.get(name)) + except Exception: + pass + return result diff --git a/tests/unit/core/test_agent_context_size.py b/tests/unit/core/test_agent_context_size.py index 84082f6..c993c06 100644 --- a/tests/unit/core/test_agent_context_size.py +++ b/tests/unit/core/test_agent_context_size.py @@ -1,9 +1,9 @@ -"""Unit tests for Agent._check_context_size.""" +"""Unit tests for ContextCompressor.check_context_size.""" import pytest from navi.config import Settings -from navi.core.agent import Agent +from navi.core.compressor import ContextCompressor from navi.exceptions import ContextTooLargeError from navi.llm.base import Message @@ -12,11 +12,8 @@ @pytest.fixture(autouse=True) def _patch_settings(self, monkeypatch): """Use a small context window so tests don't need huge strings.""" - import navi.core.agent as _agent_mod - monkeypatch.setattr( - _agent_mod, - "settings", + "navi.config.settings", Settings( _env_file=None, ollama_num_ctx=128, @@ -26,26 +23,26 @@ ) def test_empty_context_ok(self): - agent = Agent(None, None, None, None) - agent._check_context_size([]) + compressor = ContextCompressor() + compressor.check_context_size([]) def test_small_context_ok(self): - agent = Agent(None, None, None, None) + compressor = ContextCompressor() msgs = [Message(role="user", content="hi")] - agent._check_context_size(msgs) + compressor.check_context_size(msgs) def test_exceeds_window_raises(self): - agent = Agent(None, None, None, None) + compressor = ContextCompressor() # 128 ctx - 8 reserve = 120 available; each char ~0.33 tokens # Need > 120 * 3 = 360 chars to exceed msgs = [Message(role="user", content="x" * 600)] with pytest.raises(ContextTooLargeError) as exc_info: - agent._check_context_size(msgs) + compressor.check_context_size(msgs) assert "Context too large" in str(exc_info.value) def test_images_count_toward_limit(self): - agent = Agent(None, None, None, None) + compressor = ContextCompressor() # 500 tokens per image; with 128 ctx we exceed immediately msgs = [Message(role="user", content="look", images=["b64"])] with pytest.raises(ContextTooLargeError): - agent._check_context_size(msgs) + compressor.check_context_size(msgs)