# navi-1 / navi / tools / terminal.py
"""Terminal tool — run shell commands locally.

If TERMINAL_ALLOWED_COMMANDS=* (default), any command is allowed and the shell
is invoked directly (supports pipes, redirects, subshells, etc.).

Set TERMINAL_ALLOWED_COMMANDS to a comma-separated list to restrict to specific
executables only (e.g. "ls,cat,git").

Multi-user safety:
- Non-admin users are restricted to a sandbox directory (user_data/<user_id>/)
  and a curated allowlist of safe commands.
- Dangerous patterns (curl, wget, ssh, sudo, python -c, node -e, etc.) are
  blocked for non-admins even if the base command is in the allowlist.
- Admins bypass all restrictions and use TERMINAL_ALLOWED_COMMANDS directly.
"""

import asyncio
import re
import shlex
from pathlib import Path

from navi.config import settings

from ._internal.base import Tool, ToolContext, ToolResult, current_user_id, current_user_role

_DEFAULT_TIMEOUT = 20
_MAX_TIMEOUT = 300
_MAX_OUTPUT_CHARS = 5_000

# Substrings that make a command line dangerous for non-admin users.
_DANGEROUS_PATTERNS = [
    r"\bcurl\b",
    r"\bwget\b",
    r"\bnc\b",
    r"\bnetcat\b",
    r"\bssh\b",
    r"\bscp\b",
    r"\bsftp\b",
    r"\bsudo\b",
    r"\bsu\b",
    r"\bpython\b.*-\bc\b",
    r"\bpython3\b.*-\bc\b",
    r"\bnode\b.*-\be\b",
    r"\beval\b",
    r"\bexec\b",
    r";\s*rm\s+-rf\s+/",
    r">\s+/dev/",
    r"<\s*/dev/",
]

_DANGEROUS_RE = re.compile("|".join(_DANGEROUS_PATTERNS), re.IGNORECASE)


def _check_dangerous(command: str) -> str | None:
    """Screen a command line against the non-admin blocklist.

    Returns a human-readable block reason when any pattern in
    ``_DANGEROUS_RE`` matches anywhere in *command*; otherwise ``None``.
    """
    matched = _DANGEROUS_RE.search(command) is not None
    return "Command contains disallowed patterns for non-admin users." if matched else None


def _resolve_working_dir(working_dir: str | None, user_id: str | None = None, role: str | None = None) -> Path | None:
    """Resolve working directory with sandbox enforcement for non-admins."""
    user_id = user_id or current_user_id.get(None)
    role = role or current_user_role.get()

    if user_id and role != "admin":
        sandbox = Path("user_data") / user_id
        sandbox = sandbox.expanduser().resolve()
        sandbox.mkdir(parents=True, exist_ok=True)
        if working_dir:
            p = Path(working_dir).expanduser()
            if p.is_absolute():
                resolved = p.resolve()
                try:
                    resolved.relative_to(sandbox)
                    return resolved
                except ValueError:
                    return None
            return (sandbox / p).resolve()
        return sandbox

    if working_dir:
        return Path(working_dir).expanduser().resolve()
    return None


