Newer
Older
navi-1 / navi / tools / spawn_agent.py
"""
spawn_agent — delegates a focused sub-task to an isolated agent instance.

The sub-agent runs its own tool-calling loop with a clean context window.
It cannot spawn further sub-agents (recursion is blocked via exclude_tools).
The result is returned as a plain text summary.
"""

import structlog

from .base import Tool, ToolResult, current_session_id

log = structlog.get_logger()


class SpawnAgentTool(Tool):
    name = "spawn_agent"
    description = (
        "Delegate a focused multi-step sub-task to an isolated sub-agent. "
        "SYNCHRONOUS — blocks until the sub-agent fully completes. "
        "The result you receive is the final, complete output. "
        "There is no background process and no continuation.\n\n"
        "USER CANNOT SEE sub-agent output — present all findings in your own response.\n\n"
        "USE when a task requires 2+ tool calls forming one logical unit: "
        "research a topic, audit a module, configure a server, process a file set.\n"
        "DO NOT USE for a single tool call — call the tool directly.\n\n"
        "The task field must be fully self-contained: the sub-agent knows nothing about "
        "your conversation. Include IPs, credentials, file paths, prior results, "
        "expected output format. End with: "
        "'Complete ALL assigned work before responding. Your output is final.'\n\n"
        "Context transfer: before spawning, write key context to your scratchpad "
        "section 'context_transfer' — it is automatically injected into the sub-agent."
    )
    parameters = {
        "type": "object",
        "properties": {
            "task": {
                "type": "string",
                "description": (
                    "Self-contained description of what the sub-agent must accomplish. "
                    "Include: exact goal, success criteria, expected output format, "
                    "all credentials / file paths / prior findings needed. "
                    "End with: 'Complete ALL assigned work before responding. Your output is final.'"
                ),
            },
            "profile_id": {
                "type": "string",
                "description": (
                    "Profile to use for the sub-agent. Defaults to the current session's "
                    "profile. Override to specialise: e.g. 'server_admin' for remote ops, "
                    "'secretary' for research, 'developer' for tool writing."
                ),
            },
            "system_prompt": {
                "type": "string",
                "description": (
                    "Optional custom system prompt injected into the sub-agent in addition "
                    "to the profile's default. Use to set a specific role, constraints, or "
                    "output format requirements for this particular task. "
                    "If omitted, the profile's built-in subagent prompt is used."
                ),
            },
            "max_iterations": {
                "type": "integer",
                "description": "Maximum tool-call iterations for the sub-agent (default: 20).",
            },
        },
        "required": ["task"],
    }

    def __init__(
        self,
        profile_registry,
        tool_registry,
        backend_registry,
        session_store,
        memory_store=None,
    ) -> None:
        self._profile_registry = profile_registry
        self._tool_registry = tool_registry
        self._backend_registry = backend_registry
        self._session_store = session_store
        self._memory_store = memory_store

    async def execute(self, params: dict) -> ToolResult:
        # Import here to avoid module-level circular import
        from navi.core.agent import Agent
        from navi.tools.scratchpad import get_section

        task = params["task"].strip()
        custom_system_prompt = (params.get("system_prompt") or "").strip() or None
        max_iterations = int(params.get("max_iterations") or 20)

        # Resolve profile: explicit override → parent session's profile → first available
        profile_id = params.get("profile_id", "").strip()
        if not profile_id:
            profile_id = await self._resolve_parent_profile()

        # Read parent scratchpad context_transfer section and pass it to the sub-agent.
        parent_sid = current_session_id.get()
        context_transfer = get_section(parent_sid, "context_transfer") if parent_sid else ""

        log.info("spawn_agent.start", profile_id=profile_id, max_iterations=max_iterations,
                 task_preview=task[:80], has_context_transfer=bool(context_transfer),
                 has_custom_prompt=bool(custom_system_prompt))

        agent = Agent(
            session_store=None,          # ephemeral — no DB access
            profile_registry=self._profile_registry,
            tool_registry=self._tool_registry,
            backend_registry=self._backend_registry,
            workers=[],                  # no post-response workers for sub-agents
            memory_store=self._memory_store,
        )

        try:
            result_text, completed = await agent.run_ephemeral(
                user_message=task,
                profile_id=profile_id,
                max_iterations=max_iterations,
                exclude_tools=["spawn_agent"],  # prevent recursion
                custom_system_prompt=custom_system_prompt,
                context_transfer=context_transfer or None,
                timeout_seconds=300.0,
            )
            status = "completed" if completed else "limit_reached"
            log.info("spawn_agent.done", profile_id=profile_id, status=status,
                     result_len=len(result_text))
            output = (
                f"[STATUS: {status}]\n"
                "[Sub-agent result — the USER CANNOT SEE THIS. "
                "You must present the key findings in your own final response.]\n\n"
                + result_text
            )
            return ToolResult(success=completed, output=output,
                              metadata={"status": status, "result_len": len(result_text)})
        except Exception as e:
            log.error("spawn_agent.error", error=str(e), exc_info=True)
            return ToolResult(success=False, output=f"Sub-agent failed: {e}", error=str(e))

    async def _resolve_parent_profile(self) -> str:
        """Return the profile of the current parent session, or fallback to first profile."""
        session_id = current_session_id.get()
        if session_id and self._session_store:
            try:
                session = await self._session_store.get(session_id)
                if session:
                    return session.profile_id
            except Exception:
                pass
        # Fallback: first registered profile
        all_profiles = self._profile_registry.all()
        return all_profiles[0].id if all_profiles else "secretary"