"""Terminal tool — run shell commands locally, including persistent sessions.

Actions:
    run        — one-shot command (classic terminal behaviour)
    open       — open a named persistent terminal session
    close      — close a named persistent terminal session
    list       — list all persistent terminals for the current Navi session
    status     — detailed status of one persistent terminal
    send_input — write text to a persistent terminal's stdin

If TERMINAL_ALLOWED_COMMANDS=* (default), any command is allowed and the shell
is invoked directly (supports pipes, redirects, subshells, etc.).
Set TERMINAL_ALLOWED_COMMANDS to a comma-separated list to restrict to specific
executables only (e.g. "ls,cat,git").

Multi-user safety:
- Non-admin users are restricted to a sandbox directory (user_data/<user_id>/)
  and a curated allowlist of safe commands.
- Dangerous patterns (curl, wget, ssh, sudo, python -c, node -e, etc.) are
  blocked for non-admins even if the base command is in the allowlist.
- Admins bypass all restrictions and use TERMINAL_ALLOWED_COMMANDS directly.
"""
from __future__ import annotations
import asyncio
import re
import shlex
from pathlib import Path
from navi.config import settings
from ._internal.base import Tool, ToolContext, ToolResult, current_event_sink, current_user_id, current_user_role
from .terminal_manager import TerminalManager
_DEFAULT_TIMEOUT = 20
_MAX_TIMEOUT = 300
_MAX_OUTPUT_CHARS = 5_000
# Substrings that make a command line dangerous for non-admin users.
_DANGEROUS_PATTERNS = [
r"\bcurl\b",
r"\bwget\b",
r"\bnc\b",
r"\bnetcat\b",
r"\bssh\b",
r"\bscp\b",
r"\bsftp\b",
r"\bsudo\b",
r"\bsu\b",
r"\bpython\b.*-\bc\b",
r"\bpython3\b.*-\bc\b",
r"\bnode\b.*-\be\b",
r"\beval\b",
r"\bexec\b",
r";\s*rm\s+-rf\s+/",
r">\s+/dev/",
r"<\s*/dev/",
]
_DANGEROUS_RE = re.compile("|".join(_DANGEROUS_PATTERNS), re.IGNORECASE)
def _check_dangerous(command: str) -> str | None:
    """Screen a command line against the non-admin deny patterns.

    Returns a human-readable block reason when any pattern in
    ``_DANGEROUS_RE`` is found in *command*, otherwise ``None``.
    """
    is_blocked = _DANGEROUS_RE.search(command) is not None
    return "Command contains disallowed patterns for non-admin users." if is_blocked else None
