"""Filesystem tool — read/write/append/list/find/info/move/delete/exists + AI query/smart_edit.
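Path access is controlled by the FS_ALLOWED_PATHS setting.
If FS_ALLOWED_PATHS=* (default), any path is accessible.
Otherwise set it to a comma-separated list of allowed root paths, e.g.:
    FS_ALLOWED_PATHS=/home/user,/var/www
When a non-admin user id is active, paths are instead confined to user_data/<user_id>/.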
"""
import asyncio
import difflib
import shutil
import stat
from datetime import datetime
from pathlib import Path
from navi.config import settings
from .base import Tool, ToolResult, current_user_id, current_user_role
# Size thresholds applied by the plain 'read' action.
_READ_WARN_BYTES = 100_000  # 100 KB — add size warning in output
_READ_HARD_BYTES = 1_000_000  # 1 MB — refuse full read without offset/limit
# Upper bounds on how many entries 'list' and matches 'find' will report.
_LIST_MAX_ENTRIES = 500
_FIND_MAX_RESULTS = 200
# AI actions: ~20k tokens of file content per chunk (4 chars ≈ 1 token)
_AI_CHUNK_CHARS = 80_000
# Number of lines shared by consecutive chunks so boundary context is not lost.
_AI_OVERLAP_LINES = 30
# smart_edit: refuse files larger than ~50k tokens (full file must fit in one call)
_AI_EDIT_MAX_CHARS = 200_000
# ── System prompts ────────────────────────────────────────────────────────────
# System prompt for 'query' when the whole file fits into a single chunk.
_QUERY_SINGLE_SYSTEM = (
    "You are a precise file analysis assistant. "
    "Answer the question based strictly on the file content shown. "
    "Be specific and concise. Include line numbers when relevant."
)
# System prompt for 'query' on one chunk of a multi-chunk file; the model is told
# to answer NOT_FOUND when the chunk does not contain the answer.
_QUERY_CHUNK_SYSTEM = (
    "You are analyzing one section of a larger file. "
    "Answer the question using only the lines shown. "
    "If the answer is not present in this section, respond with exactly: NOT_FOUND\n"
    "Otherwise be specific and include line numbers."
)
# System prompt for merging several per-chunk findings into one answer.
_QUERY_SYNTHESIS_SYSTEM = (
    "Combine these partial findings from different sections of a file into one clear answer. "
    "Remove duplicates. Be direct and concise."
)
# System prompt for 'smart_edit': the model must reply with a JSON array of
# replace/delete/insert operations addressed by 1-based line numbers.
_EDIT_SYSTEM = (
    "You are a precise file editor. "
    "Given file content with line numbers and an instruction, output ONLY a JSON array of edit operations. "
    "No explanation, no markdown — just the JSON.\n\n"
    "Format:\n"
    "[\n"
    ' {"op": "replace", "start": LINE, "end": LINE, "content": "new text\\nmore lines"},\n'
    ' {"op": "delete", "start": LINE, "end": LINE},\n'
    ' {"op": "insert", "after": LINE, "content": "text to insert"}\n'
    "]\n\n"
    "Rules:\n"
    "- Line numbers are 1-based and inclusive\n"
    "- Use \\n in content strings for embedded newlines\n"
    "- Make MINIMAL changes to accomplish the instruction\n"
    "- 'insert' after=0 inserts before the first line\n"
    "- If no changes are needed, return []"
)
# ── Path helpers ──────────────────────────────────────────────────────────────
def _check_path(path_str: str) -> Path | None:
    """Return the resolved Path if access is allowed, else None.

    Sandbox mode (a user id is active and the role is not "admin"): the path
    must end up inside ``user_data/<user_id>/``, which is created on demand.
    Relative paths are interpreted relative to that directory; absolute paths
    are accepted only if they already point into it. The containment test is
    applied to the fully resolved path in both cases, so ``..`` components or
    symlinks cannot be used to reach other users' files or OS directories.

    Otherwise (no active user, or an admin): the path is ``~``-expanded and
    resolved, then accepted if FS_ALLOWED_PATHS is ``*`` or if it lies under
    one of the configured root paths.

    Args:
        path_str: Path as supplied by the caller (absolute or relative).

    Returns:
        The resolved path, or None when the path cannot be parsed/resolved or
        falls outside the permitted area.
    """
    try:
        p = Path(path_str)
    except Exception:
        return None

    user_id = current_user_id.get(None)
    role = current_user_role.get()

    # Admins bypass the sandbox and use FS_ALLOWED_PATHS directly.
    if user_id and role != "admin":
        # NOTE(review): user_id is used as a directory name as-is — presumably it
        # is a trusted identifier without path separators; confirm at the caller.
        sandbox = (Path("user_data") / user_id).expanduser().resolve()
        sandbox.mkdir(parents=True, exist_ok=True)

        # Relative paths are anchored at the sandbox root, absolute ones are taken as given.
        candidate = p if p.is_absolute() else sandbox / p
        try:
            resolved = candidate.resolve()
        except Exception:
            return None
        # Check containment AFTER resolving: "../x" or a symlink may point outside.
        return resolved if resolved.is_relative_to(sandbox) else None

    # Fallback to FS_ALLOWED_PATHS for single-user / legacy mode / admin.
    try:
        p = p.expanduser().resolve()
    except Exception:
        return None

    if settings.fs_allowed_paths.strip() == "*":
        return p

    allowed = [Path(r).expanduser().resolve() for r in settings.fs_allowed_paths_list]
    if any(p.is_relative_to(root) for root in allowed):
        return p
    return None
def _fmt_size(n: int) -> str:
if n < 1024: return f"{n} B"
if n < 1024 ** 2: return f"{n / 1024:.1f} KB"
if n < 1024 ** 3: return f"{n / 1024 ** 2:.1f} MB"
return f"{n / 1024 ** 3:.1f} GB"
def _fmt_time(ts: float) -> str:
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")
# ── AI helpers (module-level, no self) ────────────────────────────────────────
def _number_lines(lines: list[str], start: int = 1) -> str:
"""Return file lines with 1-based line numbers, right-aligned."""
width = len(str(start + len(lines) - 1))
return "\n".join(f"{start + i:>{width}}: {line}" for i, line in enumerate(lines))
def _make_chunks(lines: list[str], target_chars: int, overlap: int) -> list[tuple[int, int]]:
"""
Split lines into (start_idx, end_idx) chunks of at most target_chars characters.
Consecutive chunks overlap by `overlap` lines to preserve boundary context.
"""
if not lines:
return [(0, 0)]
total = len(lines)
total_chars = sum(len(l) + 1 for l in lines)
if total_chars <= target_chars:
return [(0, total)]
chunks: list[tuple[int, int]] = []
start = 0
while start < total:
chars = 0
end = start
while end < total and chars < target_chars:
chars += len(lines[end]) + 1
end += 1
chunks.append((start, end))
if end >= total:
break
start = max(start + 1, end - overlap) # always make progress
return chunks or [(0, total)]
def _validate_ops(ops: list, total_lines: int) -> list[str]:
errors: list[str] = []
for i, op in enumerate(ops):
if not isinstance(op, dict):
errors.append(f"op[{i}] is not a dict"); continue
kind = op.get("op")
if kind not in ("replace", "delete", "insert"):
errors.append(f"op[{i}] unknown type {kind!r}"); continue
if kind in ("replace", "delete"):
s, e = op.get("start"), op.get("end")
if not isinstance(s, int) or not isinstance(e, int):
errors.append(f"op[{i}] start/end must be integers")
elif s < 1 or e > total_lines or s > e:
errors.append(f"op[{i}] range {s}-{e} out of bounds (file has {total_lines} lines)")
elif kind == "insert":
after = op.get("after")
if not isinstance(after, int):
errors.append(f"op[{i}] 'after' must be integer")
elif after < 0 or after > total_lines:
errors.append(f"op[{i}] 'after'={after} out of bounds (0–{total_lines})")
return errors
def _apply_ops(lines: list[str], ops: list[dict]) -> list[str]:
"""Apply edit operations bottom-up (highest line first) to preserve line numbers."""
sorted_ops = sorted(
ops,
key=lambda o: o.get("start", o.get("after", 0)),
reverse=True,
)
result = list(lines)
for op in sorted_ops:
kind = op["op"]
if kind == "replace":
s = op["start"] - 1 # 0-based
e = op["end"] # exclusive (1-based end = exclusive 0-based end)
new = op.get("content", "").split("\n")
result[s:e] = new
elif kind == "delete":
s = op["start"] - 1
e = op["end"]
del result[s:e]
elif kind == "insert":
after = op["after"] # insert after this 1-based line (0 = before line 1)
new = op.get("content", "").split("\n")
result[after:after] = new
return result
def _unified_diff(original: list[str], modified: list[str], path: Path) -> str:
diff = list(difflib.unified_diff(
[l + "\n" for l in original],
[l + "\n" for l in modified],
fromfile=f"a/{path.name}",
tofile=f"b/{path.name}",
lineterm="",
))
return "\n".join(diff)
# ── Tool class ────────────────────────────────────────────────────────────────
class FilesystemTool(Tool):
name = "filesystem"
description = (
"Operate on the local filesystem. "
"ALWAYS prefer AI actions over manual read+write — they produce more accurate results "
"and handle files of any size automatically:\n"
" • query — use INSTEAD of read when you need to extract or look up information. "
"Examples: 'what arguments does function X take?', 'on which line is class Y defined?', "
"'does this config contain key Z?', 'list all TODO comments'. "
"Pass the question in 'question'.\n"
"Four editing modes — choose the cheapest one that fits your situation:\n"
" • write — overwrite the ENTIRE file with new content. Use only for new files or total rewrites.\n"
" • edit — deterministic exact-text replacement. You must know the precise old text (must occur exactly once). "
"Pass 'old' and 'new'. Fastest and safest for small tweaks.\n"
" • edit_lines — deterministic line-based edit. You must know exact line numbers. "
"Pass 'operations' array with replace/delete/insert ops. Fastest for bulk structural changes.\n"
" • smart_edit — AI-powered semantic edit. Use when you know the intent but not the exact replacement. "
"Examples: 'rename function foo to bar', 'add a docstring to method X', "
"'remove all commented-out code', 'change timeout from 30 to 60'. "
"Pass the instruction in 'instruction'. Consumes extra tokens.\n"
"Standard actions (use only when AI actions are not applicable): "
"read — raw file text (offset+limit for large files, numbered=true for line numbers); "
"append — add text to end; "
"list — directory contents with sizes; "
"find — search files by glob pattern downward; "
"find_up — walk up the directory tree to find a file by name (pattern param); returns its path or 'not found'; "
"info — size, line count, dates, permissions; "
"move — rename or move; "
"delete — remove file or directory tree; "
"exists — check if path exists. "
"Tip: call info before reading an unknown file to check its size first."
)
parameters = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"read", "write", "append", "edit", "edit_lines", "list", "find", "find_up",
"info", "move", "delete", "exists", "mkdir",
"query", "smart_edit",
],
"description": "Operation to perform.",
},
"path": {
"type": "string",
"description": "Absolute or relative file/directory path (~ is expanded).",
},
"content": {
"type": "string",
"description": "Text to write or append (required for write/append).",
},
"old": {
"type": "string",
"description": "Exact text to replace (required for edit). Must occur exactly once.",
},
"new": {
"type": "string",
"description": "Replacement text for edit.",
},
"operations": {
"type": "array",
"description": (
"JSON array of line-based edit operations (required for edit_lines). "
"Each op is {\"op\": \"replace\"|\"delete\"|\"insert\", \"start\": int, \"end\": int, \"content\": str}. "
"Line numbers are 1-based and inclusive. Use edit_lines for fast deterministic edits "
"when you know the exact lines (e.g. 'change line 15 from X to Y')."
),
"items": {
"type": "object",
"properties": {
"op": {"type": "string", "enum": ["replace", "delete", "insert"]},
"start": {"type": "integer"},
"end": {"type": "integer"},
"after": {"type": "integer"},
"content": {"type": "string"},
},
"required": ["op"],
},
},
"numbered": {
"type": "boolean",
"description": "Include 1-based line numbers in read output (default false).",
},
"destination": {
"type": "string",
"description": "Target path for move action.",
},
"pattern": {
"type": "string",
"description": "Glob pattern for find (e.g. '*.log'), or exact filename for find_up.",
},
"offset": {
"type": "integer",
"description": "First line to read, 1-based (for read action).",
},
"limit": {
"type": "integer",
"description": "Max lines to return (for read action).",
},
"recursive": {
"type": "boolean",
"description": "Full recursive directory tree (for list, default false).",
},
"question": {
"type": "string",
"description": (
"Natural language question about the file's content (for query). "
"Examples: 'What does function calculate() return?', "
"'On which line is class UserManager defined?', "
"'What environment variables does this script read?', "
"'Are there any hardcoded passwords?'"
),
},
"instruction": {
"type": "string",
"description": (
"Natural language edit instruction (for smart_edit). "
"Examples: 'Rename function process to handle_request', "
"'Add type hints to all function signatures', "
"'Replace the hardcoded URL with a constant BASE_URL', "
"'Delete the block comment on lines 10-20', "
"'Add logging to the save() method'"
),
},
},
"required": ["action", "path"],
}
def __init__(self, ai_helper=None) -> None:
# ai_helper is optional — standard actions work without it
self._ai = ai_helper
async def execute(self, params: dict) -> ToolResult:
action = params.get("action", "")
raw_path = params.get("path", "")
path = _check_path(raw_path)
if path is None:
return ToolResult(
success=False,
output=(
f"Access denied: '{raw_path}' is outside allowed paths "
f"({settings.fs_allowed_paths}). "
"Set FS_ALLOWED_PATHS=* in .env to allow all paths."
),
error="access_denied",
)
try:
match action:
case "read": return await asyncio.to_thread(self._read, path, params)
case "write": return await asyncio.to_thread(self._write, path, params)
case "append": return await asyncio.to_thread(self._append, path, params)
case "edit": return await asyncio.to_thread(self._edit, path, params)
case "edit_lines": return await asyncio.to_thread(self._edit_lines, path, params)
case "list": return await asyncio.to_thread(self._list, path, params)
case "find": return await asyncio.to_thread(self._find, path, params)
case "find_up": return await asyncio.to_thread(self._find_up, path, params)
case "info": return await asyncio.to_thread(self._info, path)
case "move": return await asyncio.to_thread(self._move, path, params)
case "delete": return await asyncio.to_thread(self._delete, path)
case "exists": return ToolResult(success=True, output="true" if path.exists() else "false")
case "mkdir": return await asyncio.to_thread(self._mkdir, path)
case "query": return await self._query(path, params)
case "smart_edit": return await self._smart_edit(path, params)
case _:
return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")
except PermissionError as e:
return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
except Exception as e:
return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))
# ── Standard action handlers ──────────────────────────────────────────────
def _read(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output=f"Path is a directory, use 'list': {path}", error="is_directory")
file_size = path.stat().st_size
offset = params.get("offset")
limit = params.get("limit")
if file_size > _READ_HARD_BYTES and offset is None and limit is None:
return ToolResult(
success=False,
output=(
f"File too large to read in full: {_fmt_size(file_size)} — {path}\n"
"Use offset/limit to read specific line ranges "
"(e.g. offset=1, limit=100), or use 'query' to ask a question about it."
),
error="file_too_large",
)
text = path.read_text(encoding="utf-8", errors="replace")
lines = text.splitlines(keepends=True)
total_lines = len(lines)
if offset is not None or limit is not None:
start = max(0, (offset or 1) - 1)
end = (start + limit) if limit is not None else total_lines
selected = lines[start:end]
actual_end = min(end, total_lines)
header = (
f"[{path} | lines {start + 1}–{actual_end} of {total_lines}"
f" | {_fmt_size(file_size)}]\n"
)
return ToolResult(success=True, output=header + "".join(selected))
warn = (
f"⚠ Large file ({_fmt_size(file_size)}) — consider offset/limit next time.\n"
if file_size > _READ_WARN_BYTES else ""
)
numbered = params.get("numbered", False)
header = f"[{path} | {total_lines} lines | {_fmt_size(file_size)}]\n"
if numbered:
return ToolResult(success=True, output=header + warn + _number_lines(text.splitlines(keepends=False), 1))
return ToolResult(success=True, output=header + warn + text)
def _write(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
lines = len(content.splitlines())
return ToolResult(success=True, output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}")
def _append(self, path: Path, params: dict) -> ToolResult:
content = params.get("content", "")
if not content:
return ToolResult(success=False, output="'content' is required for append", error="missing_content")
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
f.write(content)
return ToolResult(success=True, output=f"Appended {_fmt_size(len(content.encode()))} to {path} (file now {_fmt_size(path.stat().st_size)})")
def _edit(self, path: Path, params: dict) -> ToolResult:
old = params.get("old")
new = params.get("new")
if old is None:
return ToolResult(success=False, output="'old' is required for edit", error="missing_old")
if new is None:
return ToolResult(success=False, output="'new' is required for edit", error="missing_new")
if old == "":
return ToolResult(success=False, output="'old' must not be empty", error="empty_old")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="edit works on files, not directories.", error="is_directory")
text = path.read_text(encoding="utf-8", errors="replace")
count = text.count(old)
if count == 0:
return ToolResult(
success=False,
output=(
"Exact text not found. Read the relevant file section and call edit again "
"with text copied exactly from the file."
),
error="old_not_found",
)
if count > 1:
return ToolResult(
success=False,
output=(
f"Exact text occurs {count} times. Use a larger unique old text block "
"or smart_edit for a semantic change."
),
error="old_not_unique",
)
new_text = text.replace(old, new, 1)
path.write_text(new_text, encoding="utf-8")
delta = len(new_text.encode()) - len(text.encode())
return ToolResult(
success=True,
output=(
f"Edited {path}: replaced {len(old.encode())} B with "
f"{len(new.encode())} B (delta {delta:+d} B)."
),
)
def _edit_lines(self, path: Path, params: dict) -> ToolResult:
ops = params.get("operations")
if not ops:
return ToolResult(success=False, output="'operations' is required for edit_lines", error="missing_operations")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="edit_lines works on files, not directories.", error="is_directory")
text = path.read_text(encoding="utf-8", errors="replace")
lines = text.splitlines()
errors = _validate_ops(ops, len(lines))
if errors:
return ToolResult(
success=False,
output="Invalid operations:\n" + "\n".join(f" • {e}" for e in errors),
error="invalid_ops",
)
new_lines = _apply_ops(lines, ops)
diff = _unified_diff(lines, new_lines, path)
new_text = "\n".join(new_lines) + ("\n" if text.endswith("\n") else "")
tmp = path.with_suffix(path.suffix + ".tmp")
try:
tmp.write_text(new_text, encoding="utf-8")
tmp.replace(path)
finally:
if tmp.exists():
tmp.unlink(missing_ok=True)
summary = (
f"Applied {len(ops)} operation(s) to {path.name} "
f"({len(lines)} → {len(new_lines)} lines)."
)
return ToolResult(success=True, output=f"{summary}\n\n{diff}" if diff else summary)
def _list(self, path: Path, params: dict) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
if path.is_file():
return self._info(path)
recursive = params.get("recursive", False)
raw_entries = list(path.rglob("*") if recursive else path.iterdir())
raw_entries.sort(key=lambda e: (e.is_file(), str(e).lower()))
truncated = len(raw_entries) > _LIST_MAX_ENTRIES
entries = raw_entries[:_LIST_MAX_ENTRIES]
lines = []
for e in entries:
try:
s = e.stat()
rel = e.relative_to(path)
if e.is_dir():
if not recursive:
try:
n = sum(1 for _ in e.iterdir())
lines.append(f"d {rel}/ ({n} items)")
except PermissionError:
lines.append(f"d {rel}/")
else:
lines.append(f"d {rel}/")
else:
lines.append(f" {str(rel):<48} {_fmt_size(s.st_size):>10} {_fmt_time(s.st_mtime)}")
except Exception:
lines.append(f"? {e.name}")
note = " ⚠ truncated" if truncated else ""
header = f"[{path} | {len(entries)} entries{note}]\n"
return ToolResult(success=True, output=header + ("\n".join(lines) or "(empty directory)"))
def _find(self, path: Path, params: dict) -> ToolResult:
pattern = params.get("pattern")
if not pattern:
return ToolResult(success=False, output="'pattern' is required for find", error="missing_pattern")
if not path.exists():
return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
matches: list[Path] = []
try:
for p in path.rglob(pattern):
matches.append(p)
if len(matches) >= _FIND_MAX_RESULTS:
break
except Exception as e:
return ToolResult(success=False, output=f"Find error: {e}", error=str(e))
if not matches:
return ToolResult(success=True, output=f"No matches for '{pattern}' in {path}")
matches.sort()
lines = []
for m in matches:
try:
size = _fmt_size(m.stat().st_size) if m.is_file() else "<dir>"
lines.append(f"{m} ({size})")
except Exception:
lines.append(str(m))
extra = f" ⚠ showing first {_FIND_MAX_RESULTS}" if len(matches) == _FIND_MAX_RESULTS else ""
header = f"[{len(matches)} matches for '{pattern}' in {path}{extra}]\n"
return ToolResult(success=True, output=header + "\n".join(lines))
def _find_up(self, path: Path, params: dict) -> ToolResult:
filename = params.get("pattern", "NAVI.md")
current = path if path.is_dir() else path.parent
checked = []
while True:
target = current / filename
checked.append(str(target))
if target.exists():
return ToolResult(success=True, output=str(target))
parent = current.parent
if parent == current:
return ToolResult(success=True, output=f"not found (searched: {', '.join(checked)})")
current = parent
def _info(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
s = path.stat()
kind = "symlink" if path.is_symlink() else ("directory" if path.is_dir() else "file")
lines = [
f"path: {path}",
f"type: {kind}",
f"size: {_fmt_size(s.st_size)}",
f"modified: {_fmt_time(s.st_mtime)}",
f"created: {_fmt_time(s.st_ctime)}",
f"mode: {stat.filemode(s.st_mode)}",
]
if path.is_file():
try:
text = path.read_text(encoding="utf-8", errors="replace")
lines.append(f"lines: {len(text.splitlines())}")
except Exception:
lines.append("lines: (binary or unreadable)")
elif path.is_dir():
try:
children = list(path.iterdir())
lines.append(f"contents: {sum(c.is_file() for c in children)} files, {sum(c.is_dir() for c in children)} dirs (top level)")
except Exception:
pass
return ToolResult(success=True, output="\n".join(lines))
def _move(self, path: Path, params: dict) -> ToolResult:
dest_raw = params.get("destination")
if not dest_raw:
return ToolResult(success=False, output="'destination' is required for move", error="missing_destination")
dest = _check_path(dest_raw)
if dest is None:
return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied")
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(path), str(dest))
return ToolResult(success=True, output=f"Moved: {path} → {dest}")
def _mkdir(self, path: Path) -> ToolResult:
path.mkdir(parents=False, exist_ok=True)
return ToolResult(success=True, output=f"Directory created: {path}")
def _delete(self, path: Path) -> ToolResult:
if not path.exists():
return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
if path.is_dir():
shutil.rmtree(path)
else:
path.unlink()
return ToolResult(success=True, output=f"Deleted: {path}")
# ── AI action handlers ────────────────────────────────────────────────────
def _require_ai(self) -> ToolResult | None:
if not self._ai:
return ToolResult(
success=False,
output="AI helper is not available for this action.",
error="no_ai_helper",
)
return None
async def _query(self, path: Path, params: dict) -> ToolResult:
if (err := self._require_ai()) is not None:
return err
question = params.get("question", "").strip()
if not question:
return ToolResult(success=False, output="'question' is required for query.", error="missing_question")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="query works on files, not directories.", error="is_directory")
text = await asyncio.to_thread(path.read_text, "utf-8", "replace")
lines = text.splitlines()
total = len(lines)
chunks = _make_chunks(lines, _AI_CHUNK_CHARS, _AI_OVERLAP_LINES)
if len(chunks) == 1:
s, e = chunks[0]
numbered = _number_lines(lines[s:e], s + 1)
answer = await self._ai.ask(
_QUERY_SINGLE_SYSTEM,
f"File: {path}\n\nQuestion: {question}\n\nContent:\n{numbered}",
)
return ToolResult(success=True, output=answer)
# Multi-chunk: accumulate partial answers
partials: list[str] = []
for s, e in chunks:
numbered = _number_lines(lines[s:e], s + 1)
partial = await self._ai.ask(
_QUERY_CHUNK_SYSTEM,
f"File: {path} (lines {s + 1}–{e} of {total})\nQuestion: {question}\n\nContent:\n{numbered}",
)
if partial and "NOT_FOUND" not in partial.upper():
partials.append(f"[lines {s + 1}–{e}] {partial}")
if not partials:
return ToolResult(success=True, output=f"No information found in '{path.name}' relevant to: {question}")
if len(partials) == 1:
# Single finding — strip range prefix, return directly
answer = partials[0].split("] ", 1)[-1]
return ToolResult(success=True, output=answer)
answer = await self._ai.ask(
_QUERY_SYNTHESIS_SYSTEM,
f"Question: {question}\n\nFindings from {len(partials)} sections:\n\n" + "\n\n".join(partials),
)
return ToolResult(success=True, output=answer)
async def _smart_edit(self, path: Path, params: dict) -> ToolResult:
if (err := self._require_ai()) is not None:
return err
instruction = params.get("instruction", "").strip()
if not instruction:
return ToolResult(success=False, output="'instruction' is required for smart_edit.", error="missing_instruction")
if not path.exists():
return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
if path.is_dir():
return ToolResult(success=False, output="smart_edit works on files, not directories.", error="is_directory")
text = await asyncio.to_thread(path.read_text, "utf-8", "replace")
if len(text) > _AI_EDIT_MAX_CHARS:
return ToolResult(
success=False,
output=(
f"File too large for smart_edit ({_fmt_size(len(text.encode()))}, "
f"limit {_fmt_size(_AI_EDIT_MAX_CHARS)}). "
"Use read with offset/limit to locate the relevant section, then write it back."
),
error="file_too_large",
)
lines = text.splitlines()
numbered = _number_lines(lines, 1)
raw_ops = await self._ai.ask_json(
_EDIT_SYSTEM,
f"File: {path}\nTotal lines: {len(lines)}\n\nInstruction: {instruction}\n\nContent:\n{numbered}",
)
if raw_ops is None:
return ToolResult(
success=False,
output="AI could not produce valid edit operations. Try rephrasing the instruction.",
error="invalid_ai_response",
)
if not isinstance(raw_ops, list):
raw_ops = [raw_ops] if isinstance(raw_ops, dict) else []
if not raw_ops:
return ToolResult(success=True, output="No changes needed — file unchanged.")
errors = _validate_ops(raw_ops, len(lines))
if errors:
return ToolResult(
success=False,
output="AI returned invalid operations:\n" + "\n".join(f" • {e}" for e in errors),
error="invalid_ops",
)
new_lines = _apply_ops(lines, raw_ops)
diff = _unified_diff(lines, new_lines, path)
# Preserve trailing newline — write atomically to avoid partial writes on failure
import os
new_text = "\n".join(new_lines) + ("\n" if text.endswith("\n") else "")
tmp = path.with_suffix(path.suffix + ".tmp")
def _atomic_write() -> None:
try:
tmp.write_text(new_text, encoding="utf-8")
os.replace(tmp, path)
finally:
if tmp.exists():
tmp.unlink(missing_ok=True)
await asyncio.to_thread(_atomic_write)
summary = (
f"Applied {len(raw_ops)} operation(s) to {path.name} "
f"({len(lines)} → {len(new_lines)} lines)."
)
return ToolResult(success=True, output=f"{summary}\n\n{diff}" if diff else summary)