Newer
Older
navi-1 / navi / tools / code_exec.py
"""Code execution tool — run Python code in a subprocess sandbox.

Multi-user safety:
- Non-admin users run inside their sandbox directory (user_data/<user_id>/).
- Temp files and working directory are restricted to the sandbox.
- Admins bypass the sandbox and use the system temp directory.
"""

import asyncio
import sys
import tempfile
from pathlib import Path

from ._internal.base import Tool, ToolResult, current_user_id, current_user_role

# Maximum wall-clock time, in seconds, that a script may run before the
# subprocess is killed and a timeout result is returned.
_TIMEOUT = 30


def _resolve_working_dir(working_dir: str | None) -> Path:
    """Resolve working directory with sandbox enforcement for non-admins.

    Non-admin users (a user id is set and the role is not ``"admin"``) are
    confined to ``user_data/<user_id>/``, which is created on demand. A
    requested ``working_dir`` -- absolute or relative -- is honoured only if
    it resolves to a location inside that sandbox; anything else falls back
    to the sandbox root.

    Admins, and calls with no user id set, are not confined: the requested
    directory is used as given, or the system temp directory when none is
    requested.

    Args:
        working_dir: Requested working directory, or ``None``/empty for the
            default.

    Returns:
        An absolute path to use as the subprocess working directory. Only the
        sandbox root is guaranteed to exist; a requested subdirectory is not
        created here.
    """
    user_id = current_user_id.get(None)
    role = current_user_role.get()

    if user_id and role != "admin":
        sandbox = (Path("user_data") / user_id).expanduser().resolve()
        sandbox.mkdir(parents=True, exist_ok=True)
        if not working_dir:
            return sandbox

        requested = Path(working_dir).expanduser()
        # Joining an absolute path onto ``sandbox`` yields that absolute path,
        # so this one expression covers both absolute and relative requests.
        candidate = (sandbox / requested).resolve()
        try:
            # resolve() has already collapsed ".." segments and followed
            # symlinks, so this check also rejects relative escapes such as
            # "../other_user" rather than only out-of-sandbox absolute paths.
            candidate.relative_to(sandbox)
        except ValueError:
            return sandbox
        return candidate

    if working_dir:
        return Path(working_dir).expanduser().resolve()
    return Path(tempfile.gettempdir())


class CodeExecTool(Tool):
    """Tool that runs a Python snippet in a fresh interpreter subprocess.

    The snippet is written to a temporary ``.py`` file, executed with the
    current interpreter (``sys.executable``), and its stdout/stderr are
    captured and returned. The temporary file is removed afterwards.
    """

    name = "code_exec"
    description = (
        "Run Python code and return output. Use for calculations, data parsing, "
        "text processing, or anything that benefits from a script. "
        "Each call is a fresh interpreter — import everything you need, no state persists. "
        "For shell-native tasks (pipes, system commands) prefer terminal instead."
    )
    parameters = {
        "type": "object",
        "properties": {
            "code": {
                "type": "string",
                "description": "Python code to execute",
            },
            "working_dir": {
                "type": "string",
                "description": "Working directory for the script (optional).",
            },
        },
        "required": ["code"],
    }

    async def execute(self, params: dict) -> ToolResult:
        """Run ``params["code"]`` in a subprocess and report its output.

        Args:
            params: Tool arguments. ``"code"`` is required; ``"working_dir"``
                is optional and is passed through ``_resolve_working_dir``.

        Returns:
            A ``ToolResult`` whose ``output`` holds stdout followed by a
            ``[stderr]`` section (or ``"(no output)"``). ``success`` is true
            only when the process exits with code 0. A run exceeding
            ``_TIMEOUT`` seconds is killed and reported with
            ``error="timeout"``. Failures to create the script file or start
            the process are reported as an ``"Execution error: ..."`` result
            rather than raised.

        Raises:
            KeyError: If ``params`` has no ``"code"`` entry.
        """
        code = params["code"]

        role = current_user_role.get()
        user_id = current_user_id.get(None)
        sandboxed = bool(user_id) and role != "admin"

        cwd = _resolve_working_dir(params.get("working_dir"))

        script_path: Path | None = None
        try:
            # For sandboxed users the script lives inside the working
            # directory so file I/O in user code is implicitly sandboxed
            # unless they escape via absolute paths. Admins use the system
            # temp directory. NamedTemporaryFile picks a collision-free name,
            # and the file is written as UTF-8 because that is how the
            # interpreter decodes source files by default.
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                prefix="_navi_exec_",
                suffix=".py",
                dir=str(cwd) if sandboxed else None,
                delete=False,
            ) as f:
                # Record the path before writing so a failed write still
                # gets cleaned up in ``finally``.
                script_path = Path(f.name)
                f.write(code)

            proc = await asyncio.create_subprocess_exec(
                sys.executable,
                str(script_path),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(cwd),
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT)
            except asyncio.TimeoutError:
                try:
                    proc.kill()
                except ProcessLookupError:
                    # The process exited on its own just after the timeout.
                    pass
                # Reap the child so it does not linger as a zombie.
                await proc.wait()
                return ToolResult(
                    success=False,
                    output=f"Code execution timed out after {_TIMEOUT}s",
                    error="timeout",
                )

            output_parts = []
            if stdout:
                output_parts.append(stdout.decode(errors="replace"))
            if stderr:
                # Decode outside the f-string: reusing the same quote type
                # inside an f-string is a syntax error before Python 3.12.
                stderr_text = stderr.decode(errors="replace")
                output_parts.append(f"[stderr]\n{stderr_text}")

            success = proc.returncode == 0
            return ToolResult(
                success=success,
                output="\n".join(output_parts) or "(no output)",
                metadata={"returncode": proc.returncode},
                error=None if success else f"Exit code {proc.returncode}",
            )
        except Exception as e:
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))
        finally:
            if script_path is not None:
                script_path.unlink(missing_ok=True)