"""Filesystem tool — read/write/append/list/find/info/move/delete/exists + AI query/smart_edit.
If FS_ALLOWED_PATHS=* (default), any path is accessible.
Otherwise set a comma-separated list of allowed root paths, e.g.:
FS_ALLOWED_PATHS=/home/user,/var/www
"""
import difflib
import os
import shutil
import stat
import tempfile
from datetime import datetime
from pathlib import Path

from navi.config import settings

from .base import Tool, ToolResult
# Size thresholds applied by the `read` action (compared against st_size).
_READ_WARN_BYTES = 100_000 # 100 KB — add size warning in output
_READ_HARD_BYTES = 1_000_000 # 1 MB — refuse full read without offset/limit
# Caps on how many entries the `list` and `find` actions report.
_LIST_MAX_ENTRIES = 500
_FIND_MAX_RESULTS = 200
# AI actions: ~20k tokens of file content per chunk (4 chars ≈ 1 token)
_AI_CHUNK_CHARS = 80_000
# Overlap (in lines) requested between consecutive chunks when `query` splits a file.
_AI_OVERLAP_LINES = 30
# smart_edit: refuse files larger than ~50k tokens (full file must fit in one call)
_AI_EDIT_MAX_CHARS = 200_000
# ── System prompts ────────────────────────────────────────────────────────────
# System prompt for `query` when the whole file fits into a single chunk.
_QUERY_SINGLE_SYSTEM = (
"You are a precise file analysis assistant. "
"Answer the question based strictly on the file content shown. "
"Be specific and concise. Include line numbers when relevant."
)
# System prompt for `query` on one chunk of a multi-chunk file. The NOT_FOUND
# sentinel is how the model signals that a chunk holds nothing relevant.
_QUERY_CHUNK_SYSTEM = (
"You are analyzing one section of a larger file. "
"Answer the question using only the lines shown. "
"If the answer is not present in this section, respond with exactly: NOT_FOUND\n"
"Otherwise be specific and include line numbers."
)
# System prompt for the final `query` step that merges per-chunk findings.
_QUERY_SYNTHESIS_SYSTEM = (
"Combine these partial findings from different sections of a file into one clear answer. "
"Remove duplicates. Be direct and concise."
)
# System prompt for `smart_edit`. The JSON operation format described here is
# the one checked by _validate_ops and executed by _apply_ops.
_EDIT_SYSTEM = (
"You are a precise file editor. "
"Given file content with line numbers and an instruction, output ONLY a JSON array of edit operations. "
"No explanation, no markdown — just the JSON.\n\n"
"Format:\n"
"[\n"
' {"op": "replace", "start": LINE, "end": LINE, "content": "new text\\nmore lines"},\n'
' {"op": "delete", "start": LINE, "end": LINE},\n'
' {"op": "insert", "after": LINE, "content": "text to insert"}\n'
"]\n\n"
"Rules:\n"
"- Line numbers are 1-based and inclusive\n"
"- Use \\n in content strings for embedded newlines\n"
"- Make MINIMAL changes to accomplish the instruction\n"
"- 'insert' after=0 inserts before the first line\n"
"- If no changes are needed, return []"
)
# ── Path helpers ──────────────────────────────────────────────────────────────
def _check_path(path_str: str) -> Path | None:
    """Resolve *path_str* and return it only if the access policy permits it.

    The path is user-expanded and resolved first. When the configured policy
    is ``*`` every resolvable path is accepted; otherwise the resolved path
    must equal, or lie underneath, one of the configured root directories.
    Returns ``None`` when the path cannot be resolved or is not permitted.
    """
    try:
        candidate = Path(path_str).expanduser().resolve()
    except Exception:
        return None

    if settings.fs_allowed_paths.strip() == "*":
        return candidate

    # Resolve every configured root up front so the comparison is done on
    # normalised absolute paths.
    roots = [Path(entry).expanduser().resolve() for entry in settings.fs_allowed_paths_list]
    for root in roots:
        if candidate == root or root in candidate.parents:
            return candidate
    return None
