# Newer
# Older
# navi-1 / navi / tools / terminal.py
"""Terminal tool — run shell commands locally, including persistent sessions.

Actions:
  run        — one-shot command (classic terminal behaviour)
  open       — open a named persistent terminal session
  close      — close a named persistent terminal session
  list       — list all persistent terminals for the current Navi session
  status     — detailed status of one persistent terminal
  send_input — write text to a persistent terminal's stdin

If TERMINAL_ALLOWED_COMMANDS=* (default), any command is allowed and the shell
is invoked directly (supports pipes, redirects, subshells, etc.).

Set TERMINAL_ALLOWED_COMMANDS to a comma-separated list to restrict to specific
executables only (e.g. "ls,cat,git").

Multi-user safety:
- Non-admin users are restricted to a sandbox directory (user_data/<user_id>/)
  and a curated allowlist of safe commands.
- Dangerous patterns (curl, wget, ssh, sudo, python -c, node -e, etc.) are
  blocked for non-admins even if the base command is in the allowlist.
- Admins bypass all restrictions and use TERMINAL_ALLOWED_COMMANDS directly.
"""

from __future__ import annotations

import asyncio
import re
import shlex
from pathlib import Path

from navi.config import settings

from ._internal.base import Tool, ToolContext, ToolResult, current_event_sink, current_user_id, current_user_role
from .terminal_manager import TerminalManager

# Timeout, in seconds, applied when the caller does not pass one.
_DEFAULT_TIMEOUT = 20
# Upper bound, in seconds, that caller-supplied timeouts are clamped to.
_MAX_TIMEOUT = 300
# Combined command output longer than this many characters is truncated.
_MAX_OUTPUT_CHARS = 5_000

# Regular expressions that make a command line dangerous for non-admin users.
# They are joined into a single case-insensitive alternation below and matched
# with ``search``, i.e. anywhere in the command line.
_DANGEROUS_PATTERNS = [
    r"\bcurl\b",
    r"\bwget\b",
    r"\bnc\b",
    r"\bnetcat\b",
    r"\bssh\b",
    r"\bscp\b",
    r"\bsftp\b",
    r"\bsudo\b",
    r"\bsu\b",
    r"\bpython\b.*-\bc\b",
    r"\bpython3\b.*-\bc\b",
    r"\bnode\b.*-\be\b",
    r"\beval\b",
    r"\bexec\b",
    r";\s*rm\s+-rf\s+/",
    # Whitespace after the redirection operator is optional in a shell, so it
    # must be optional here too; otherwise ``>/dev/tcp/host/port`` or
    # ``>/dev/sda`` would slip through while ``> /dev/...`` is blocked.
    r">\s*/dev/",
    r"<\s*/dev/",
]

_DANGEROUS_RE = re.compile("|".join(_DANGEROUS_PATTERNS), re.IGNORECASE)


def _check_dangerous(command: str) -> str | None:
    """Screen a command line against the non-admin deny patterns.

    Returns a human-readable block reason when any pattern in ``_DANGEROUS_RE``
    is found anywhere in ``command``; returns ``None`` when nothing matches.
    """
    hit = _DANGEROUS_RE.search(command)
    return "Command contains disallowed patterns for non-admin users." if hit else None


def _resolve_working_dir(working_dir: str | None, user_id: str | None = None, role: str | None = None) -> Path | None:
    """Resolve working directory with sandbox enforcement for non-admins."""
    user_id = user_id or current_user_id.get(None)
    role = role or current_user_role.get()

    if user_id and role != "admin":
        sandbox = Path("user_data") / user_id
        sandbox = sandbox.expanduser().resolve()
        sandbox.mkdir(parents=True, exist_ok=True)
        if working_dir:
            p = Path(working_dir).expanduser()
            if p.is_absolute():
                resolved = p.resolve()
                try:
                    resolved.relative_to(sandbox)
                    return resolved
                except ValueError:
                    return None
            return (sandbox / p).resolve()
        return sandbox

    if working_dir:
        return Path(working_dir).expanduser().resolve()
    return None


