"""Terminal tool — run shell commands locally.
If TERMINAL_ALLOWED_COMMANDS=* (default), any command is allowed and the shell
is invoked directly (supports pipes, redirects, subshells, etc.).
Set TERMINAL_ALLOWED_COMMANDS to a comma-separated list to restrict to specific
executables only (e.g. "ls,cat,git").
Multi-user safety:
- Non-admin users are restricted to a sandbox directory (user_data/<user_id>/)
and a curated allowlist of safe commands.
- Dangerous patterns (curl, wget, ssh, sudo, python -c, node -e, etc.) are
blocked for non-admins even if the base command is in the allowlist.
- Admins bypass all restrictions and use TERMINAL_ALLOWED_COMMANDS directly.
"""
import asyncio
import re
import shlex
from pathlib import Path
from navi.config import settings
from ._internal.base import Tool, ToolContext, ToolResult, current_user_id, current_user_role
# Execution limits applied to every command, regardless of the caller's role.
_DEFAULT_TIMEOUT = 20  # seconds; used when the caller supplies no timeout
_MAX_TIMEOUT = 300  # seconds; upper clamp for caller-supplied timeouts
_MAX_OUTPUT_CHARS = 5_000  # combined stdout/stderr is truncated beyond this

# Regex fragments (not plain substrings) that make a command line dangerous
# for non-admin users. They are OR-ed together and matched case-insensitively
# anywhere in the raw command string.
_DANGEROUS_PATTERNS = [
    # Network clients / remote shells.
    r"\bcurl\b",
    r"\bwget\b",
    r"\bnc\b",
    r"\bnetcat\b",
    r"\bssh\b",
    r"\bscp\b",
    r"\bsftp\b",
    # Privilege escalation.
    r"\bsudo\b",
    r"\bsu\b",
    # Inline code execution through an interpreter.
    r"\bpython\b.*-\bc\b",
    r"\bpython3\b.*-\bc\b",
    r"\bnode\b.*-\be\b",
    r"\beval\b",
    r"\bexec\b",
    # Destructive delete chained after another command.
    r";\s*rm\s+-rf\s+/",
    # Redirects to / from device nodes. Whitespace after the operator is
    # optional in shell syntax, so both directions use ``\s*``; requiring a
    # space after ``>`` would let ``>/dev/sda`` through.
    r">\s*/dev/",
    r"<\s*/dev/",
]
_DANGEROUS_RE = re.compile("|".join(_DANGEROUS_PATTERNS), re.IGNORECASE)
def _check_dangerous(command: str) -> str | None:
    """Screen a raw command line against the non-admin deny patterns.

    Returns a human-readable block reason when ``_DANGEROUS_RE`` matches
    anywhere in *command*, and ``None`` when nothing matches.
    """
    is_flagged = _DANGEROUS_RE.search(command) is not None
    return "Command contains disallowed patterns for non-admin users." if is_flagged else None
