"""Filesystem tool — read/write/list files within allowed paths."""
import os
from pathlib import Path
from navi.config import settings
from .base import Tool, ToolResult
_ALLOWED = [Path(p).resolve() for p in settings.fs_allowed_paths_list]
def _check_path(path_str: str) -> Path | None:
"""Return resolved Path if it's within an allowed root, else None."""
try:
p = Path(path_str).resolve()
except Exception:
return None
for allowed in _ALLOWED:
try:
p.relative_to(allowed)
return p
except ValueError:
continue
return None
class FilesystemTool(Tool):
name = "filesystem"
description = (
"Read, write, list, or delete files and directories. "
"Only paths within configured allowed roots are accessible."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["read", "write", "list", "delete", "exists"],
"description": "Operation to perform",
},
"path": {"type": "string", "description": "Absolute or relative file/directory path"},
"content": {
"type": "string",
"description": "Content to write (required for 'write' action)",
},
},
"required": ["action", "path"],
}
async def execute(self, params: dict) -> ToolResult:
action = params["action"]
raw_path = params["path"]
path = _check_path(raw_path)
if path is None:
return ToolResult(
success=False,
output=f"Access denied: '{raw_path}' is outside allowed paths.",
error="access_denied",
)
try:
match action:
case "read":
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
content = path.read_text(encoding="utf-8", errors="replace")
return ToolResult(success=True, output=content)
case "write":
content = params.get("content", "")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return ToolResult(success=True, output=f"Written {len(content)} bytes to {path}")
case "list":
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
if path.is_file():
return ToolResult(success=True, output=str(path))
entries = sorted(path.iterdir(), key=lambda e: (e.is_file(), e.name))
lines = [
f"{' ' if e.is_file() else 'd '}{e.name}" for e in entries
]
return ToolResult(success=True, output="\n".join(lines) or "(empty directory)")
case "delete":
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
if path.is_dir():
import shutil
shutil.rmtree(path)
else:
path.unlink()
return ToolResult(success=True, output=f"Deleted: {path}")
case "exists":
return ToolResult(success=True, output="true" if path.exists() else "false")
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
except PermissionError as e:
return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
except Exception as e:
return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))