def _fmt_size(n: int) -> str:
if n < 1024: return f"{n} B"
if n < 1024 ** 2: return f"{n / 1024:.1f} KB"
if n < 1024 ** 3: return f"{n / 1024 ** 2:.1f} MB"
return f"{n / 1024 ** 3:.1f} GB"
def _fmt_time(ts: float) -> str:
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
# ── AI helpers (module-level, no self) ────────────────────────────────────────
def _number_lines(lines: list[str], start: int = 1) -> str:
"""Return file lines with 1-based line numbers, right-aligned."""
width = len(str(start + len(lines) - 1))
return "\n".join(f"{start + i:>{width}}: {line}" for i, line in enumerate(lines))
def _make_chunks(lines: list[str], target_chars: int, overlap: int) -> list[tuple[int, int]]:
"""
Split lines into (start_idx, end_idx) chunks of at most target_chars characters.
Consecutive chunks overlap by `overlap` lines to preserve boundary context.
"""
if not lines:
return [(0, 0)]
total = len(lines)
total_chars = sum(len(l) + 1 for l in lines)
if total_chars <= target_chars:
return [(0, total)]
chunks: list[tuple[int, int]] = []
start = 0
while start < total:
chars = 0
end = start
while end < total and chars < target_chars:
chars += len(lines[end]) + 1
end += 1
chunks.append((start, end))
if end >= total:
break
start = max(start + 1, end - overlap) # always make progress
return chunks or [(0, total)]
def _validate_ops(ops: list, total_lines: int) -> list[str]:
errors: list[str] = []
for i, op in enumerate(ops):
if not isinstance(op, dict):
errors.append(f"op[{i}] is not a dict"); continue
kind = op.get("op")
if kind not in ("replace", "delete", "insert"):
errors.append(f"op[{i}] unknown type {kind!r}"); continue
if kind in ("replace", "delete"):
s, e = op.get("start"), op.get("end")
if not isinstance(s, int) or not isinstance(e, int):
errors.append(f"op[{i}] start/end must be integers")
elif s < 1 or e > total_lines or s > e:
errors.append(f"op[{i}] range {s}-{e} out of bounds (file has {total_lines} lines)")
elif kind == "insert":
after = op.get("after")
if not isinstance(after, int):
errors.append(f"op[{i}] 'after' must be integer")
elif after < 0 or after > total_lines:
errors.append(f"op[{i}] 'after'={after} out of bounds (0–{total_lines})")
return errors
def _apply_ops(lines: list[str], ops: list[dict]) -> list[str]:
"""Apply edit operations bottom-up (highest line first) to preserve line numbers."""
sorted_ops = sorted(
ops,
key=lambda o: o.get("start", o.get("after", 0)),
reverse=True,
)
result = list(lines)
for op in sorted_ops:
kind = op["op"]
if kind == "replace":
s = op["start"] - 1 # 0-based
e = op["end"] # exclusive (1-based end = exclusive 0-based end)
new = op.get("content", "").split("\n")
result[s:e] = new
elif kind == "delete":
s = op["start"] - 1
e = op["end"]
del result[s:e]
elif kind == "insert":
after = op["after"] # insert after this 1-based line (0 = before line 1)
new = op.get("content", "").split("\n")
result[after:after] = new
return result
def _unified_diff(original: list[str], modified: list[str], path: Path) -> str:
diff = list(difflib.unified_diff(
[l + "\n" for l in original],
[l + "\n" for l in modified],
fromfile=f"a/{path.name}",
tofile=f"b/{path.name}",
lineterm="",
))
return "\n".join(diff)
# ── Tool class ────────────────────────────────────────────────────────────────
class FilesystemTool(Tool):
name = "filesystem"
description = (
"Operate on the local filesystem. "
"ALWAYS prefer AI actions over manual read+write — they produce more accurate results "
"and handle files of any size automatically:\n"
" • query — use INSTEAD of read when you need to extract or look up information. "
"Examples: 'what arguments does function X take?', 'on which line is class Y defined?', "
"'does this config contain key Z?', 'list all TODO comments'. "
"Pass the question in 'question'.\n"
" • smart_edit — use INSTEAD of read+write for any semantic change to a file. "
"Examples: 'rename function foo to bar', 'add a docstring to method X', "
"'remove all commented-out code', 'change timeout from 30 to 60'. "
"Pass the instruction in 'instruction'. Returns a diff of what changed.\n"
"Standard actions (use only when AI actions are not applicable): "
"read — raw file text (offset+limit for large files); "
"write — create or overwrite a file; "
"append — add text to end; "
"list — directory contents with sizes; "
"find — search files by glob pattern downward; "
"find_up — walk up the directory tree to find a file by name (pattern param); returns its path or 'not found'; "
"info — size, line count, dates, permissions; "
"move — rename or move; "
"delete — remove file or directory tree; "
"exists — check if path exists. "
"Tip: call info before reading an unknown file to check its size first."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"read", "write", "append", "list", "find", "find_up",
"info", "move", "delete", "exists",
"query", "smart_edit",
],
"description": "Operation to perform.",
},
"path": {
"type": "string",
"description": "Absolute or relative file/directory path (~ is expanded).",
},
"content": {
"type": "string",
"description": "Text to write or append (required for write/append).",
},
"destination": {
"type": "string",
"description": "Target path for move action.",
},
"pattern": {
"type": "string",
"description": "Glob pattern for find (e.g. '*.log'), or exact filename for find_up.",
},
"offset": {
"type": "integer",
"description": "First line to read, 1-based (for read action).",
},
"limit": {
"type": "integer",
"description": "Max lines to return (for read action).",
},
"recursive": {
"type": "boolean",
"description": "Full recursive directory tree (for list, default false).",
},
"question": {
"type": "string",
"description": (
"Natural language question about the file's content (for query). "
"Examples: 'What does function calculate() return?', "
"'On which line is class UserManager defined?', "
"'What environment variables does this script read?', "
"'Are there any hardcoded passwords?'"
),
},
"instruction": {
"type": "string",
"description": (
"Natural language edit instruction (for smart_edit). "
"Examples: 'Rename function process to handle_request', "
"'Add type hints to all function signatures', "
"'Replace the hardcoded URL with a constant BASE_URL', "
"'Delete the block comment on lines 10-20', "
"'Add logging to the save() method'"
),
},
},
"required": ["action", "path"],
}
def __init__(self, ai_helper=None) -> None:
# ai_helper is optional — standard actions work without it
self._ai = ai_helper
async def execute(self, params: dict) -> ToolResult:
action = params.get("action", "")
raw_path = params.get("path", "")
path = _check_path(raw_path)
if path is None:
return ToolResult(
success=False,
output=(
f"Access denied: '{raw_path}' is outside allowed paths "
f"({settings.fs_allowed_paths}). "
"Set FS_ALLOWED_PATHS=* in .env to allow all paths."
),
error="access_denied",
)
try:
match action:
case "read": return self._read(path, params)
case "write": return self._write(path, params)
case "append": return self._append(path, params)
case "list": return self._list(path, params)
case "find": return self._find(path, params)
case "find_up": return self._find_up(path, params)
case "info": return self._info(path)
case "move": return self._move(path, params)
case "delete": return self._delete(path)
case "exists": return ToolResult(success=True, output="true" if path.exists() else "false")
case "query": return await self._query(path, params)
case "smart_edit": return await self._smart_edit(path, params)
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
except PermissionError as e:
return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
except Exception as e:
return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))
# ── Standard action handlers ──────────────────────────────────────────────
def _read(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output=f"Path is a directory, use 'list': {path}", error="is_directory")
file_size = path.stat().st_size
offset = params.get("offset")
limit = params.get("limit")
if file_size > _READ_HARD_BYTES and offset is None and limit is None:
return ToolResult(
success=False,
output=(
f"File too large to read in full: {_fmt_size(file_size)} — {path}\n"
"Use offset/limit to read specific line ranges "
"(e.g. offset=1, limit=100), or use 'query' to ask a question about it."
),
error="file_too_large",
)
text = path.read_text(encoding="utf-8", errors="replace")
lines = text.splitlines(keepends=True)
total_lines = len(lines)
if offset is not None or limit is not None:
start = max(0, (offset or 1) - 1)
end = (start + limit) if limit is not None else total_lines
selected = lines[start:end]
actual_end = min(end, total_lines)
header = (
f"[{path} | lines {start + 1}–{actual_end} of {total_lines}"
f" | {_fmt_size(file_size)}]\n"
)
return ToolResult(success=True, output=header + "".join(selected))
warn = (
f"⚠ Large file ({_fmt_size(file_size)}) — consider offset/limit next time.\n"
if file_size > _READ_WARN_BYTES else ""
)
header = f"[{path} | {total_lines} lines | {_fmt_size(file_size)}]\n"
return ToolResult(success=True, output=header + warn + text)
def _write(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
lines = len(content.splitlines())
return ToolResult(success=True, output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}")
def _append(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
if not content:
return ToolResult(success=False, output="'content' is required for append", error="missing_content")
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
f.write(content)
return ToolResult(success=True, output=f"Appended {_fmt_size(len(content.encode()))} to {path} (file now {_fmt_size(path.stat().st_size)})")
def _list(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
if path.is_file():
return self._info(path)
recursive = params.get("recursive", False)
raw_entries = list(path.rglob("*") if recursive else path.iterdir())
raw_entries.sort(key=lambda e: (e.is_file(), str(e).lower()))
truncated = len(raw_entries) > _LIST_MAX_ENTRIES
entries = raw_entries[:_LIST_MAX_ENTRIES]
lines = []
for e in entries:
try:
s = e.stat()
rel = e.relative_to(path)
if e.is_dir():
if not recursive:
try:
n = sum(1 for _ in e.iterdir())
lines.append(f"d {rel}/ ({n} items)")
except PermissionError:
lines.append(f"d {rel}/")
else:
lines.append(f"d {rel}/")
else:
lines.append(f" {str(rel):<48} {_fmt_size(s.st_size):>10} {_fmt_time(s.st_mtime)}")
except Exception:
lines.append(f"? {e.name}")
note = " ⚠ truncated" if truncated else ""
header = f"[{path} | {len(entries)} entries{note}]\n"
return ToolResult(success=True, output=header + ("\n".join(lines) or "(empty directory)"))
def _find(self, path: Path, params: dict) -> ToolResult:
pattern = params.get("pattern")
if not pattern:
return ToolResult(success=False, output="'pattern' is required for find", error="missing_pattern")
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
matches: list[Path] = []
try:
for p in path.rglob(pattern):
matches.append(p)
if len(matches) >= _FIND_MAX_RESULTS:
break
except Exception as e:
return ToolResult(success=False, output=f"Find error: {e}", error=str(e))
if not matches:
return ToolResult(success=True, output=f"No matches for '{pattern}' in {path}")
matches.sort()
lines = []
for m in matches:
try:
size = _fmt_size(m.stat().st_size) if m.is_file() else "<dir>"
lines.append(f"{m} ({size})")
except Exception:
lines.append(str(m))
extra = f" ⚠ showing first {_FIND_MAX_RESULTS}" if len(matches) == _FIND_MAX_RESULTS else ""
header = f"[{len(matches)} matches for '{pattern}' in {path}{extra}]\n"
return ToolResult(success=True, output=header + "\n".join(lines))
def _find_up(self, path: Path, params: dict) -> ToolResult:
filename = params.get("pattern", "NAVI.md")
current = path if path.is_dir() else path.parent
checked = []
while True:
target = current / filename
checked.append(str(target))
if target.exists():
return ToolResult(success=True, output=str(target))
parent = current.parent
if parent == current:
return ToolResult(success=True, output=f"not found (searched: {', '.join(checked)})")
current = parent
def _info(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
s = path.stat()
kind = "symlink" if path.is_symlink() else ("directory" if path.is_dir() else "file")
lines = [
f"path: {path}",
f"type: {kind}",
f"size: {_fmt_size(s.st_size)}",
f"modified: {_fmt_time(s.st_mtime)}",
f"created: {_fmt_time(s.st_ctime)}",
f"mode: {stat.filemode(s.st_mode)}",
]
if path.is_file():
try:
text = path.read_text(encoding="utf-8", errors="replace")
lines.append(f"lines: {len(text.splitlines())}")
except Exception:
lines.append("lines: (binary or unreadable)")
elif path.is_dir():
try:
children = list(path.iterdir())
lines.append(f"contents: {sum(c.is_file() for c in children)} files, {sum(c.is_dir() for c in children)} dirs (top level)")
except Exception:
pass
return ToolResult(success=True, output="\n".join(lines))
def _move(self, path: Path, params: dict) -> ToolResult:
dest_raw = params.get("destination")
if not dest_raw:
return ToolResult(success=False, output="'destination' is required for move", error="missing_destination")
dest = _check_path(dest_raw)
if dest is None:
return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied")
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(path), str(dest))
return ToolResult(success=True, output=f"Moved: {path} → {dest}")
def _delete(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
return ToolResult(success=True, output=f"Deleted: {path}")
# ── AI action handlers ────────────────────────────────────────────────────
def _require_ai(self) -> ToolResult | None:
if not self._ai:
return ToolResult(
success=False,
output="AI helper is not available for this action.",
error="no_ai_helper",
)
return None
async def _query(self, path: Path, params: dict) -> ToolResult:
if (err := self._require_ai()) is not None:
return err
question = params.get("question", "").strip()
if not question:
return ToolResult(success=False, output="'question' is required for query.", error="missing_question")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="query works on files, not directories.", error="is_directory")
text = path.read_text(encoding="utf-8", errors="replace")
lines = text.splitlines()
total = len(lines)
chunks = _make_chunks(lines, _AI_CHUNK_CHARS, _AI_OVERLAP_LINES)
if len(chunks) == 1:
s, e = chunks[0]
numbered = _number_lines(lines[s:e], s + 1)
answer = await self._ai.ask(
_QUERY_SINGLE_SYSTEM,
f"File: {path}\n\nQuestion: {question}\n\nContent:\n{numbered}",
)
return ToolResult(success=True, output=answer)
# Multi-chunk: accumulate partial answers
partials: list[str] = []
for s, e in chunks:
numbered = _number_lines(lines[s:e], s + 1)
partial = await self._ai.ask(
_QUERY_CHUNK_SYSTEM,
f"File: {path} (lines {s + 1}–{e} of {total})\nQuestion: {question}\n\nContent:\n{numbered}",
)
if partial and "NOT_FOUND" not in partial.upper():
partials.append(f"[lines {s + 1}–{e}] {partial}")
if not partials:
return ToolResult(success=True, output=f"No information found in '{path.name}' relevant to: {question}")
if len(partials) == 1:
# Single finding — strip range prefix, return directly
answer = partials[0].split("] ", 1)[-1]
return ToolResult(success=True, output=answer)
answer = await self._ai.ask(
_QUERY_SYNTHESIS_SYSTEM,
f"Question: {question}\n\nFindings from {len(partials)} sections:\n\n" + "\n\n".join(partials),
)
return ToolResult(success=True, output=answer)
async def _smart_edit(self, path: Path, params: dict) -> ToolResult:
if (err := self._require_ai()) is not None:
return err
instruction = params.get("instruction", "").strip()
if not instruction:
return ToolResult(success=False, output="'instruction' is required for smart_edit.", error="missing_instruction")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="smart_edit works on files, not directories.", error="is_directory")
text = path.read_text(encoding="utf-8", errors="replace")
if len(text) > _AI_EDIT_MAX_CHARS:
return ToolResult(
success=False,
output=(
f"File too large for smart_edit ({_fmt_size(len(text.encode()))}, "
f"limit {_fmt_size(_AI_EDIT_MAX_CHARS)}). "
"Use read with offset/limit to locate the relevant section, then write it back."
),
error="file_too_large",
)
lines = text.splitlines()
numbered = _number_lines(lines, 1)
raw_ops = await self._ai.ask_json(
_EDIT_SYSTEM,
f"File: {path}\nTotal lines: {len(lines)}\n\nInstruction: {instruction}\n\nContent:\n{numbered}",
)
if raw_ops is None:
return ToolResult(
success=False,
output="AI could not produce valid edit operations. Try rephrasing the instruction.",
error="invalid_ai_response",
)
if not isinstance(raw_ops, list):
raw_ops = [raw_ops] if isinstance(raw_ops, dict) else []
if not raw_ops:
return ToolResult(success=True, output="No changes needed — file unchanged.")
errors = _validate_ops(raw_ops, len(lines))
if errors:
return ToolResult(
success=False,
output="AI returned invalid operations:\n" + "\n".join(f" • {e}" for e in errors),
error="invalid_ops",
)
new_lines = _apply_ops(lines, raw_ops)
diff = _unified_diff(lines, new_lines, path)
# Preserve trailing newline — write atomically to avoid partial writes on failure
import os
new_text = "\n".join(new_lines) + ("\n" if text.endswith("\n") else "")
tmp = path.with_suffix(path.suffix + ".tmp")
try:
tmp.write_text(new_text, encoding="utf-8")
os.replace(tmp, path)
finally:
if tmp.exists():
tmp.unlink(missing_ok=True)
summary = (
f"Applied {len(raw_ops)} operation(s) to {path.name} "
f"({len(lines)} → {len(new_lines)} lines)."
)
return ToolResult(success=True, output=f"{summary}\n\n{diff}" if diff else summary)