"""SSH tool — execute commands on remote hosts via SSH.
Connections are defined in ssh_hosts.json (see .env.example for path config).
The tool also accepts inline host/user/password/key parameters for one-off connections.
ssh_hosts.json example:
{
"prod": {
"host": "1.2.3.4",
"port": 22,
"username": "root",
"client_keys": ["~/.ssh/id_rsa"],
"known_hosts": null
},
"staging": {
"host": "staging.example.com",
"username": "ubuntu"
}
}
known_hosts:
null — use system ~/.ssh/known_hosts
"none" — skip host key verification (useful for fresh VPS, not recommended for prod)
"<path>" — use the known_hosts file at that path (~ is expanded)
"""
import asyncio
import json
import logging
import os
from pathlib import Path

import asyncssh

from navi.config import settings

from .base import Tool, ToolResult
# Default timeout in seconds, used when the caller passes no "timeout" parameter.
_TIMEOUT = 60
def _load_hosts() -> dict:
    """Load the named SSH connections from the configured hosts file.

    Loading is best-effort: problems with the file never raise, they yield an
    empty mapping so that inline (ad-hoc) connections keep working.

    Returns:
        The parsed JSON object (connection name -> settings), or ``{}`` when
        the file is missing, cannot be read or parsed, or its top-level JSON
        value is not an object.
    """
    path = Path(settings.ssh_hosts_file).expanduser()
    try:
        # Read bytes and let json.loads detect the UTF-8/16/32 encoding itself
        # instead of depending on the process locale.
        hosts = json.loads(path.read_bytes())
    except FileNotFoundError:
        # A missing hosts file is the normal "nothing configured" case.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable SSH hosts file %s: %s", path, exc
        )
        return {}
    if not isinstance(hosts, dict):
        # Callers index the result by connection name, so anything but a JSON
        # object is unusable.
        logging.getLogger(__name__).warning(
            "Ignoring SSH hosts file %s: top-level JSON value is not an object", path
        )
        return {}
    return hosts
