"""
spawn_agent — delegates a focused sub-task to an isolated agent instance.
The sub-agent runs its own tool-calling loop with a clean context window.
It cannot spawn further sub-agents (recursion is blocked via exclude_tools).
The result is returned as a plain text summary.
"""
import structlog
from navi.exceptions import ProfileNotFound
from ._internal.base import Tool, ToolResult, current_session_id
log = structlog.get_logger()
class SpawnAgentTool(Tool):
name = "spawn_agent"
description = (
"Delegate EXACTLY ONE step of your plan to an isolated sub-agent.\n\n"
"CRITICAL: one spawn_agent call = one plan step. "
"If your plan has three AGENT steps, you make three separate spawn_agent calls — "
"one per step. Never bundle multiple plan steps into a single sub-agent.\n\n"
"SYNCHRONOUS — blocks until the sub-agent fully completes. "
"There is no background process and no continuation.\n\n"
"USER CANNOT SEE sub-agent output — synthesise findings into your own response.\n\n"
"USE when a step requires 3+ tool calls to complete as a single logical unit.\n"
"DO NOT USE for a single tool call — call the tool directly.\n\n"
"PROFILE SELECTION: omit profile_id to use the parent session's current profile. "
"Set profile_id only when the sub-task clearly belongs to another profile. "
"The sub-agent uses that selected profile's subagent_tools, falling back to "
"enabled_tools when subagent_tools is empty.\n\n"
)
parameters = {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": (
"What the sub-agent must accomplish: exact goal, success criteria, "
"expected output format. "
"End with: 'Complete ALL assigned work before responding. Your output is final.'"
),
},
"briefing": {
"type": "string",
"description": (
"Static context injected as system-level instruction: "
"IPs, credentials, file paths, constraints, step-by-step instructions."
),
},
"profile_id": {
"type": "string",
"description": (
"Profile to use for the sub-agent. Defaults to the current session's "
"profile. Override to specialise: e.g. 'server_admin' for remote ops, "
"'secretary' for research, 'developer' for code work, "
"'tool_developer' for Navi tool implementation. The selected profile "
"determines the sub-agent's model, prompt, and available tools."
),
},
"system_prompt": {
"type": "string",
"description": (
"Optional role definition for this sub-agent, injected as a system-level "
"instruction on top of the profile default. Use to specialise the agent: "
"e.g. 'You are a security auditor. Report findings by severity.' "
"or 'You are a metrics collector. Return all values in a structured table.'"
),
},
"max_iterations": {
"type": "integer",
"description": "Maximum tool-call iterations for the sub-agent (default: 40).",
},
},
"required": ["task"],
}
def __init__(
self,
profile_registry,
tool_registry,
backend_registry,
session_store,
memory_store=None,
mcp_manager=None,
) -> None:
self._profile_registry = profile_registry
self._tool_registry = tool_registry
self._backend_registry = backend_registry
self._session_store = session_store
self._memory_store = memory_store
self._mcp_manager = mcp_manager
async def execute(self, params: dict) -> ToolResult:
# Import here to avoid module-level circular import
from navi.core.agent import Agent
from navi.tools.scratchpad import get_section
task = params["task"].strip()
briefing = (params.get("briefing") or "").strip() or None
custom_system_prompt = (params.get("system_prompt") or "").strip() or None
max_iterations = int(params.get("max_iterations") or 40)
# task → user message (what to do)
# briefing → system prompt (context, credentials, instructions)
user_message = task
# Resolve profile: explicit override → parent session's profile → first available
profile_id = params.get("profile_id", "").strip()
if not profile_id:
profile_id = await self._resolve_parent_profile()
try:
selected_profile = self._profile_registry.get(profile_id)
except ProfileNotFound:
available = ", ".join(p.id for p in self._profile_registry.all())
return ToolResult(
success=False,
output=(
f"Unknown sub-agent profile_id: {profile_id!r}. "
f"Available profiles: {available or '(none)'}."
),
error=f"unknown_profile:{profile_id}",
)
# Read parent scratchpad context_transfer section and pass it to the sub-agent.
parent_sid = current_session_id.get()
context_transfer = (await get_section(parent_sid, "context_transfer")) if parent_sid else ""
tool_source = (
selected_profile.subagent_tools
if selected_profile.subagent_tools
else selected_profile.enabled_tools
)
log.info("spawn_agent.start", profile_id=profile_id, max_iterations=max_iterations,
task_preview=task[:80], has_briefing=bool(briefing),
has_context_transfer=bool(context_transfer),
has_system_prompt=bool(custom_system_prompt),
subagent_tools=len(tool_source))
agent = Agent(
session_store=None, # ephemeral — no DB access
profile_registry=self._profile_registry,
tool_registry=self._tool_registry,
backend_registry=self._backend_registry,
workers=[], # no post-response workers for sub-agents
memory_store=self._memory_store,
mcp_manager=self._mcp_manager,
)
try:
result_text, completed = await agent.run_ephemeral(
user_message=user_message,
profile_id=profile_id,
max_iterations=max_iterations,
exclude_tools=["spawn_agent"], # prevent recursion
briefing=briefing,
custom_system_prompt=custom_system_prompt,
context_transfer=context_transfer or None,
parent_session_id=parent_sid,
timeout_seconds=300.0,
)
log.info("spawn_agent.done", profile_id=profile_id, completed=completed,
result_len=len(result_text))
if completed:
output = (
"[Sub-agent completed — USER CANNOT SEE THIS. "
"Synthesise the findings into your own response.]\n\n"
+ result_text
)
else:
output = (
"[Sub-agent hit iteration limit — result may be incomplete. "
"USER CANNOT SEE THIS. "
"Synthesise what was found and note what is missing.]\n\n"
+ result_text
)
return ToolResult(
success=completed,
output=output,
metadata={"completed": completed, "result_len": len(result_text)},
)
except Exception as e:
log.error("spawn_agent.error", error=str(e), exc_info=True)
return ToolResult(success=False, output=f"Sub-agent failed: {e}", error=str(e))
async def _resolve_parent_profile(self) -> str:
"""Return the profile of the current parent session, or fallback to first profile."""
session_id = current_session_id.get()
if session_id and self._session_store:
try:
session = await self._session_store.get(session_id)
if session:
return session.profile_id
except Exception:
pass
# Fallback: first registered profile
all_profiles = self._profile_registry.all()
return all_profiles[0].id if all_profiles else "secretary"