"""Sub-agent execution — standalone loop without a persistent session."""
from __future__ import annotations
import time
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
import structlog
from navi.config import settings
from navi.exceptions import ContextTooLargeError
from navi.llm.base import LLMBackend, Message, ToolCallRequest
from navi.tools._internal.base import ToolContext, current_event_sink, current_stop_event
from .events import AIHelperTokensUsed, SubagentComplete, ToolEvent, ToolStarted, TurnThinking
from .stream_guard import _iter_stream_guarded
from .tool_utils import build_tool_list
if TYPE_CHECKING:
from .compressor import ContextCompressor
from .context_builder import ContextBuilder
from .planning import PlanningEngine
from .registry import BackendRegistry, ProfileRegistry, ToolRegistry
from .session import SessionStore
from .tool_executor import ToolExecutor
log = structlog.get_logger()

# Thinking-stall guards for sub-agent turns. Sub-agents are execution workers;
# local models can fall into endless internal-token loops that emit only
# thinking output (no text, no tool calls) while keeping the GPU busy. A turn
# is abandoned once its thinking output reaches either limit below.
_SUBAGENT_THINKING_STALL_SECONDS: float = 60.0  # seconds since the first thinking chunk
_SUBAGENT_THINKING_STALL_CHARS: int = 12_000  # accumulated thinking characters
class SubAgentRunner:
    """Runs a tool-calling sub-agent loop with timeout and thinking-stall guards.

    Each ``run`` call builds a transient conversation (no persistent session),
    scopes the tool set via the profile's sub-agent tool selection, streams
    completions from the profile's backend, and forwards progress events to the
    caller's event sink when one is installed (tool and thinking events are
    flagged with ``is_subagent=True``).
    """

    def __init__(
        self,
        profile_registry: ProfileRegistry,
        tool_registry: ToolRegistry,
        backend_registry: BackendRegistry,
        ctx_builder: ContextBuilder,
        compressor: ContextCompressor,
        planning: PlanningEngine,
        tool_executor: ToolExecutor,
        session_store: SessionStore | None,
        mcp_manager=None,
    ) -> None:
        self._profiles = profile_registry
        self._tools = tool_registry
        self._backends = backend_registry
        self._ctx_builder = ctx_builder
        self._compressor = compressor
        self._planning = planning
        # NOTE(review): stored but not used by this class; tools are executed
        # directly via ``tool.execute`` in ``_execute_tool_call``.
        self._tool_executor = tool_executor
        # Optional: without a session store, the parent-session user lookup is skipped.
        self._sessions = session_store
        self._mcp_manager = mcp_manager

    async def run(
        self,
        user_message: str,
        profile_id: str,
        max_iterations: int = 40,
        exclude_tools: list[str] | None = None,
        briefing: str | None = None,
        custom_system_prompt: str | None = None,
        inherit_system_prompt: bool = False,
        context_transfer: str | None = None,
        parent_session_id: str | None = None,
        timeout_seconds: float = 300.0,
    ) -> tuple[str, bool]:
        """
        Run a sub-agent loop without a persistent session.

        Returns (result_text, completed_normally).
        completed_normally is True only when a turn finishes without tool calls;
        it is False if the sub-agent hit the iteration limit, timed out, stalled
        in thinking-only output, or was stopped via the current stop event.

        Args:
            user_message: The task handed to the sub-agent.
            profile_id: Profile supplying model, backend, prompts and tool scope.
            max_iterations: Maximum number of LLM turns.
            exclude_tools: Tool names removed from the profile's sub-agent tools.
            briefing: Extra task context appended to the system prompt.
            custom_system_prompt: Additional system prompt section.
            inherit_system_prompt: Prepend the profile's main system prompt.
            context_transfer: Parent context injected as an opening exchange.
            parent_session_id: When set, tools run under the parent's session id
                and the parent session's user identity is inherited.
            timeout_seconds: Wall-clock budget, checked only at the start of each
                iteration; an in-flight LLM stream or tool call is not interrupted.
        """
        from navi.tools._internal.base import (
            current_session_id as _sid_var,
            current_model as _model_var,
            current_user_id as _uid_var,
            current_user_role as _role_var,
            current_user_info as _uinfo_var,
        )

        # Snapshot the caller's values; they are restored in the finally-block
        # because the sub-agent overrides these context variables for its tools.
        _prev_sid = _sid_var.get(None)
        _prev_model = _model_var.get(None)
        _prev_uid = _uid_var.get(None)
        _prev_role = _role_var.get()
        _prev_uinfo = _uinfo_var.get(None)
        # The try starts before the first ``.set()`` so that a failure during
        # setup (unknown profile, session lookup, memory build, ...) cannot leak
        # the sub-agent's values into the caller's context.
        try:
            subagent_run_id = f"subagent_{uuid.uuid4().hex[:12]}"
            # Tools operate on the parent's session when there is one.
            tool_session_id = parent_session_id or subagent_run_id
            _sid_var.set(tool_session_id)

            profile = self._profiles.get(profile_id)
            _model_var.set(profile.model)

            exclude = set(exclude_tools or [])
            scope = profile.get_subagent_tools()
            tools = [
                t
                for t in build_tool_list(
                    scope.native, scope.mcp, self._tools, self._mcp_manager
                )
                if t.name not in exclude
            ]
            tool_schemas = [t.schema() for t in tools]
            tool_map = {t.name: t for t in tools}
            llm = self._get_backend(profile.llm_backend)

            # Inherit the user identity from the parent session, if any.
            user_id = None
            if parent_session_id and self._sessions:
                parent_session = await self._sessions.get(parent_session_id)
                if parent_session:
                    user_id = parent_session.user_id
            if user_id is not None:
                _uid_var.set(user_id)
                _role_var.set(_prev_role or "user")
                _uinfo_var.set(_prev_uinfo)
            else:
                # No user id: clear identity and fall back to the "user" role.
                _uid_var.set(None)
                _role_var.set("user")
                _uinfo_var.set(None)

            mem = await self._ctx_builder._memory_msg(user_id=user_id)
            subagent_sys_msg = Message(
                role="system",
                content=self._compose_system_prompt(
                    profile,
                    inherit_system_prompt=inherit_system_prompt,
                    custom_system_prompt=custom_system_prompt,
                    briefing=briefing,
                    parent_session_id=parent_session_id,
                    session_files_dir=settings.session_files_dir,
                ),
            )
            context = self._initial_context(user_message, context_transfer)

            sink = current_event_sink.get()
            log.info(
                "agent.subagent.start",
                profile_id=profile_id,
                max_iterations=max_iterations,
                tools=len(tools),
                planning=profile.subagent_planning_enabled,
            )
            stop_event = current_stop_event.get()
            subagent_think = (
                profile.subagent_think_enabled
                if profile.subagent_think_enabled is not None
                else profile.think_enabled
            )
            _turn_tokens: int = 0  # completion tokens summed over the whole run
            _sub_tool_count: int = 0
            _start_time = time.monotonic()  # planning time counts toward the timeout
            accumulated_text = ""

            if profile.subagent_planning_enabled:
                async for _ev in self._planning.run(
                    context,
                    profile,
                    llm,
                    mem,
                    tool_schemas,
                    system_prompt_override=subagent_sys_msg.content,
                    is_subagent=True,
                ):
                    if isinstance(_ev, AIHelperTokensUsed):
                        _turn_tokens += _ev.completion_tokens
                    elif sink is not None:
                        await sink.put(_ev)

            for iteration in range(max_iterations):
                # NOTE(review): stop paths return without emitting SubagentComplete
                # (unlike timeout/stall/completion) — confirm callers expect that.
                if stop_event and stop_event.is_set():
                    return accumulated_text, False
                elapsed = time.monotonic() - _start_time
                if elapsed >= timeout_seconds:
                    log.warning(
                        "agent.subagent.timeout", elapsed=elapsed, timeout=timeout_seconds
                    )
                    await self._emit_complete(sink, _turn_tokens, _sub_tool_count)
                    return accumulated_text or "[Sub-agent timed out]", False

                log.debug("agent.subagent.iteration", iteration=iteration)
                accumulated_text = ""
                accumulated_thinking = ""
                turn_tool_calls: list[ToolCallRequest] | None = None
                thinking_started_at: float | None = None
                thinking_stalled_reason: str | None = None

                # The prompt is rebuilt every turn so the MCP context message reflects
                # the current state; system messages inside ``context`` are dropped.
                built_ctx: list[Message] = [subagent_sys_msg]
                if mem:
                    built_ctx.append(mem)
                mcp_msg = self._ctx_builder._mcp_context_msg(profile)
                if mcp_msg:
                    built_ctx.append(mcp_msg)
                built_ctx.extend(m for m in context if m.role != "system")
                self._compressor.check_context_size(built_ctx)

                stream = _iter_stream_guarded(
                    llm.stream_complete(
                        built_ctx,
                        tools=tool_schemas if tools else None,
                        temperature=profile.temperature,
                        model=profile.model,
                        think=subagent_think,
                        top_k=profile.top_k,
                        top_p=profile.top_p,
                        num_thread=profile.num_thread,
                    ),
                    stop_event=stop_event,
                    first_chunk_timeout=settings.llm_stream_first_chunk_timeout,
                    chunk_timeout=settings.llm_stream_chunk_timeout,
                )
                try:
                    async for chunk in stream:
                        if chunk.completion_tokens is not None:
                            _turn_tokens += chunk.completion_tokens
                        if chunk.thinking:
                            if thinking_started_at is None:
                                thinking_started_at = time.monotonic()
                            accumulated_thinking += chunk.thinking
                            thinking_elapsed = time.monotonic() - thinking_started_at
                            if (
                                thinking_elapsed >= _SUBAGENT_THINKING_STALL_SECONDS
                                or len(accumulated_thinking) >= _SUBAGENT_THINKING_STALL_CHARS
                            ):
                                thinking_stalled_reason = (
                                    "Sub-agent produced only thinking output for "
                                    f"{thinking_elapsed:.0f}s / {len(accumulated_thinking)} chars "
                                    "without text or tool calls."
                                )
                                log.warning(
                                    "agent.subagent.thinking_stall",
                                    elapsed=thinking_elapsed,
                                    chars=len(accumulated_thinking),
                                    profile_id=profile_id,
                                )
                                break
                        if chunk.delta:
                            accumulated_text += chunk.delta
                        if chunk.tool_calls:
                            turn_tool_calls = chunk.tool_calls
                finally:
                    # Close the stream right away on early exit (e.g. a thinking
                    # stall) instead of leaving it suspended until GC finalization.
                    await self._close_stream(stream)

                if stop_event and stop_event.is_set():
                    return accumulated_text, False
                if thinking_stalled_reason:
                    await self._emit_complete(sink, _turn_tokens, _sub_tool_count)
                    return f"[{thinking_stalled_reason}]", False
                if not turn_tool_calls:
                    # A turn without tool calls is the sub-agent's final answer.
                    log.info(
                        "agent.subagent.complete",
                        iterations=iteration + 1,
                        result_len=len(accumulated_text),
                    )
                    await self._emit_complete(sink, _turn_tokens, _sub_tool_count)
                    return accumulated_text, True

                if accumulated_thinking and sink is not None:
                    log.debug(
                        "agent.subagent.turn_thinking", length=len(accumulated_thinking)
                    )
                    await sink.put(
                        TurnThinking(thinking=accumulated_thinking, is_subagent=True)
                    )
                context.append(
                    Message(
                        role="assistant",
                        content=accumulated_text or None,
                        tool_calls=turn_tool_calls,
                    )
                )
                # Rebuilt every turn from the current context variables.
                tool_ctx = ToolContext(
                    session_id=tool_session_id,
                    event_sink=sink,
                    stop_event=stop_event,
                    model=profile.model,
                    user_id=_uid_var.get(None),
                    user_role=_role_var.get(),
                    user_info=_uinfo_var.get(None),
                )

                pending_images: list[Message] = []
                for tc in turn_tool_calls:
                    # Cooperative stop: if the user clicked Stop before this tool,
                    # skip remaining tools in this batch.
                    if stop_event and stop_event.is_set():
                        log.info("agent.subagent.tool_batch_stopped", tool=tc.name)
                        return "", False
                    _sub_tool_count += 1
                    if sink is not None:
                        await sink.put(
                            ToolStarted(
                                tool_name=tc.name,
                                arguments=tc.arguments,
                                is_subagent=True,
                                tool_call_id=tc.id,
                            )
                        )
                    content, success, metadata, image_msg = await self._execute_tool_call(
                        tool_map.get(tc.name), tc, tool_ctx
                    )
                    if sink is not None:
                        await sink.put(
                            ToolEvent(
                                tool_name=tc.name,
                                arguments=tc.arguments,
                                result=content,
                                success=success,
                                is_subagent=True,
                                tool_call_id=tc.id,
                            )
                        )
                    context.append(
                        Message(
                            role="tool",
                            content=content,
                            tool_call_id=tc.id,
                            name=tc.name,
                            metadata=metadata,
                        )
                    )
                    if image_msg:
                        pending_images.append(image_msg)
                # Image messages go after the whole batch so the assistant message's
                # tool results stay contiguous; some chat APIs reject a user message
                # interleaved between tool results of one tool_calls message.
                context.extend(pending_images)

            log.warning("agent.subagent.max_iterations", max_iterations=max_iterations)
            await self._emit_complete(sink, _turn_tokens, _sub_tool_count)
            return (
                accumulated_text
                or "[Sub-agent reached iteration limit without a final answer]",
                False,
            )
        finally:
            _sid_var.set(_prev_sid)
            _model_var.set(_prev_model)
            _uid_var.set(_prev_uid)
            _role_var.set(_prev_role)
            _uinfo_var.set(_prev_uinfo)

    @staticmethod
    def _compose_system_prompt(
        profile,
        *,
        inherit_system_prompt: bool,
        custom_system_prompt: str | None,
        briefing: str | None,
        parent_session_id: str | None,
        session_files_dir,
    ) -> str:
        """Assemble the sub-agent system prompt from its optional sections.

        Sections are joined with ``---`` separators in this order: the profile's
        main prompt (if inherited), the profile's sub-agent prompt, the custom
        prompt, the task briefing, and the parent-session file instructions.
        When no section applies, the profile's main system prompt is used alone.
        """
        sys_parts: list[str] = []
        if inherit_system_prompt:
            sys_parts.append(profile.system_prompt)
        if profile.subagent_system_prompt:
            sys_parts.append(profile.subagent_system_prompt)
        if custom_system_prompt:
            sys_parts.append(custom_system_prompt)
        if briefing:
            sys_parts.append(f"## Task context\n\n{briefing}")
        if parent_session_id:
            sys_parts.append(
                "[Parent session context]\n"
                f"Parent Session ID: {parent_session_id}\n"
                f"Session files directory: {session_files_dir}/{parent_session_id}/\n"
                "For files the user should see, write to this exact session directory. "
                "Do not use or invent a subagent_* directory.\n"
                "When calling MCP navi-3d tools (compile_scad, render_stl, lint_scad), pass ONLY the filename "
                "(e.g. 'falcon9_rocket.scad') for source_path and output_path. Do NOT include the session_id or "
                "the session_files directory in those paths — the MCP server resolves them automatically."
            )
        if not sys_parts:
            sys_parts.append(profile.system_prompt)
        return "\n\n---\n\n".join(sys_parts)

    @staticmethod
    def _initial_context(user_message: str, context_transfer: str | None) -> list[Message]:
        """Build the opening conversation: optional parent hand-off, then the task."""
        context: list[Message] = []
        if context_transfer:
            # Synthetic user/assistant exchange carrying the parent's context
            # ahead of the actual task message.
            context.append(
                Message(
                    role="user",
                    content=f"## Context from parent agent\n\n{context_transfer}",
                )
            )
            context.append(
                Message(
                    role="assistant",
                    content="Understood. I have the context. Ready to begin the task.",
                )
            )
        context.append(
            Message(role="user", content=user_message, created_at=datetime.now(timezone.utc))
        )
        return context

    @staticmethod
    async def _execute_tool_call(
        tool, tc: ToolCallRequest, tool_ctx: ToolContext
    ) -> tuple[str, bool, dict, Message | None]:
        """Execute a single tool call and capture its outcome.

        Returns ``(content, success, metadata, image_msg)``. A missing tool or any
        ``Exception`` raised while executing the tool becomes an ``"Error: ..."``
        result rather than propagating, so one failing tool does not abort the
        sub-agent. ``image_msg`` is a user message carrying the base64 image when
        a successful result's metadata is flagged ``is_image``; otherwise None.
        """
        if tool is None:
            return f"Error: tool '{tc.name}' not found.", False, {}, None
        log.info("tool.execute.subagent", tool=tc.name, args=tc.arguments)
        try:
            result = await tool.execute(tc.arguments, ctx=tool_ctx)
            content = result.to_message_content()
            success = result.success
            metadata = result.metadata or {}
            image_msg = None
            if result.success and result.metadata and result.metadata.get("is_image"):
                b64 = result.metadata.get("base64")
                if b64:
                    image_msg = Message(
                        role="user",
                        content=f"[Image loaded via {tc.name} — analyse it]",
                        images=[b64],
                    )
            return content, success, metadata, image_msg
        except Exception as exc:
            log.warning(
                "agent.subagent.tool_exception",
                tool=tc.name,
                error=str(exc),
            )
            return f"Error: {exc}", False, {}, None

    @staticmethod
    async def _emit_complete(sink, token_count: int, tool_call_count: int) -> None:
        """Publish a ``SubagentComplete`` summary to *sink* when a sink is installed."""
        if sink is not None:
            await sink.put(
                SubagentComplete(token_count=token_count, tool_call_count=tool_call_count)
            )

    @staticmethod
    async def _close_stream(stream) -> None:
        """Close *stream* if it supports ``aclose()`` (async generators do).

        Breaking out of ``async for`` leaves an async generator suspended until it
        is garbage-collected; closing it here runs its cleanup immediately
        (presumably releasing the underlying LLM response — confirm in
        ``_iter_stream_guarded``). Errors during close are logged, not raised,
        so they never mask the loop's real outcome.
        """
        aclose = getattr(stream, "aclose", None)
        if aclose is None:
            return
        try:
            await aclose()
        except Exception as exc:
            log.debug("agent.subagent.stream_close_failed", error=str(exc))

    def _get_backend(self, backend_key: str) -> LLMBackend:
        """Resolve an LLM backend by key from the backend registry."""
        return self._backends.get(backend_key)