"""Filesystem tool — read/write/list/delete files.
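Access is controlled by the FS_ALLOWED_PATHS setting.
If FS_ALLOWED_PATHS=* (default), any path is accessible.
Otherwise set it to a comma-separated list of allowed root paths; only those
roots and the paths beneath them are accessible, e.g.:
    FS_ALLOWED_PATHS=/home/user,/var/www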
"""
import shutil
from pathlib import Path
from navi.config import settings
from .base import Tool, ToolResult
def _check_path(path_str: str) -> Path | None:
    """Resolve *path_str* and return it only if the configured policy allows it.

    The path is ``~``-expanded and resolved before the check, so the
    comparison is made against the resolved location. ``settings`` is read on
    every call rather than cached in module-level state.

    Args:
        path_str: Path as supplied by the caller; absolute or relative.

    Returns:
        The resolved ``Path`` when ``settings.fs_allowed_paths`` is ``"*"``, or
        when the path equals or lies beneath one of the roots in
        ``settings.fs_allowed_paths_list``. ``None`` when the path cannot be
        resolved or matches no usable root.
    """
    try:
        resolved = Path(path_str).expanduser().resolve()
    except Exception:
        # Fail closed: anything that cannot be resolved is treated as denied.
        return None

    if settings.fs_allowed_paths.strip() == "*":
        return resolved

    for raw_root in settings.fs_allowed_paths_list:
        # A blank entry would resolve to the current working directory and
        # silently whitelist it, so it is ignored if the list contains one.
        if not str(raw_root).strip():
            continue
        try:
            root = Path(raw_root).expanduser().resolve()
        except (OSError, RuntimeError):
            # An unresolvable root (e.g. "~unknownuser") grants nothing; it
            # must not turn the access check into an unhandled exception.
            continue
        if resolved.is_relative_to(root):
            return resolved
    return None
class FilesystemTool(Tool):
name = "filesystem"
description = (
"Operate on the local filesystem. "
"Actions: read — get file text; write — create/overwrite file (creates parent dirs); "
"list — directory contents; delete — remove file or directory tree; "
"exists — check if path exists. Paths support ~ expansion."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["read", "write", "list", "delete", "exists"],
"description": "Operation to perform",
},
"path": {
"type": "string",
"description": "Absolute or relative file/directory path (~ is expanded)",
},
"content": {
"type": "string",
"description": "Content to write (required for 'write' action)",
},
},
"required": ["action", "path"],
}
async def execute(self, params: dict) -> ToolResult:
action = params["action"]
raw_path = params["path"]
path = _check_path(raw_path)
if path is None:
allowed = settings.fs_allowed_paths
return ToolResult(
success=False,
output=(
f"Access denied: '{raw_path}' is outside allowed paths ({allowed}). "
f"Set FS_ALLOWED_PATHS=* in .env to allow all paths."
),
error="access_denied",
)
try:
match action:
case "read":
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
content = path.read_text(encoding="utf-8", errors="replace")
return ToolResult(success=True, output=content)
case "write":
content = params.get("content", "")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return ToolResult(success=True, output=f"Written {len(content)} bytes to {path}")
case "list":
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
if path.is_file():
return ToolResult(success=True, output=str(path))
entries = sorted(path.iterdir(), key=lambda e: (e.is_file(), e.name))
lines = [f"{' ' if e.is_file() else 'd '}{e.name}" for e in entries]
return ToolResult(success=True, output="\n".join(lines) or "(empty directory)")
case "delete":
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
return ToolResult(success=True, output=f"Deleted: {path}")
case "exists":
return ToolResult(success=True, output="true" if path.exists() else "false")
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
except PermissionError as e:
return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
except Exception as e:
return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))