Newer
Older
navi-1 / navi / tools / spawn_agent.py
"""
spawn_agent — delegates a focused sub-task to an isolated agent instance.

The sub-agent runs its own tool-calling loop with a clean context window.
It cannot spawn further sub-agents (recursion is blocked via exclude_tools).
The result is returned as a plain text summary.
"""

import structlog

from .base import Tool, ToolResult, current_session_id

log = structlog.get_logger()


class SpawnAgentTool(Tool):
    """Tool that delegates a self-contained sub-task to a freshly built ``Agent``.

    ``execute`` constructs a new ``Agent`` without a session store or workers,
    runs it through ``run_ephemeral`` with ``spawn_agent`` excluded from its
    tool set (so the sub-agent cannot recurse), and wraps the returned text in
    a ``ToolResult``. The registries and stores handed to ``__init__`` are kept
    as-is and forwarded to the sub-agent where needed.
    """

    name = "spawn_agent"
    description = (
        "Delegate a focused multi-step sub-task to an isolated sub-agent. "
        "SYNCHRONOUS — blocks until the sub-agent fully completes. "
        "The result you receive is the final, complete output. "
        "There is no background process and no continuation.\n\n"
        "USER CANNOT SEE sub-agent output — present all findings in your own response.\n\n"
        "USE when a task requires 2+ tool calls forming one logical unit: "
        "research a topic, audit a module, configure a server, process a file set.\n"
        "DO NOT USE for a single tool call — call the tool directly.\n\n"
        "BRIEFING must be fully self-contained: the sub-agent knows nothing about "
        "your conversation. Include IPs, credentials, file paths, prior results, "
        "expected output format. End with: "
        "'Complete ALL assigned work before responding. Your output is final.'"
    )
    parameters = {
        "type": "object",
        "properties": {
            "task": {
                "type": "string",
                "description": (
                    "Clear, self-contained description of what the sub-agent must accomplish. "
                    "Be specific: include success criteria and expected output format."
                ),
            },
            "briefing": {
                "type": "string",
                "description": (
                    "All context the sub-agent needs — it has zero knowledge of your conversation. "
                    "Include: IPs, credentials, file paths, prior findings, constraints, "
                    "expected output format. End with: "
                    "'Complete ALL assigned work before responding. Your output is final.'"
                ),
            },
            "profile_id": {
                "type": "string",
                "description": (
                    "Profile to use for the sub-agent. Defaults to the current session's "
                    "profile. Override to specialise: e.g. 'server_admin' for remote ops, "
                    "'secretary' for research."
                ),
            },
            "max_iterations": {
                "type": "integer",
                "description": "Maximum tool-call iterations for the sub-agent (default: 20).",
            },
        },
        "required": ["task"],
    }

    def __init__(
        self,
        profile_registry,
        tool_registry,
        backend_registry,
        session_store,
        memory_store=None,
    ) -> None:
        """Store the collaborators used to build and configure sub-agents.

        Args:
            profile_registry: Forwarded to the sub-agent; also queried via
                ``all()`` for the fallback profile.
            tool_registry: Forwarded to the sub-agent.
            backend_registry: Forwarded to the sub-agent.
            session_store: Used only to look up the parent session's profile;
                it is NOT passed to the sub-agent.
            memory_store: Optional; forwarded to the sub-agent when given.
        """
        self._profile_registry = profile_registry
        self._tool_registry = tool_registry
        self._backend_registry = backend_registry
        self._session_store = session_store
        self._memory_store = memory_store

    @staticmethod
    def _clean_text(value) -> str:
        """Return *value* as a stripped string, mapping ``None`` to ``""``.

        Callers may send an explicit ``null`` for optional parameters, in which
        case ``dict.get(key, "")`` yields ``None`` rather than the default.
        """
        if value is None:
            return ""
        return str(value).strip()

    @staticmethod
    def _parse_max_iterations(raw, default: int = 20) -> int:
        """Coerce *raw* to a positive iteration budget, else return *default*."""
        if not raw:
            # Missing, None, 0 and "" all mean "use the default".
            return default
        try:
            value = int(raw)
        except (TypeError, ValueError, OverflowError):
            log.warning("spawn_agent.bad_max_iterations", value=repr(raw), default=default)
            return default
        # A non-positive budget would leave the sub-agent unable to do any work.
        return value if value > 0 else default

    async def execute(self, params: dict) -> ToolResult:
        """Run the sub-agent for ``params["task"]`` and return its final text.

        Args:
            params: Tool arguments as described by ``parameters``. ``task`` is
                required; ``briefing``, ``profile_id`` and ``max_iterations``
                are optional and tolerate ``None``.

        Returns:
            A successful ``ToolResult`` whose output is the sub-agent's result
            prefixed with a reminder that the user cannot see it, or a failed
            ``ToolResult`` when the task is missing/empty or the sub-agent
            raises.
        """
        # Import here to avoid module-level circular import
        from navi.core.agent import Agent

        task = self._clean_text(params.get("task"))
        if not task:
            message = "'task' is required and must be a non-empty string"
            log.warning("spawn_agent.invalid_params", error=message)
            return ToolResult(
                success=False,
                output=f"Sub-agent failed: {message}",
                error=message,
            )

        briefing = self._clean_text(params.get("briefing"))
        max_iterations = self._parse_max_iterations(params.get("max_iterations"))

        # Resolve profile: explicit override → parent session's profile → first available
        profile_id = self._clean_text(params.get("profile_id"))
        if not profile_id:
            profile_id = await self._resolve_parent_profile()

        user_message = task
        if briefing:
            user_message = (
                f"## Context\n\n{briefing}\n\n"
                f"---\n\n"
                f"## Task\n\n{task}"
            )

        log.info(
            "spawn_agent.start",
            profile_id=profile_id,
            max_iterations=max_iterations,
            task_preview=task[:80],
        )

        try:
            # Built inside the try so a constructor failure is also reported
            # as a failed ToolResult instead of escaping the tool call.
            agent = Agent(
                session_store=None,          # ephemeral — no DB access
                profile_registry=self._profile_registry,
                tool_registry=self._tool_registry,
                backend_registry=self._backend_registry,
                workers=[],                  # no post-response workers for sub-agents
                memory_store=self._memory_store,
            )
            result = await agent.run_ephemeral(
                user_message=user_message,
                profile_id=profile_id,
                max_iterations=max_iterations,
                exclude_tools=["spawn_agent"],  # prevent recursion
            )
        except Exception as e:
            log.error("spawn_agent.error", profile_id=profile_id, error=str(e), exc_info=True)
            return ToolResult(success=False, output=f"Sub-agent failed: {e}", error=str(e))

        log.info("spawn_agent.done", profile_id=profile_id, result_len=len(result))
        output = (
            "[Sub-agent result — the USER CANNOT SEE THIS. "
            "You must present the key findings in your own final response.]\n\n"
            + result
        )
        return ToolResult(success=True, output=output)

    async def _resolve_parent_profile(self) -> str:
        """Return the profile of the current parent session, or fallback to first profile.

        The session lookup is best-effort: any failure is logged and the
        fallback is used. If no profile is registered at all, ``"secretary"``
        is returned.
        """
        if self._session_store:
            try:
                session_id = current_session_id.get()
                if session_id:
                    session = await self._session_store.get(session_id)
                    # Skip sessions with no usable profile id so the fallback applies.
                    if session and session.profile_id:
                        return session.profile_id
            except Exception as e:
                # Deliberately non-fatal, but no longer silent.
                log.warning(
                    "spawn_agent.parent_profile_lookup_failed",
                    error=str(e),
                    exc_info=True,
                )
        # Fallback: first registered profile
        all_profiles = self._profile_registry.all()
        return all_profiles[0].id if all_profiles else "secretary"