"""
spawn_agent — delegates a focused sub-task to an isolated agent instance.
The sub-agent runs its own tool-calling loop with a clean context window.
It cannot spawn further sub-agents (recursion is blocked via exclude_tools).
The result is returned as a plain text summary.
"""
import structlog
from .base import Tool, ToolResult, current_session_id
log = structlog.get_logger()
class SpawnAgentTool(Tool):
name = "spawn_agent"
description = (
"Delegate a focused multi-step sub-task to an isolated sub-agent. "
"SYNCHRONOUS — blocks until the sub-agent fully completes. "
"The result you receive is the final, complete output. "
"There is no background process and no continuation.\n\n"
"USER CANNOT SEE sub-agent output — present all findings in your own response.\n\n"
"USE when a task requires 2+ tool calls forming one logical unit: "
"research a topic, audit a module, configure a server, process a file set.\n"
"DO NOT USE for a single tool call — call the tool directly.\n\n"
"The task field must be fully self-contained: the sub-agent knows nothing about "
"your conversation. Include IPs, credentials, file paths, prior results, "
"expected output format. End with: "
"'Complete ALL assigned work before responding. Your output is final.'\n\n"
"Context transfer: before spawning, write key context to your scratchpad "
"section 'context_transfer' — it is automatically injected into the sub-agent."
)
parameters = {
"type": "object",
"properties": {
"task": {
"type": "string",
"description": (
"Self-contained description of what the sub-agent must accomplish. "
"Include: exact goal, success criteria, expected output format, "
"all credentials / file paths / prior findings needed. "
"End with: 'Complete ALL assigned work before responding. Your output is final.'"
),
},
"profile_id": {
"type": "string",
"description": (
"Profile to use for the sub-agent. Defaults to the current session's "
"profile. Override to specialise: e.g. 'server_admin' for remote ops, "
"'secretary' for research, 'developer' for tool writing."
),
},
"system_prompt": {
"type": "string",
"description": (
"Optional custom system prompt injected into the sub-agent in addition "
"to the profile's default. Use to set a specific role, constraints, or "
"output format requirements for this particular task. "
"If omitted, the profile's built-in subagent prompt is used."
),
},
"max_iterations": {
"type": "integer",
"description": "Maximum tool-call iterations for the sub-agent (default: 20).",
},
},
"required": ["task"],
}
def __init__(
self,
profile_registry,
tool_registry,
backend_registry,
session_store,
memory_store=None,
) -> None:
self._profile_registry = profile_registry
self._tool_registry = tool_registry
self._backend_registry = backend_registry
self._session_store = session_store
self._memory_store = memory_store
async def execute(self, params: dict) -> ToolResult:
# Import here to avoid module-level circular import
from navi.core.agent import Agent
from navi.tools.scratchpad import get_section
task = params["task"].strip()
custom_system_prompt = (params.get("system_prompt") or "").strip() or None
max_iterations = int(params.get("max_iterations") or 20)
# Resolve profile: explicit override → parent session's profile → first available
profile_id = params.get("profile_id", "").strip()
if not profile_id:
profile_id = await self._resolve_parent_profile()
# Read parent scratchpad context_transfer section and pass it to the sub-agent.
parent_sid = current_session_id.get()
context_transfer = get_section(parent_sid, "context_transfer") if parent_sid else ""
log.info("spawn_agent.start", profile_id=profile_id, max_iterations=max_iterations,
task_preview=task[:80], has_context_transfer=bool(context_transfer),
has_custom_prompt=bool(custom_system_prompt))
agent = Agent(
session_store=None, # ephemeral — no DB access
profile_registry=self._profile_registry,
tool_registry=self._tool_registry,
backend_registry=self._backend_registry,
workers=[], # no post-response workers for sub-agents
memory_store=self._memory_store,
)
try:
result_text, completed = await agent.run_ephemeral(
user_message=task,
profile_id=profile_id,
max_iterations=max_iterations,
exclude_tools=["spawn_agent"], # prevent recursion
custom_system_prompt=custom_system_prompt,
context_transfer=context_transfer or None,
timeout_seconds=300.0,
)
status = "completed" if completed else "limit_reached"
log.info("spawn_agent.done", profile_id=profile_id, status=status,
result_len=len(result_text))
output = (
f"[STATUS: {status}]\n"
"[Sub-agent result — the USER CANNOT SEE THIS. "
"You must present the key findings in your own final response.]\n\n"
+ result_text
)
return ToolResult(success=completed, output=output,
metadata={"status": status, "result_len": len(result_text)})
except Exception as e:
log.error("spawn_agent.error", error=str(e), exc_info=True)
return ToolResult(success=False, output=f"Sub-agent failed: {e}", error=str(e))
async def _resolve_parent_profile(self) -> str:
"""Return the profile of the current parent session, or fallback to first profile."""
session_id = current_session_id.get()
if session_id and self._session_store:
try:
session = await self._session_store.get(session_id)
if session:
return session.profile_id
except Exception:
pass
# Fallback: first registered profile
all_profiles = self._profile_registry.all()
return all_profiles[0].id if all_profiles else "secretary"