class SshExecTool(Tool):
    """Tool that runs one shell command on a remote host over SSH.

    The target comes either from a named entry in ssh_hosts.json (the
    ``connection`` parameter) or from inline ``host``/``username``/
    ``password``/``port``/``key_path`` parameters. Inline values override the
    values stored for a named connection.
    """

    name = "ssh_exec"
    description = (
        "Execute a command on a remote server via SSH. "
        "Credentials (host, username, password or key) can be passed directly as parameters. "
        "Optionally use a named connection from ssh_hosts.json as a shortcut."
    )
    parameters = {
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "Shell command to run on the remote host",
            },
            "host": {
                "type": "string",
                "description": "Hostname or IP address of the remote server",
            },
            "username": {
                "type": "string",
                "description": "SSH username",
            },
            "password": {
                "type": "string",
                "description": "SSH password (if using password authentication)",
            },
            "port": {
                "type": "integer",
                "description": "SSH port (default 22)",
            },
            "key_path": {
                "type": "string",
                "description": "Path to private key file, e.g. ~/.ssh/id_rsa (if using key authentication)",
            },
            "connection": {
                "type": "string",
                "description": "Named connection from ssh_hosts.json — shortcut that provides host/user/creds automatically",
            },
            "timeout": {
                "type": "integer",
                "description": f"Timeout in seconds (default {_TIMEOUT})",
            },
        },
        "required": ["command"],
    }

    async def execute(self, params: dict) -> ToolResult:
        """Run ``params["command"]`` on the resolved remote host.

        Args:
            params: Tool arguments as described by ``parameters``.

        Returns:
            A ToolResult whose ``success`` is true only for exit status 0.
            Invalid arguments, a missing target, connection or authentication
            failures and timeouts are all reported as failed results instead
            of being raised.
        """
        command = self._text(params.get("command"))
        if not command:
            # Nothing to execute; reject up front instead of opening a session.
            return ToolResult(
                success=False,
                output="No command specified. Provide a non-empty 'command'.",
                error="no_command",
            )

        # A missing or zero timeout falls back to the module default.
        try:
            timeout = int(params.get("timeout") or _TIMEOUT)
        except (TypeError, ValueError):
            timeout = -1
        if timeout <= 0:
            return ToolResult(
                success=False,
                output=(
                    f"Invalid timeout {params.get('timeout')!r}: "
                    "expected a positive number of seconds."
                ),
                error="invalid_timeout",
            )

        try:
            connect_kwargs = self._resolve(params)
        except (TypeError, ValueError) as exc:
            # Bad port, missing host in a stored entry, malformed key list, ...
            return ToolResult(
                success=False,
                output=f"Invalid SSH connection settings: {exc}",
                error="invalid_config",
            )

        if connect_kwargs is None:
            requested = self._text(params.get("connection"))
            # A name was given but _resolve found no such entry and no host.
            hint = (
                f"Connection {requested!r} was not found in ssh_hosts.json. "
                if requested
                else ""
            )
            return ToolResult(
                success=False,
                output=(
                    f"{hint}"
                    "No SSH target specified. Provide 'host' (and optionally 'username', "
                    "'password', 'key_path'), or a named 'connection' from ssh_hosts.json."
                ),
                error="no_target",
            )

        # Tracks which step was running so a timeout can be reported precisely.
        phase = "connection"
        try:
            # Bound connection setup too, so an unreachable host cannot block
            # far longer than the requested timeout.
            conn = await asyncio.wait_for(
                asyncssh.connect(**connect_kwargs),
                timeout=timeout,
            )
            async with conn:
                phase = "command"
                # check=False: a non-zero exit status is reported, not raised.
                result = await asyncio.wait_for(
                    conn.run(command, check=False),
                    timeout=timeout,
                )

            output_parts = []
            if result.stdout:
                output_parts.append(result.stdout)
            if result.stderr:
                output_parts.append(f"[stderr]\n{result.stderr}")
            success = result.exit_status == 0
            return ToolResult(
                success=success,
                output="\n".join(output_parts) or "(no output)",
                metadata={
                    "exit_status": result.exit_status,
                    "host": connect_kwargs.get("host"),
                },
                error=None if success else f"Exit status {result.exit_status}",
            )
        except asyncssh.PermissionDenied:
            # Must precede DisconnectError: PermissionDenied is a subclass of
            # it, so the opposite order makes this handler unreachable.
            return ToolResult(
                success=False,
                output="SSH permission denied. Check username and password/key.",
                error="permission_denied",
            )
        except asyncssh.DisconnectError as e:
            return ToolResult(success=False, output=f"SSH disconnected: {e}", error=str(e))
        except (TimeoutError, asyncio.TimeoutError):
            return ToolResult(
                success=False,
                output=f"SSH {phase} timed out after {timeout}s",
                error="timeout",
            )
        except Exception as e:
            # Last-resort boundary: surface any other failure as a tool error.
            return ToolResult(success=False, output=f"SSH error: {e}", error=str(e))

    @staticmethod
    def _text(value: object) -> str:
        """Return *value* as a stripped string, treating ``None`` as empty."""
        return "" if value is None else str(value).strip()

    def _resolve(self, params: dict) -> dict | None:
        """Turn tool parameters into keyword arguments for ``asyncssh.connect``.

        A named ``connection`` found in ssh_hosts.json takes precedence, with
        inline parameters overriding its stored values. An unknown name falls
        through to the inline ``host`` parameters.

        Returns:
            The connect kwargs, or ``None`` when no target host can be
            determined.

        Raises:
            TypeError, ValueError: if the resolved settings are malformed
                (propagated from ``_build_kwargs``).
        """
        connection = self._text(params.get("connection"))
        if connection:
            hosts = _load_hosts()
            if connection in hosts:
                cfg = dict(hosts[connection])
                # Inline params override stored values
                for key in ("host", "username", "password", "port"):
                    if params.get(key):
                        cfg[key] = params[key]
                if params.get("key_path"):
                    cfg["client_keys"] = [params["key_path"]]
                return self._build_kwargs(cfg)

        # Direct params
        host = self._text(params.get("host"))
        if not host:
            return None
        # NOTE(security): host key verification is skipped by default for
        # ad-hoc connections, which leaves them open to man-in-the-middle
        # attacks. Prefer a named connection with known_hosts for anything
        # sensitive.
        cfg = {"host": host, "known_hosts": "none"}
        for key in ("username", "password", "port"):
            if params.get(key):
                cfg[key] = params[key]
        if params.get("key_path"):
            cfg["client_keys"] = [params["key_path"]]
        return self._build_kwargs(cfg)

    def _build_kwargs(self, cfg: dict) -> dict:
        """Translate a connection settings dict into ``asyncssh.connect`` kwargs.

        Args:
            cfg: Settings with ``host`` and optional ``port``, ``username``,
                ``password``, ``client_keys`` and ``known_hosts`` keys.

        Returns:
            Keyword arguments for ``asyncssh.connect``.

        Raises:
            ValueError: if ``host`` is missing/empty or ``port`` is not a number.
            TypeError: if a key or known_hosts path has an unusable type.
        """
        host = self._text(cfg.get("host"))
        if not host:
            raise ValueError("connection settings have no 'host'")

        kwargs: dict = {
            "host": host,
            # `or` also covers an explicit null/empty value in ssh_hosts.json.
            "port": int(cfg.get("port") or 22),
            "username": cfg.get("username") or os.environ.get("USER", "root"),
        }

        client_keys = cfg.get("client_keys")
        if isinstance(client_keys, (str, os.PathLike)):
            # A single path would otherwise be iterated character by character.
            client_keys = [client_keys]
        password = cfg.get("password")

        if client_keys:
            kwargs["client_keys"] = [str(Path(k).expanduser()) for k in client_keys]
            if password:
                kwargs["password"] = password  # fallback
        elif password:
            kwargs["client_keys"] = []  # disable key lookup, use password only
            kwargs["password"] = password
        # else: no creds — asyncssh tries ~/.ssh/* by default

        known_hosts = cfg.get("known_hosts")
        if known_hosts == "none":
            # The "none" sentinel maps to known_hosts=None: skip verification.
            kwargs["known_hosts"] = None
        elif known_hosts is not None:
            kwargs["known_hosts"] = str(Path(known_hosts).expanduser())
        # known_hosts left unset: asyncssh's own default applies.
        return kwargs