class TerminalTool(Tool):
    """Tool that executes shell commands on the local machine.

    Three execution paths are selected in ``execute``:

    * no user id, or role ``"admin"``, with ``settings.terminal_allowed_commands``
      equal to ``"*"``: the command is passed to the shell unchanged;
    * no user id, or role ``"admin"``, with any other setting: the command is
      tokenised, its executable is checked against
      ``settings.terminal_allowed_commands_list`` and it is exec'ed directly
      (no shell features);
    * any other user: the working directory is confined via
      ``_resolve_working_dir``, the command is screened by
      ``_check_dangerous``, every command in the chain is checked against
      ``settings.terminal_user_allowed_commands_list`` and only then is the
      line run through the shell.
    """

    name = "terminal"
    description = (
        "Run a shell command on the local machine. Full bash shell — "
        "pipes, redirects, env vars, multi-command chains all work. "
        "Use for system operations, package management, running processes, "
        "or any task more natural in shell than Python."
    )
    parameters = {
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "Shell command to execute",
            },
            "working_dir": {
                "type": "string",
                "description": "Working directory (optional, defaults to home directory)",
            },
            "timeout": {
                "type": "integer",
                "description": f"Timeout in seconds (default {_DEFAULT_TIMEOUT}, max {_MAX_TIMEOUT}). Set higher when installing packages or running long tasks.",
            },
        },
        "required": ["command"],
    }

    async def execute(self, params: dict, ctx: ToolContext | None = None) -> ToolResult:
        """Validate *params*, pick the execution path for the caller and run it.

        Args:
            params: Tool arguments: ``command`` (required), optional
                ``working_dir`` and ``timeout``.
            ctx: Optional call context supplying ``user_id`` / ``user_role``;
                when absent the context variables are consulted instead.

        Returns:
            A ``ToolResult``; failures are reported through ``success=False``
            and an ``error`` code rather than by raising.
        """
        raw_command = params.get("command")
        # A missing or non-string command is reported like an empty one
        # instead of escaping as KeyError/AttributeError.
        command = raw_command.strip() if isinstance(raw_command, str) else ""
        working_dir = params.get("working_dir") or None

        timeout = _DEFAULT_TIMEOUT
        raw_timeout = params.get("timeout")
        if raw_timeout is not None:
            try:
                timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT))
            except (TypeError, ValueError, OverflowError):
                return ToolResult(
                    success=False,
                    output=f"Invalid timeout: {raw_timeout!r}",
                    error="invalid_timeout",
                )

        if not command:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        role = ctx.user_role if ctx else current_user_role.get()
        user_id = ctx.user_id if ctx else current_user_id.get(None)

        # Admins and single-user / legacy mode: use the existing restriction logic
        if not user_id or role == "admin":
            if settings.terminal_allowed_commands.strip() == "*":
                return await self._run_shell(command, working_dir, timeout)
            return await self._run_restricted(command, working_dir, timeout)

        # Non-admin multi-user mode: sandbox + curated allowlist + dangerous-pattern block
        cwd = _resolve_working_dir(working_dir, user_id, role)
        if cwd is None and working_dir:
            return ToolResult(
                success=False,
                output="Working directory is outside your sandbox.",
                error="sandbox_violation",
            )

        danger = _check_dangerous(command)
        if danger:
            return ToolResult(
                success=False,
                output=f"Blocked: {danger}",
                error="dangerous_command",
            )

        return await self._run_user_restricted(command, cwd, timeout)

    async def _collect_result(self, proc: asyncio.subprocess.Process, timeout: int) -> ToolResult:
        """Wait for *proc* (at most *timeout* seconds) and package its output.

        On timeout the process is killed and reaped before returning. stdout
        and stderr are decoded with replacement, joined, and truncated to
        ``_MAX_OUTPUT_CHARS`` characters.
        """
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout firing and the kill.
                pass
            # Reap the child so it does not linger as a zombie.
            await proc.wait()
            return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")

        output_parts = []
        if stdout:
            output_parts.append(stdout.decode(errors="replace"))
        if stderr:
            # Plain concatenation: nesting the same quote character inside an
            # f-string is a syntax error before Python 3.12.
            output_parts.append("[stderr]\n" + stderr.decode(errors="replace"))

        combined = "\n".join(output_parts) or "(no output)"
        if len(combined) > _MAX_OUTPUT_CHARS:
            combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"

        success = proc.returncode == 0
        return ToolResult(
            success=success,
            output=combined,
            metadata={"returncode": proc.returncode},
            error=None if success else f"Exit code {proc.returncode}",
        )

    @staticmethod
    def _command_heads(command: str) -> list[str]:
        """Return the first word of every simple command in a shell line.

        The line is tokenised with ``shlex`` in punctuation mode. Command
        separators start a new command; the word following a redirection
        operator is treated as its target, not as a command.

        Raises:
            ValueError: on unbalanced quoting or on an operator this scanner
                does not model (grouping/subshell parentheses, ``;;``, ...).

        NOTE: best-effort screening, not a full shell parser. Quoted text that
        consists solely of operator characters (e.g. ``';'``) is
        indistinguishable from a real operator after tokenising and is treated
        as one, which can only cause extra rejections.
        """
        separators = {";", "&", "&&", "|", "||", "|&"}
        redirections = {"<", ">", ">>", "<<", "<<<", "<>", ">|", "<&", ">&"}
        # "&>" is a redirection in bash but "background, then redirect" in
        # plain POSIX sh, so it must also start a new command.
        separating_redirections = {"&>", "&>>"}
        operator_chars = "();<>|&"

        lexer = shlex.shlex(command, posix=True, punctuation_chars=True)
        lexer.whitespace_split = True
        # The default '#' commenter would hide everything after a mid-word '#'
        # that the shell still executes.
        lexer.commenters = ""

        heads: list[str] = []
        expect_head = True
        skip_target = False
        for token in lexer:
            if token and all(ch in operator_chars for ch in token):
                # An operator never serves as a redirection target.
                skip_target = False
                if token in separators:
                    expect_head = True
                elif token in redirections:
                    skip_target = True
                elif token in separating_redirections:
                    expect_head = True
                    skip_target = True
                else:
                    raise ValueError(f"unsupported shell operator {token!r}")
                continue
            if skip_target:
                skip_target = False
                continue
            if expect_head:
                heads.append(token)
                expect_head = False
        return heads

    async def _run_shell(self, command: str, cwd: str | None, timeout: int) -> ToolResult:
        """Run via shell — supports pipes, redirects, etc."""
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            return await self._collect_result(proc, timeout)
        except Exception as e:
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    async def _run_restricted(self, command: str, cwd: str | None, timeout: int) -> ToolResult:
        """Run with allowlist check — exec directly, no shell features."""
        try:
            tokens = shlex.split(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))

        if not tokens:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_allowed_commands_list
        if tokens[0] not in allowed:
            return ToolResult(
                success=False,
                output=f"Command '{tokens[0]}' is not in the allowed list. "
                       f"Allowed: {allowed}. Set TERMINAL_ALLOWED_COMMANDS=* to allow all.",
                error="not_allowed",
            )

        try:
            proc = await asyncio.create_subprocess_exec(
                *tokens,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            return await self._collect_result(proc, timeout)
        except FileNotFoundError:
            return ToolResult(success=False, output=f"Command not found: {tokens[0]}", error="not_found")
        except Exception as e:
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    async def _run_user_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
        """Run for non-admin users: allowlist + shell features + sandbox cwd.

        Because the line is ultimately handed to the shell, every command in a
        pipeline or chain must be on the allowlist, not just the first one.
        Constructs whose inner commands cannot be checked by the tokeniser
        (command substitution, ``$'...'`` quoting, multi-line input) are
        rejected outright.
        """
        if any(marker in command for marker in ("\n", "`", "$(", "$'")):
            return ToolResult(
                success=False,
                output="Command substitution, $'...' quoting and multi-line commands "
                       "are not allowed for non-admin users.",
                error="not_allowed",
            )

        try:
            heads = self._command_heads(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))

        if not heads:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_user_allowed_commands_list
        for head in heads:
            if head not in allowed:
                return ToolResult(
                    success=False,
                    output=f"Command '{head}' is not in the allowed list for non-admin users. "
                           f"Allowed: {allowed}.",
                    error="not_allowed",
                )

        # Shell invocation so pipes/redirects work; every command head was validated above
        return await self._run_shell(command, str(cwd) if cwd else None, timeout)