def _resolve_working_dir(working_dir: str | None, user_id: str | None = None, role: str | None = None) -> Path | None:
"""Resolve working directory with sandbox enforcement for non-admins."""
user_id = user_id or current_user_id.get(None)
role = role or current_user_role.get()
if user_id and role != "admin":
sandbox = Path("user_data") / user_id
sandbox = sandbox.expanduser().resolve()
sandbox.mkdir(parents=True, exist_ok=True)
if working_dir:
p = Path(working_dir).expanduser()
if p.is_absolute():
resolved = p.resolve()
try:
resolved.relative_to(sandbox)
return resolved
except ValueError:
return None
return (sandbox / p).resolve()
return sandbox
if working_dir:
return Path(working_dir).expanduser().resolve()
return None
class TerminalTool(Tool):
name = "terminal"
description = (
"Run shell commands locally. Supports both one-shot execution (run) and "
"persistent named terminal sessions (open/close/list/status/send_input). "
"Use persistent terminals for long-running processes, dev servers, or "
"interactive workflows where state (cwd, env) must be preserved."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["run", "open", "close", "list", "status", "send_input"],
"description": "Operation to perform.",
},
"terminal_name": {
"type": "string",
"description": "Name for the persistent terminal (required for open, close, status, send_input).",
},
"description": {
"type": "string",
"description": "Why this terminal is being opened (required for open).",
},
"command": {
"type": "string",
"description": "Shell command to execute (required for run and open).",
},
"background": {
"type": "boolean",
"description": "Run in background without waiting for completion (for open). Default false.",
},
"working_dir": {
"type": "string",
"description": "Working directory (optional, defaults to home or sandbox).",
},
"timeout": {
"type": "integer",
"description": f"Timeout in seconds (default {_DEFAULT_TIMEOUT}, max {_MAX_TIMEOUT}).",
},
"input": {
"type": "string",
"description": "Text to send to the terminal's stdin (for send_input).",
},
},
"required": ["action"],
}
def __init__(self, terminal_manager: TerminalManager | None = None) -> None:
self._tm = terminal_manager
# ── Execute dispatcher ─────────────────────────────────────────────────
async def execute(self, params: dict, ctx: ToolContext | None = None) -> ToolResult:
action = params.get("action", "")
if not action:
return ToolResult(success=False, output="Missing 'action' parameter.", error="missing_action")
role = ctx.user_role if ctx else current_user_role.get()
user_id = ctx.user_id if ctx else current_user_id.get(None)
session_id = ctx.session_id if ctx else None
match action:
case "run":
return await self._run(params, role, user_id)
case "open":
return await self._open(params, role, user_id, session_id)
case "close":
return await self._close(params, session_id)
case "list":
return self._list(session_id)
case "status":
return self._status(params, session_id)
case "send_input":
return await self._send_input(params, session_id)
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
# ── Action handlers ──────────────────────────────────────────────────────
async def _run(self, params: dict, role: str | None, user_id: str | None) -> ToolResult:
"""One-shot command execution (original terminal behaviour)."""
command = params.get("command", "").strip()
if not command:
return ToolResult(success=False, output="Empty command.", error="empty_command")
working_dir = params.get("working_dir") or None
raw_timeout = params.get("timeout")
timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT)) if raw_timeout is not None else _DEFAULT_TIMEOUT
cwd = _resolve_working_dir(working_dir, user_id, role)
if cwd is None and working_dir:
return ToolResult(success=False, output="Working directory is outside your sandbox.", error="sandbox_violation")
# Admin / legacy unrestricted mode
if not user_id or role == "admin":
unrestricted = settings.terminal_allowed_commands.strip() == "*"
if unrestricted:
return await self._run_shell(command, cwd, timeout)
return await self._run_restricted(command, cwd, timeout)
# Non-admin multi-user mode
danger = _check_dangerous(command)
if danger:
return ToolResult(success=False, output=f"Blocked: {danger}", error="dangerous_command")
return await self._run_user_restricted(command, cwd, timeout)
async def _open(self, params: dict, role: str | None, user_id: str | None, session_id: str | None) -> ToolResult:
if self._tm is None:
return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
if not session_id:
return ToolResult(success=False, output="Persistent terminals require a session context.", error="no_session")
name = params.get("terminal_name", "").strip()
description = params.get("description", "").strip()
command = params.get("command", "").strip()
background = bool(params.get("background"))
working_dir = params.get("working_dir") or None
raw_timeout = params.get("timeout")
timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT)) if raw_timeout is not None else _DEFAULT_TIMEOUT
if not name:
return ToolResult(success=False, output="Missing 'terminal_name' for open.", error="missing_name")
if not description:
return ToolResult(success=False, output="Missing 'description' for open.", error="missing_description")
if not command:
return ToolResult(success=False, output="Missing 'command' for open.", error="empty_command")
cwd = _resolve_working_dir(working_dir, user_id, role)
if cwd is None and working_dir:
return ToolResult(success=False, output="Working directory is outside your sandbox.", error="sandbox_violation")
# Security checks (same as run)
if not user_id or role == "admin":
unrestricted = settings.terminal_allowed_commands.strip() == "*"
if not unrestricted:
tokens = shlex.split(command)
allowed = settings.terminal_allowed_commands_list
if tokens and tokens[0] not in allowed:
return ToolResult(
success=False,
output=f"Command '{tokens[0]}' not allowed. Allowed: {allowed}.",
error="not_allowed",
)
else:
danger = _check_dangerous(command)
if danger:
return ToolResult(success=False, output=f"Blocked: {danger}", error="dangerous_command")
tokens = shlex.split(command)
allowed = settings.terminal_user_allowed_commands_list
if tokens and tokens[0] not in allowed:
return ToolResult(
success=False,
output=f"Command '{tokens[0]}' not allowed for non-admin users. Allowed: {allowed}.",
error="not_allowed",
)
# Determine whether to use shell or exec based on restrictions
exec_tokens: list[str] | None = None
if not user_id or role == "admin":
unrestricted = settings.terminal_allowed_commands.strip() == "*"
if not unrestricted:
try:
exec_tokens = shlex.split(command)
except ValueError as e:
return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
else:
# Non-admin: always use exec to enforce restrictions
try:
exec_tokens = shlex.split(command)
except ValueError as e:
return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
try:
session = await self._tm.open(
session_id=session_id,
name=name,
description=description,
command=command,
background=background,
cwd=cwd,
timeout=timeout,
event_sink=current_event_sink.get(),
exec_tokens=exec_tokens,
)
except ValueError as e:
return ToolResult(success=False, output=str(e), error="already_exists")
except Exception as e:
return ToolResult(success=False, output=f"Failed to open terminal: {e}", error=str(e))
if background:
return ToolResult(
success=True,
output=f"Terminal '{name}' opened in background. PID: {session.proc.pid if session.proc else 'N/A'}.",
metadata=session.summary(),
)
# Foreground open — gather output and close immediately
output_parts = list(session.output_buffer)
combined = "".join(output_parts)
if len(combined) > _MAX_OUTPUT_CHARS:
combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"
rc = session.proc.returncode if session.proc else None
# Close foreground terminal immediately so it doesn't clutter list
await self._tm.close(session_id, name)
return ToolResult(
success=rc == 0,
output=combined or "(no output)",
metadata={"returncode": rc, **session.summary()},
error=None if rc == 0 else f"Exit code {rc}",
)
async def _close(self, params: dict, session_id: str | None) -> ToolResult:
if self._tm is None:
return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
if not session_id:
return ToolResult(success=False, output="No session context.", error="no_session")
name = params.get("terminal_name", "").strip()
if not name:
return ToolResult(success=False, output="Missing 'terminal_name' for close.", error="missing_name")
ok = await self._tm.close(session_id, name)
if not ok:
return ToolResult(success=False, output=f"Terminal '{name}' not found.", error="not_found")
return ToolResult(success=True, output=f"Terminal '{name}' closed.")
def _list(self, session_id: str | None) -> ToolResult:
if self._tm is None:
return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
if not session_id:
return ToolResult(success=False, output="No session context.", error="no_session")
items = self._tm.list(session_id)
if not items:
return ToolResult(success=True, output="No active terminals for this session.")
lines = [f"Active terminals ({len(items)}):"]
for item in items:
status_emoji = "🟢" if item["status"] == "busy" else "⚪"
lines.append(
f" {status_emoji} {item['name']}: {item['description']} "
f"({item['status']}, PID {item.get('pid') or 'N/A'}, "
f"uptime {item.get('uptime_seconds', 0)}s)"
)
return ToolResult(success=True, output="\n".join(lines), metadata={"terminals": items})
def _status(self, params: dict, session_id: str | None) -> ToolResult:
if self._tm is None:
return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
if not session_id:
return ToolResult(success=False, output="No session context.", error="no_session")
name = params.get("terminal_name", "").strip()
if not name:
return ToolResult(success=False, output="Missing 'terminal_name' for status.", error="missing_name")
st = self._tm.status(session_id, name)
if st is None:
return ToolResult(success=False, output=f"Terminal '{name}' not found.", error="not_found")
lines = [
f"Terminal: {st['name']}",
f"Description: {st['description']}",
f"Command: {st['command']}",
f"Status: {st['status']}",
f"PID: {st.get('pid') or 'N/A'}",
f"CWD: {st.get('cwd') or 'N/A'}",
f"Uptime: {st.get('uptime_seconds', 0)}s",
f"Last active: {st['last_active']}",
]
tail = st.get("output_tail", [])
if tail:
lines.append("Output tail:")
for line in tail:
lines.append(f" {line.rstrip()}")
return ToolResult(success=True, output="\n".join(lines), metadata=st)
async def _send_input(self, params: dict, session_id: str | None) -> ToolResult:
if self._tm is None:
return ToolResult(success=False, output="Terminal manager is not available.", error="no_manager")
if not session_id:
return ToolResult(success=False, output="No session context.", error="no_session")
name = params.get("terminal_name", "").strip()
text = params.get("input", "")
if not name:
return ToolResult(success=False, output="Missing 'terminal_name' for send_input.", error="missing_name")
ok = await self._tm.send_input(session_id, name, text)
if not ok:
return ToolResult(success=False, output=f"Cannot send input to '{name}'. Terminal may be closed or not accepting input.", error="send_failed")
return ToolResult(success=True, output=f"Sent input to '{name}'.")
# ── Low-level runners (unchanged from original) ─────────────────────────
async def _run_shell(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
"""Run via shell — supports pipes, redirects, etc."""
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")
output_parts = []
if stdout:
output_parts.append(stdout.decode(errors="replace"))
if stderr:
output_parts.append(f"[stderr]\n{stderr.decode(errors="replace")}")
combined = "\n".join(output_parts) or "(no output)"
if len(combined) > _MAX_OUTPUT_CHARS:
combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"
success = proc.returncode == 0
return ToolResult(
success=success,
output=combined,
metadata={"returncode": proc.returncode},
error=None if success else f"Exit code {proc.returncode}",
)
except Exception as e:
return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))
async def _run_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
"""Run with allowlist check — exec directly, no shell features."""
try:
tokens = shlex.split(command)
except ValueError as e:
return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
if not tokens:
return ToolResult(success=False, output="Empty command.", error="empty_command")
allowed = settings.terminal_allowed_commands_list
if tokens[0] not in allowed:
return ToolResult(
success=False,
output=f"Command '{tokens[0]}' is not in the allowed list. "
f"Allowed: {allowed}. Set TERMINAL_ALLOWED_COMMANDS=* to allow all.",
error="not_allowed",
)
try:
proc = await asyncio.create_subprocess_exec(
*tokens,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")
output_parts = []
if stdout:
output_parts.append(stdout.decode(errors="replace"))
if stderr:
output_parts.append(f"[stderr]\n{stderr.decode(errors="replace")}")
combined = "\n".join(output_parts) or "(no output)"
if len(combined) > _MAX_OUTPUT_CHARS:
combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"
success = proc.returncode == 0
return ToolResult(
success=success,
output=combined,
metadata={"returncode": proc.returncode},
error=None if success else f"Exit code {proc.returncode}",
)
except FileNotFoundError:
return ToolResult(success=False, output=f"Command not found: {tokens[0]}", error="not_found")
except Exception as e:
return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))
async def _run_user_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
"""Run for non-admin users: allowlist + shell features + sandbox cwd."""
try:
tokens = shlex.split(command)
except ValueError as e:
return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
if not tokens:
return ToolResult(success=False, output="Empty command.", error="empty_command")
allowed = settings.terminal_user_allowed_commands_list
if tokens[0] not in allowed:
return ToolResult(
success=False,
output=f"Command '{tokens[0]}' is not in the allowed list for non-admin users. "
f"Allowed: {allowed}.",
error="not_allowed",
)
return await self._run_shell(command, str(cwd) if cwd else None, timeout)