"""Filesystem tool — read/write/append/list/find/info/move/delete/exists.
If FS_ALLOWED_PATHS=* (default), any path is accessible.
Otherwise set a comma-separated list of allowed root paths, e.g.:
FS_ALLOWED_PATHS=/home/user,/var/www
"""
import shutil
import stat
from datetime import datetime
from pathlib import Path
from navi.config import settings
from .base import Tool, ToolResult
_READ_WARN_BYTES = 100_000 # 100 KB — add size warning in output
_READ_HARD_BYTES = 1_000_000 # 1 MB — refuse full read without offset/limit
_LIST_MAX_ENTRIES = 500
_FIND_MAX_RESULTS = 200
def _check_path(path_str: str) -> Path | None:
"""Return resolved Path if access is allowed, else None."""
try:
p = Path(path_str).expanduser().resolve()
except Exception:
return None
if settings.fs_allowed_paths.strip() == "*":
return p
allowed = [Path(r).expanduser().resolve() for r in settings.fs_allowed_paths_list]
for root in allowed:
try:
p.relative_to(root)
return p
except ValueError:
continue
return None
def _fmt_size(n: int) -> str:
if n < 1024:
return f"{n} B"
if n < 1024 ** 2:
return f"{n / 1024:.1f} KB"
if n < 1024 ** 3:
return f"{n / 1024 ** 2:.1f} MB"
return f"{n / 1024 ** 3:.1f} GB"
def _fmt_time(ts: float) -> str:
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
class FilesystemTool(Tool):
name = "filesystem"
description = (
"Operate on the local filesystem. "
"Actions: "
"read — get file text; use offset+limit for large files to avoid flooding context; "
"write — create/overwrite file (creates parent dirs); "
"append — add text to end of file (creates if missing); "
"list — directory contents with sizes and dates, optional recursive; "
"find — search files by glob pattern, e.g. '*.py' or '**/*.conf'; "
"info — file metadata: size, line count, modified date, permissions; "
"move — rename or move a file/directory; "
"delete — remove file or directory tree; "
"exists — check if path exists. "
"Tip: call info before reading an unknown file to check its size first."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["read", "write", "append", "list", "find", "info", "move", "delete", "exists"],
"description": "Operation to perform",
},
"path": {
"type": "string",
"description": "Absolute or relative file/directory path (~ is expanded)",
},
"content": {
"type": "string",
"description": "Text to write or append (required for write/append)",
},
"destination": {
"type": "string",
"description": "Target path for move action",
},
"pattern": {
"type": "string",
"description": "Glob pattern for find action, e.g. '*.log' or '**/*.py'",
},
"offset": {
"type": "integer",
"description": "First line to read, 1-based (for read action)",
},
"limit": {
"type": "integer",
"description": "Maximum number of lines to return (for read action)",
},
"recursive": {
"type": "boolean",
"description": "List the full directory tree recursively (for list action, default false)",
},
},
"required": ["action", "path"],
}
async def execute(self, params: dict) -> ToolResult:
action = params["action"]
raw_path = params["path"]
path = _check_path(raw_path)
if path is None:
return ToolResult(
success=False,
output=(
f"Access denied: '{raw_path}' is outside allowed paths "
f"({settings.fs_allowed_paths}). "
"Set FS_ALLOWED_PATHS=* in .env to allow all paths."
),
error="access_denied",
)
try:
match action:
case "read":
return self._read(path, params)
case "write":
return self._write(path, params)
case "append":
return self._append(path, params)
case "list":
return self._list(path, params)
case "find":
return self._find(path, params)
case "info":
return self._info(path)
case "move":
return self._move(path, params)
case "delete":
return self._delete(path)
case "exists":
return ToolResult(success=True, output="true" if path.exists() else "false")
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
except PermissionError as e:
return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
except Exception as e:
return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))
# ── action handlers ───────────────────────────────────────────────────────
def _read(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output=f"Path is a directory, use 'list': {path}", error="is_directory")
file_size = path.stat().st_size
offset = params.get("offset") # 1-based
limit = params.get("limit")
# Refuse to dump files over 1 MB without an explicit range
if file_size > _READ_HARD_BYTES and offset is None and limit is None:
return ToolResult(
success=False,
output=(
f"File too large to read in full: {_fmt_size(file_size)} — {path}\n"
"Use offset/limit to read specific line ranges "
"(e.g. offset=1, limit=100), or call info first to see the total line count."
),
error="file_too_large",
)
text = path.read_text(encoding="utf-8", errors="replace")
lines = text.splitlines(keepends=True)
total_lines = len(lines)
if offset is not None or limit is not None:
start = max(0, (offset or 1) - 1) # convert 1-based → 0-based
end = (start + limit) if limit is not None else total_lines
selected = lines[start:end]
actual_end = min(end, total_lines)
header = (
f"[{path} | lines {start + 1}–{actual_end} of {total_lines}"
f" | {_fmt_size(file_size)}]\n"
)
return ToolResult(success=True, output=header + "".join(selected))
warn = (
f"⚠ Large file ({_fmt_size(file_size)}) — "
"consider offset/limit for targeted reads next time.\n"
if file_size > _READ_WARN_BYTES
else ""
)
header = f"[{path} | {total_lines} lines | {_fmt_size(file_size)}]\n"
return ToolResult(success=True, output=header + warn + text)
def _write(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
lines = len(content.splitlines())
return ToolResult(
success=True,
output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}",
)
def _append(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
if not content:
return ToolResult(success=False, output="'content' is required for append", error="missing_content")
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
f.write(content)
total_size = path.stat().st_size
return ToolResult(
success=True,
output=(
f"Appended {_fmt_size(len(content.encode()))} to {path} "
f"(file now {_fmt_size(total_size)})"
),
)
def _list(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
if path.is_file():
return self._info(path)
recursive = params.get("recursive", False)
raw_entries = list(path.rglob("*") if recursive else path.iterdir())
raw_entries.sort(key=lambda e: (e.is_file(), str(e).lower()))
truncated = len(raw_entries) > _LIST_MAX_ENTRIES
entries = raw_entries[:_LIST_MAX_ENTRIES]
lines = []
for e in entries:
try:
s = e.stat()
rel = e.relative_to(path)
if e.is_dir():
# child count only in non-recursive mode (cheap)
if not recursive:
try:
n = sum(1 for _ in e.iterdir())
lines.append(f"d {rel}/ ({n} items)")
except PermissionError:
lines.append(f"d {rel}/")
else:
lines.append(f"d {rel}/")
else:
lines.append(
f" {str(rel):<48} {_fmt_size(s.st_size):>10} {_fmt_time(s.st_mtime)}"
)
except Exception:
lines.append(f"? {e.name}")
note = " ⚠ truncated" if truncated else ""
header = f"[{path} | {len(entries)} entries{note}]\n"
return ToolResult(success=True, output=header + ("\n".join(lines) or "(empty directory)"))
def _find(self, path: Path, params: dict) -> ToolResult:
pattern = params.get("pattern")
if not pattern:
return ToolResult(success=False, output="'pattern' is required for find action", error="missing_pattern")
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
matches: list[Path] = []
try:
for p in path.rglob(pattern):
matches.append(p)
if len(matches) >= _FIND_MAX_RESULTS:
break
except Exception as e:
return ToolResult(success=False, output=f"Find error: {e}", error=str(e))
if not matches:
return ToolResult(success=True, output=f"No matches for '{pattern}' in {path}")
matches.sort()
lines = []
for m in matches:
try:
size = _fmt_size(m.stat().st_size) if m.is_file() else "<dir>"
lines.append(f"{m} ({size})")
except Exception:
lines.append(str(m))
extra = f" ⚠ showing first {_FIND_MAX_RESULTS}, more exist" if len(matches) == _FIND_MAX_RESULTS else ""
header = f"[{len(matches)} matches for '{pattern}' in {path}{extra}]\n"
return ToolResult(success=True, output=header + "\n".join(lines))
def _info(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
s = path.stat()
kind = "symlink" if path.is_symlink() else ("directory" if path.is_dir() else "file")
lines = [
f"path: {path}",
f"type: {kind}",
f"size: {_fmt_size(s.st_size)}",
f"modified: {_fmt_time(s.st_mtime)}",
f"created: {_fmt_time(s.st_ctime)}",
f"mode: {stat.filemode(s.st_mode)}",
]
if path.is_file():
try:
text = path.read_text(encoding="utf-8", errors="replace")
lines.append(f"lines: {len(text.splitlines())}")
except Exception:
lines.append("lines: (binary or unreadable)")
elif path.is_dir():
try:
children = list(path.iterdir())
n_dirs = sum(1 for c in children if c.is_dir())
n_files = sum(1 for c in children if c.is_file())
lines.append(f"contents: {n_files} files, {n_dirs} dirs (top level)")
except Exception:
pass
return ToolResult(success=True, output="\n".join(lines))
def _move(self, path: Path, params: dict) -> ToolResult:
dest_raw = params.get("destination")
if not dest_raw:
return ToolResult(success=False, output="'destination' is required for move action", error="missing_destination")
dest = _check_path(dest_raw)
if dest is None:
return ToolResult(
success=False,
output=f"Access denied: destination '{dest_raw}' is outside allowed paths.",
error="access_denied",
)
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(path), str(dest))
return ToolResult(success=True, output=f"Moved: {path} → {dest}")
def _delete(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
return ToolResult(success=True, output=f"Deleted: {path}")