def _resolve_working_dir(working_dir: str | None, user_id: str | None = None, role: str | None = None) -> Path | None:
"""Resolve working directory with sandbox enforcement for non-admins."""
user_id = user_id or current_user_id.get(None)
role = role or current_user_role.get()
if user_id and role != "admin":
sandbox = Path("user_data") / user_id
sandbox = sandbox.expanduser().resolve()
sandbox.mkdir(parents=True, exist_ok=True)
if working_dir:
p = Path(working_dir).expanduser()
if p.is_absolute():
resolved = p.resolve()
try:
resolved.relative_to(sandbox)
return resolved
except ValueError:
return None
return (sandbox / p).resolve()
return sandbox
if working_dir:
return Path(working_dir).expanduser().resolve()
return None
class TerminalTool(Tool):
    """Tool that executes shell commands on the local machine.

    Routing performed by ``execute``:

    * No user id (single-user / legacy mode) or role ``"admin"``: governed only
      by ``settings.terminal_allowed_commands`` — ``*`` runs the command through
      the shell, anything else goes through the exec-based allowlist path.
    * Any other user: sandboxed working directory, dangerous-pattern filter and
      the per-user allowlist, which is applied to every command in the line.
    """

    name = "terminal"
    description = (
        "Run a shell command on the local machine. Full bash shell — "
        "pipes, redirects, env vars, multi-command chains all work. "
        "Use for system operations, package management, running processes, "
        "or any task more natural in shell than Python."
    )
    parameters = {
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "Shell command to execute",
            },
            "working_dir": {
                "type": "string",
                # NOTE(review): when omitted, admins get cwd=None (the server
                # process's directory) and non-admins get their sandbox —
                # confirm whether "home directory" is still the intended wording.
                "description": "Working directory (optional, defaults to home directory)",
            },
            "timeout": {
                "type": "integer",
                "description": f"Timeout in seconds (default {_DEFAULT_TIMEOUT}, max {_MAX_TIMEOUT}). Set higher when installing packages or running long tasks.",
            },
        },
        "required": ["command"],
    }

    async def execute(self, params: dict, ctx: ToolContext | None = None) -> ToolResult:
        """Validate *params*, pick the execution path for the caller and run it.

        Args:
            params: Tool arguments: ``command`` (required), optional
                ``working_dir`` and ``timeout``.
            ctx: Call context carrying ``user_id`` / ``user_role``; when absent
                the ``current_user_*`` context variables are used.

        Returns:
            A ``ToolResult``; malformed arguments yield a failed result rather
            than an exception.
        """
        raw_command = params.get("command")
        # A missing or non-string command is reported like an empty one instead
        # of raising KeyError / AttributeError out of the tool.
        command = raw_command.strip() if isinstance(raw_command, str) else ""
        if not command:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        working_dir = params.get("working_dir") or None

        raw_timeout = params.get("timeout")
        if raw_timeout is None:
            timeout = _DEFAULT_TIMEOUT
        else:
            try:
                timeout = max(1, min(int(raw_timeout), _MAX_TIMEOUT))
            except (TypeError, ValueError, OverflowError):
                return ToolResult(
                    success=False,
                    output=f"Invalid timeout: {raw_timeout!r}",
                    error="invalid_timeout",
                )

        role = ctx.user_role if ctx else current_user_role.get()
        user_id = ctx.user_id if ctx else current_user_id.get(None)

        # Admins and single-user / legacy mode: use the existing restriction logic
        if not user_id or role == "admin":
            run_dir = working_dir
            if isinstance(working_dir, str) and working_dir.startswith("~"):
                # The subprocess APIs do not expand "~"; do it here so
                # "~/project" works as a working directory.
                try:
                    run_dir = str(Path(working_dir).expanduser())
                except RuntimeError:
                    run_dir = working_dir  # unknown user: let the OS report it
            if settings.terminal_allowed_commands.strip() == "*":
                return await self._run_shell(command, run_dir, timeout)
            return await self._run_restricted(command, run_dir, timeout)

        # Non-admin multi-user mode: sandbox + curated allowlist + dangerous-pattern block
        cwd = _resolve_working_dir(working_dir, user_id, role)
        if cwd is None and working_dir:
            return ToolResult(
                success=False,
                output="Working directory is outside your sandbox.",
                error="sandbox_violation",
            )
        danger = _check_dangerous(command)
        if danger:
            return ToolResult(
                success=False,
                output=f"Blocked: {danger}",
                error="dangerous_command",
            )
        return await self._run_user_restricted(command, cwd, timeout)

    @staticmethod
    def _render_output(stdout: bytes, stderr: bytes) -> str:
        """Merge captured streams into one string, truncated to ``_MAX_OUTPUT_CHARS``."""
        parts = []
        if stdout:
            parts.append(stdout.decode(errors="replace"))
        if stderr:
            parts.append("[stderr]\n" + stderr.decode(errors="replace"))
        combined = "\n".join(parts) or "(no output)"
        if len(combined) > _MAX_OUTPUT_CHARS:
            # len(combined) is evaluated before the reassignment, so the
            # message reports the full, untruncated length.
            combined = combined[:_MAX_OUTPUT_CHARS] + f"\n…[truncated — {len(combined)} chars total]"
        return combined

    async def _finish(self, proc: asyncio.subprocess.Process, timeout: int) -> ToolResult:
        """Wait for *proc* (at most *timeout* seconds) and build its ``ToolResult``."""
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout firing and the kill
            # Give the killed process a moment to be reaped. The wait is
            # bounded because children of a killed shell may outlive it and
            # keep the output pipes open.
            try:
                await asyncio.wait_for(proc.wait(), timeout=1)
            except asyncio.TimeoutError:
                pass
            return ToolResult(success=False, output=f"Timed out after {timeout}s", error="timeout")

        success = proc.returncode == 0
        return ToolResult(
            success=success,
            output=self._render_output(stdout, stderr),
            metadata={"returncode": proc.returncode},
            error=None if success else f"Exit code {proc.returncode}",
        )

    async def _run_shell(self, command: str, cwd: str | None, timeout: int) -> ToolResult:
        """Run via shell — supports pipes, redirects, etc."""
        try:
            proc = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            return await self._finish(proc, timeout)
        except Exception as e:
            # Tool boundary: report any failure as a result instead of raising.
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    async def _run_restricted(self, command: str, cwd: str | None, timeout: int) -> ToolResult:
        """Run with allowlist check — exec directly, no shell features."""
        try:
            tokens = shlex.split(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
        if not tokens:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_allowed_commands_list
        if tokens[0] not in allowed:
            return ToolResult(
                success=False,
                output=f"Command '{tokens[0]}' is not in the allowed list. "
                f"Allowed: {allowed}. Set TERMINAL_ALLOWED_COMMANDS=* to allow all.",
                error="not_allowed",
            )

        try:
            proc = await asyncio.create_subprocess_exec(
                *tokens,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=cwd,
            )
            return await self._finish(proc, timeout)
        except FileNotFoundError:
            return ToolResult(success=False, output=f"Command not found: {tokens[0]}", error="not_found")
        except Exception as e:
            # Tool boundary: report any failure as a result instead of raising.
            return ToolResult(success=False, output=f"Execution error: {e}", error=str(e))

    @staticmethod
    def _command_heads(command: str) -> list[str]:
        """Return the executable word of every simple command in a shell line.

        The line is tokenised with ``shlex`` in operator-aware mode and split at
        ``;``, ``|``, ``&``, ``(`` and unquoted newlines. Words that follow a
        redirection operator are redirect targets and are skipped. The parse is
        deliberately conservative: when in doubt a word is reported as a
        command head, so the caller rejects it rather than running it unchecked.

        Raises:
            ValueError: on unbalanced quotes or a dangling escape.
        """
        operators = "();<>|&\n"
        lexer = shlex.shlex(command, posix=True, punctuation_chars=operators)
        lexer.whitespace_split = True
        # shlex would start a comment at a mid-word "#"; the shell does not, so
        # comment handling must be off or "a#b; rm x" would hide the "rm".
        lexer.commenters = ""
        # An unquoted newline separates commands, so it must surface as an
        # operator token instead of being swallowed as whitespace.
        lexer.whitespace = " \t\r"

        heads: list[str] = []
        expect_command = True
        skip_redirect_target = False
        for token in lexer:
            if token and all(ch in operators for ch in token):
                previous = ""
                for ch in token:
                    if ch in "<>":
                        skip_redirect_target = True
                    elif ch == ")":
                        pass  # closing a subshell starts nothing new
                    elif (previous == ">" and ch in "&|") or (previous == "<" and ch == "&"):
                        pass  # ">&", "<&", ">|" are still redirections
                    else:
                        # ";", "|", "&", "(" or newline: a new command begins.
                        expect_command = True
                        skip_redirect_target = False
                    previous = ch
                continue
            if skip_redirect_target:
                skip_redirect_target = False
            elif expect_command:
                heads.append(token)
                expect_command = False
        return heads

    async def _run_user_restricted(self, command: str, cwd: Path | None, timeout: int) -> ToolResult:
        """Run for non-admin users: allowlist + shell features + sandbox cwd.

        Every command in the line (each pipeline stage and each ``;`` / ``&&`` /
        ``||`` / ``&`` / newline-separated command) must be in the per-user
        allowlist; checking only the first word would let ``ls; other`` run an
        arbitrary second command. Command substitution is refused because its
        contents cannot be validated this way.
        """
        # "$'" (ANSI-C quoting) is refused as well: shlex and the shell disagree
        # on where such a string ends, which would desynchronise validation.
        if any(marker in command for marker in ("`", "$(", "$'")):
            return ToolResult(
                success=False,
                output="Command substitution is not allowed for non-admin users.",
                error="not_allowed",
            )

        try:
            heads = self._command_heads(command)
        except ValueError as e:
            return ToolResult(success=False, output=f"Invalid command syntax: {e}", error=str(e))
        if not heads:
            return ToolResult(success=False, output="Empty command.", error="empty_command")

        allowed = settings.terminal_user_allowed_commands_list
        for head in heads:
            if head not in allowed:
                return ToolResult(
                    success=False,
                    output=f"Command '{head}' is not in the allowed list for non-admin users. "
                    f"Allowed: {allowed}.",
                    error="not_allowed",
                )

        # Shell invocation so pipes/redirects work; every command head was validated above.
        return await self._run_shell(command, str(cwd) if cwd else None, timeout)