class TerminalTool(Tool):
    """Tool that runs shell commands, one-shot or in named persistent sessions.

    One-shot execution (``run``) is handled by this class directly; the
    persistent-session actions (``open``/``close``/``list``/``status``/
    ``send_input``) are delegated to the ``TerminalManager`` given at construction.
    """

    # Identifier of this tool.
    name = "terminal"
    # Human-readable summary of what the tool can do.
    description = (
        "Run shell commands locally. Supports both one-shot execution (run) and "
        "persistent named terminal sessions (open/close/list/status/send_input). "
        "Use persistent terminals for long-running processes, dev servers, or "
        "interactive workflows where state (cwd, env) must be preserved."
    )
    # JSON-Schema-style description of the accepted parameters; only ``action``
    # is required at schema level, the per-action requirements are enforced by
    # the individual handlers.
    parameters = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["run", "open", "close", "list", "status", "send_input"],
                "description": "Operation to perform.",
            },
            "terminal_name": {
                "type": "string",
                "description": "Name for the persistent terminal (required for open, close, status, send_input).",
            },
            "description": {
                "type": "string",
                "description": "Why this terminal is being opened (required for open).",
            },
            "command": {
                "type": "string",
                "description": "Shell command to execute (required for run and open).",
            },
            "background": {
                "type": "boolean",
                "description": "Run in background without waiting for completion (for open). Default false.",
            },
            "working_dir": {
                "type": "string",
                "description": "Working directory (optional, defaults to home or sandbox).",
            },
            "timeout": {
                "type": "integer",
                "description": f"Timeout in seconds (default {_DEFAULT_TIMEOUT}, max {_MAX_TIMEOUT}).",
            },
            "input": {
                "type": "string",
                "description": "Text to send to the terminal's stdin (for send_input).",
            },
        },
        "required": ["action"],
    }

    def __init__(self, terminal_manager: TerminalManager | None = None) -> None:
        """Store the manager used for persistent sessions.

        When ``terminal_manager`` is ``None`` the persistent-session actions
        return a ``no_manager`` error; ``run`` does not use the manager.
        """
        self._tm = terminal_manager

    # ── Execute dispatcher ─────────────────────────────────────────────────

    async def execute(self, params: dict, ctx: ToolContext | None = None) -> ToolResult:
        action = params.get("action", "")
        if not action:
            return ToolResult(success=False, output="Missing 'action' parameter.", error="missing_action")

        role = ctx.user_role if ctx else current_user_role.get()
        user_id = ctx.user_id if ctx else current_user_id.get(None)
        session_id = ctx.session_id if ctx else None

        match action:
            case "run":
                return await self._run(params, role, user_id)
            case "open":
                return await self._open(params, role, user_id, session_id)
            case "close":
                return await self._close(params, session_id)
            case "list":
                return self._list(session_id)
            case "status":
                return self._status(params, session_id)
            case "send_input":
                return await self._send_input(params, session_id)
            case _:
                return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")

    # ── Action handlers ──────────────────────────────────────────────────────

    async def _run(self, params: dict, role: str | None, user_id: str | None) -> ToolResult:
        """One-shot command execution (original terminal behaviour)."""
        command = params.get("command", "").strip()
        if not command:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        working_dir = params.get("working_dir") or None
        raw_timeout = params.get("timeout")
        timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT)) if raw_timeout is not None else _DEFAULT_TIMEOUT

        cwd = _resolve_working_dir(working_dir, user_id, role)
        if cwd is None and working_dir:
            return ToolResult(success=False, output="Working directory is outside your sandbox.", error="sandbox_violation")

        # Admin / legacy unrestricted mode
        if not user_id or role == "admin":
            unrestricted = settings.terminal_allowed_commands.strip() == "*"
            if unrestricted:
                return await self._run_shell(command, cwd, timeout)
            return await self._run_restricted(command, cwd, timeout)

        # Non-admin multi-user mode
        danger = _check_dangerous(command)
        if danger:
            return ToolResult(success=False, output=f"Blocked: {danger}", error="dangerous_command")
        return await self._run_user_restricted(command, cwd, timeout)

    async def _open(self, params: dict, role: str | None, user_id: str | None, session_id: str | None) -> ToolResult:
        if self._tm is None:
            return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
        if not session_id:
            return ToolResult(success=False, output="Persistent terminals require a session context.", error="no_session")

        name = params.get("terminal_name", "").strip()
        description = params.get("description", "").strip()
        command = params.get("command", "").strip()
        background = bool(params.get("background"))
        working_dir = params.get("working_dir") or None
        raw_timeout = params.get("timeout")
        timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT)) if raw_timeout is not None else _DEFAULT_TIMEOUT

        if not name:
            return ToolResult(success=False, output="Missing 'terminal_name' for open.", error="missing_name")
        if not description:
            return ToolResult(success=False, output="Missing 'description' for open.", error="missing_description")
        if not command:
            return ToolResult(success=False, output="Missing 'command' for open.", error="empty_command")

        cwd = _resolve_working_dir(working_dir, user_id, role)
        if cwd is None and working_dir:
            return ToolResult(success=False, output="Working directory is outside your sandbox.", error="sandbox_violation")

        # Security checks (same as run)
        if not user_id or role == "admin":
            unrestricted = settings.terminal_allowed_commands.strip() == "*"
            if not unrestricted:
                tokens = shlex.split(command)
                allowed = settings.terminal_allowed_commands_list
                if tokens and tokens[0] not in allowed:
                    return ToolResult(
                        success=False,
                        output=f"Command '{tokens[0]}' not allowed. Allowed: {allowed}.",
                        error="not_allowed",
                    )
        else:
            danger = _check_dangerous(command)
            if danger:
                return ToolResult(success=False, output=f"Blocked: {danger}", error="dangerous_command")
            tokens = shlex.split(command)
            allowed = settings.terminal_user_allowed_commands_list
            if tokens and tokens[0] not in allowed:
                return ToolResult(
                    success=False,
                    output=f"Command '{tokens[0]}' not allowed for non-admin users. Allowed: {allowed}.",
                    error="not_allowed",
                )

        try:
            session = await self._tm.open(
                session_id=session_id,
                name=name,
                description=description,
                command=command,
                background=background,
                cwd=cwd,
                timeout=timeout,
                event_sink=current_event_sink.get(),
            )
        except ValueError as e:
            return ToolResult(success=False, output=str(e), error="already_exists")
        except Exception as e:
            return ToolResult(success=False, output=f"Failed to open terminal: {e}", error=str(e))

        if background:
            return ToolResult(
                success=True,
                output=f"Terminal '{name}' opened in background. PID: {session.proc.pid if session.proc else 'N/A'}.",
                metadata=session.summary(),
            )

        # Foreground open — gather output
        output_parts = list(session.output_buffer)
        combined = "".join(output_parts)
        if len(combined) > _MAX_OUTPUT_CHARS:
            combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"
        rc = session.proc.returncode if session.proc else None
        return ToolResult(
            success=rc == 0,
            output=combined or "(no output)",
            metadata={"returncode": rc, **session.summary()},
            error=None if rc == 0 else f"Exit code {rc}",
        )

    async def _close(self, params: dict, session_id: str | None) -> ToolResult:
        if self._tm is None:
            return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
        if not session_id:
            return ToolResult(success=False, output="No session context.", error="no_session")

        name = params.get("terminal_name", "").strip()
        if not name:
            return ToolResult(success=False, output="Missing 'terminal_name' for close.", error="missing_name")

        ok = await self._tm.close(session_id, name)
        if not ok:
            return ToolResult(success=False, output=f"Terminal '{name}' not found.", error="not_found")
        return ToolResult(success=True, output=f"Terminal '{name}' closed.")

    def _list(self, session_id: str | None) -> ToolResult:
        if self._tm is None:
            return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
        if not session_id:
            return ToolResult(success=False, output="No session context.", error="no_session")

        items = self._tm.list(session_id)
        if not items:
            return ToolResult(success=True, output="No active terminals for this session.")

        lines = [f"Active terminals ({len(items)}):"]
        for item in items:
            status_emoji = "🟢" if item["status"] == "busy" else "⚪"
            lines.append(
                f"  {status_emoji} {item['name']}: {item['description']} "
                f"({item['status']}, PID {item.get('pid') or 'N/A'}, "
                f"uptime {item.get('uptime_seconds', 0)}s)"
            )
        return ToolResult(success=True, output="\n".join(lines), metadata={"terminals": items})

    def _status(self, params: dict, session_id: str | None) -> ToolResult:
        if self._tm is None:
            return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
        if not session_id:
            return ToolResult(success=False, output="No session context.", error="no_session")

        name = params.get("terminal_name", "").strip()
        if not name:
            return ToolResult(success=False, output="Missing 'terminal_name' for status.", error="missing_name")

        st = self._tm.status(session_id, name)
        if st is None:
            return ToolResult(success=False, output=f"Terminal '{name}' not found.", error="not_found")

        lines = [
            f"Terminal: {st['name']}",
            f"Description: {st['description']}",
            f"Command: {st['command']}",
            f"Status: {st['status']}",
            f"PID: {st.get('pid') or 'N/A'}",
            f"CWD: {st.get('cwd') or 'N/A'}",
            f"Uptime: {st.get('uptime_seconds', 0)}s",
            f"Last active: {st['last_active']}",
        ]
        tail = st.get("output_tail", [])
        if tail:
            lines.append("Output tail:")
            for line in tail:
                lines.append(f"  {line.rstrip()}")
        return ToolResult(success=True, output="\n".join(lines), metadata=st)

    async def _send_input(self, params: dict, session_id: str | None) -> ToolResult:
        if self._tm is None:
            return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
        if not session_id:
            return ToolResult(success=False, output="No session context.", error="no_session")

        name = params.get("terminal_name", "").strip()
        text = params.get("input", "")
        if not name:
            return ToolResult(success=False, output="Missing 'terminal_name' for send_input.", error="missing_name")

        ok = await self._tm.send_input(session_id, name, text)
        if not ok:
            return ToolResult(success=False, output=f"Cannot send input to '{name}'. Terminal may be closed or not accepting input.", error="send_failed")
        return ToolResult(success=True, output=f"Sent input to '{name}'.")

    # ── Low-level runners (unchanged from original) ─────────────────────────

    async def _run_shell(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
        """Run via shell — supports pipes, redirects, etc."""
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            except asyncio.TimeoutError:
                proc.kill()
                return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")

            output_parts = []
            if stdout:
                output_parts.append(stdout.decode(errors="replace"))
            if stderr:
                output_parts.append(f"[stderr]\n{stderr.decode(errors="replace")}")

            combined = "\n".join(output_parts) or "(no output)"
            if len(combined) > _MAX_OUTPUT_CHARS:
                combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"

            success = proc.returncode == 0
            return ToolResult(
                success=success,
                output=combined,
                metadata={"returncode": proc.returncode},
                error=None if success else f"Exit code {proc.returncode}",
            )
        except Exception as e:
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    async def _run_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
        """Run with allowlist check — exec directly, no shell features."""
        try:
            tokens = shlex.split(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))

        if not tokens:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_allowed_commands_list
        if tokens[0] not in allowed:
            return ToolResult(
                success=False,
                output=f"Command '{tokens[0]}' is not in the allowed list. "
                       f"Allowed: {allowed}. Set TERMINAL_ALLOWED_COMMANDS=* to allow all.",
                error="not_allowed",
            )

        try:
            proc = await asyncio.create_subprocess_exec(
                *tokens,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            try:
                stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
            except asyncio.TimeoutError:
                proc.kill()
                return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")

            output_parts = []
            if stdout:
                output_parts.append(stdout.decode(errors="replace"))
            if stderr:
                output_parts.append(f"[stderr]\n{stderr.decode(errors="replace")}")

            combined = "\n".join(output_parts) or "(no output)"
            if len(combined) > _MAX_OUTPUT_CHARS:
                combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"

            success = proc.returncode == 0
            return ToolResult(
                success=success,
                output=combined,
                metadata={"returncode": proc.returncode},
                error=None if success else f"Exit code {proc.returncode}",
            )
        except FileNotFoundError:
            return ToolResult(success=False, output=f"Command not found: {tokens[0]}", error="not_found")
        except Exception as e:
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    async def _run_user_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
        """Run for non-admin users: allowlist + shell features + sandbox cwd."""
        try:
            tokens = shlex.split(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))

        if not tokens:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_user_allowed_commands_list
        if tokens[0] not in allowed:
            return ToolResult(
                success=False,
                output=f"Command '{tokens[0]}' is not in the allowed list for non-admin users. "
                       f"Allowed: {allowed}.",
                error="not_allowed",
            )

        return await self._run_shell(command, str(cwd) if cwd else None, timeout)