Newer
Older
navi-1 / navi / tools / filesystem.py
"""Filesystem tool — read/write/append/list/find/info/move/delete/exists + AI query/smart_edit.

If FS_ALLOWED_PATHS=* (default), any path is accessible.
Otherwise set a comma-separated list of allowed root paths, e.g.:
  FS_ALLOWED_PATHS=/home/user,/var/www
"""

import difflib
import shutil
import stat
from datetime import datetime
from pathlib import Path

from navi.config import settings

from .base import Tool, ToolResult

_READ_WARN_BYTES  = 100_000    # 100 KB — add size warning in output
_READ_HARD_BYTES  = 1_000_000  # 1 MB  — refuse full read without offset/limit
_LIST_MAX_ENTRIES = 500
_FIND_MAX_RESULTS = 200

# AI actions: ~20k tokens of file content per chunk (4 chars ≈ 1 token)
_AI_CHUNK_CHARS    = 80_000
_AI_OVERLAP_LINES  = 30
# smart_edit: refuse files larger than ~50k tokens (full file must fit in one call)
_AI_EDIT_MAX_CHARS = 200_000

# ── System prompts ────────────────────────────────────────────────────────────

_QUERY_SINGLE_SYSTEM = (
    "You are a precise file analysis assistant. "
    "Answer the question based strictly on the file content shown. "
    "Be specific and concise. Include line numbers when relevant."
)

_QUERY_CHUNK_SYSTEM = (
    "You are analyzing one section of a larger file. "
    "Answer the question using only the lines shown. "
    "If the answer is not present in this section, respond with exactly: NOT_FOUND\n"
    "Otherwise be specific and include line numbers."
)

_QUERY_SYNTHESIS_SYSTEM = (
    "Combine these partial findings from different sections of a file into one clear answer. "
    "Remove duplicates. Be direct and concise."
)

_EDIT_SYSTEM = (
    "You are a precise file editor. "
    "Given file content with line numbers and an instruction, output ONLY a JSON array of edit operations. "
    "No explanation, no markdown — just the JSON.\n\n"
    "Format:\n"
    "[\n"
    '  {"op": "replace", "start": LINE, "end": LINE, "content": "new text\\nmore lines"},\n'
    '  {"op": "delete",  "start": LINE, "end": LINE},\n'
    '  {"op": "insert",  "after": LINE, "content": "text to insert"}\n'
    "]\n\n"
    "Rules:\n"
    "- Line numbers are 1-based and inclusive\n"
    "- Use \\n in content strings for embedded newlines\n"
    "- Make MINIMAL changes to accomplish the instruction\n"
    "- 'insert' after=0 inserts before the first line\n"
    "- If no changes are needed, return []"
)


# ── Path helpers ──────────────────────────────────────────────────────────────

def _check_path(path_str: str) -> Path | None:
    """Return resolved Path if access is allowed, else None."""
    try:
        p = Path(path_str).expanduser().resolve()
    except Exception:
        return None

    if settings.fs_allowed_paths.strip() == "*":
        return p

    allowed = [Path(r).expanduser().resolve() for r in settings.fs_allowed_paths_list]
    for root in allowed:
        try:
            p.relative_to(root)
            return p
        except ValueError:
            continue
    return None


def _fmt_size(n: int) -> str:
    if n < 1024:        return f"{n} B"
    if n < 1024 ** 2:   return f"{n / 1024:.1f} KB"
    if n < 1024 ** 3:   return f"{n / 1024 ** 2:.1f} MB"
    return f"{n / 1024 ** 3:.1f} GB"


def _fmt_time(ts: float) -> str:
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M")


# ── AI helpers (module-level, no self) ────────────────────────────────────────

def _number_lines(lines: list[str], start: int = 1) -> str:
    """Return file lines with 1-based line numbers, right-aligned."""
    width = len(str(start + len(lines) - 1))
    return "\n".join(f"{start + i:>{width}}: {line}" for i, line in enumerate(lines))


def _make_chunks(lines: list[str], target_chars: int, overlap: int) -> list[tuple[int, int]]:
    """
    Split lines into (start_idx, end_idx) chunks of at most target_chars characters.
    Consecutive chunks overlap by `overlap` lines to preserve boundary context.
    """
    if not lines:
        return [(0, 0)]
    total = len(lines)
    total_chars = sum(len(l) + 1 for l in lines)
    if total_chars <= target_chars:
        return [(0, total)]

    chunks: list[tuple[int, int]] = []
    start = 0
    while start < total:
        chars = 0
        end = start
        while end < total and chars < target_chars:
            chars += len(lines[end]) + 1
            end += 1
        chunks.append((start, end))
        if end >= total:
            break
        start = max(start + 1, end - overlap)  # always make progress

    return chunks or [(0, total)]


def _validate_ops(ops: list, total_lines: int) -> list[str]:
    errors: list[str] = []
    for i, op in enumerate(ops):
        if not isinstance(op, dict):
            errors.append(f"op[{i}] is not a dict"); continue
        kind = op.get("op")
        if kind not in ("replace", "delete", "insert"):
            errors.append(f"op[{i}] unknown type {kind!r}"); continue
        if kind in ("replace", "delete"):
            s, e = op.get("start"), op.get("end")
            if not isinstance(s, int) or not isinstance(e, int):
                errors.append(f"op[{i}] start/end must be integers")
            elif s < 1 or e > total_lines or s > e:
                errors.append(f"op[{i}] range {s}-{e} out of bounds (file has {total_lines} lines)")
        elif kind == "insert":
            after = op.get("after")
            if not isinstance(after, int):
                errors.append(f"op[{i}] 'after' must be integer")
            elif after < 0 or after > total_lines:
                errors.append(f"op[{i}] 'after'={after} out of bounds (0–{total_lines})")
    return errors


def _apply_ops(lines: list[str], ops: list[dict]) -> list[str]:
    """Apply edit operations bottom-up (highest line first) to preserve line numbers."""
    sorted_ops = sorted(
        ops,
        key=lambda o: o.get("start", o.get("after", 0)),
        reverse=True,
    )
    result = list(lines)
    for op in sorted_ops:
        kind = op["op"]
        if kind == "replace":
            s = op["start"] - 1          # 0-based
            e = op["end"]                # exclusive (1-based end = exclusive 0-based end)
            new = op.get("content", "").split("\n")
            result[s:e] = new
        elif kind == "delete":
            s = op["start"] - 1
            e = op["end"]
            del result[s:e]
        elif kind == "insert":
            after = op["after"]          # insert after this 1-based line (0 = before line 1)
            new = op.get("content", "").split("\n")
            result[after:after] = new
    return result


def _unified_diff(original: list[str], modified: list[str], path: Path) -> str:
    diff = list(difflib.unified_diff(
        [l + "\n" for l in original],
        [l + "\n" for l in modified],
        fromfile=f"a/{path.name}",
        tofile=f"b/{path.name}",
        lineterm="",
    ))
    return "\n".join(diff)


# ── Tool class ────────────────────────────────────────────────────────────────

class FilesystemTool(Tool):
    name = "filesystem"
    description = (
        "Operate on the local filesystem. "
        "ALWAYS prefer AI actions over manual read+write — they produce more accurate results "
        "and handle files of any size automatically:\n"
        "  • query — use INSTEAD of read when you need to extract or look up information. "
        "Examples: 'what arguments does function X take?', 'on which line is class Y defined?', "
        "'does this config contain key Z?', 'list all TODO comments'. "
        "Pass the question in 'question'.\n"
        "  • smart_edit — use INSTEAD of read+write for any semantic change to a file. "
        "Examples: 'rename function foo to bar', 'add a docstring to method X', "
        "'remove all commented-out code', 'change timeout from 30 to 60'. "
        "Pass the instruction in 'instruction'. Returns a diff of what changed.\n"
        "Standard actions (use only when AI actions are not applicable): "
        "read — raw file text (offset+limit for large files); "
        "write — create or overwrite a file; "
        "append — add text to end; "
        "list — directory contents with sizes; "
        "find — search files by glob pattern downward; "
        "find_up — walk up the directory tree to find a file by name (pattern param); returns its path or 'not found'; "
        "info — size, line count, dates, permissions; "
        "move — rename or move; "
        "delete — remove file or directory tree; "
        "exists — check if path exists. "
        "Tip: call info before reading an unknown file to check its size first."
    )
    parameters = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": [
                    "read", "write", "append", "list", "find", "find_up",
                    "info", "move", "delete", "exists",
                    "query", "smart_edit",
                ],
                "description": "Operation to perform.",
            },
            "path": {
                "type": "string",
                "description": "Absolute or relative file/directory path (~ is expanded).",
            },
            "content": {
                "type": "string",
                "description": "Text to write or append (required for write/append).",
            },
            "destination": {
                "type": "string",
                "description": "Target path for move action.",
            },
            "pattern": {
                "type": "string",
                "description": "Glob pattern for find (e.g. '*.log'), or exact filename for find_up.",
            },
            "offset": {
                "type": "integer",
                "description": "First line to read, 1-based (for read action).",
            },
            "limit": {
                "type": "integer",
                "description": "Max lines to return (for read action).",
            },
            "recursive": {
                "type": "boolean",
                "description": "Full recursive directory tree (for list, default false).",
            },
            "question": {
                "type": "string",
                "description": (
                    "Natural language question about the file's content (for query). "
                    "Examples: 'What does function calculate() return?', "
                    "'On which line is class UserManager defined?', "
                    "'What environment variables does this script read?', "
                    "'Are there any hardcoded passwords?'"
                ),
            },
            "instruction": {
                "type": "string",
                "description": (
                    "Natural language edit instruction (for smart_edit). "
                    "Examples: 'Rename function process to handle_request', "
                    "'Add type hints to all function signatures', "
                    "'Replace the hardcoded URL with a constant BASE_URL', "
                    "'Delete the block comment on lines 10-20', "
                    "'Add logging to the save() method'"
                ),
            },
        },
        "required": ["action", "path"],
    }

    def __init__(self, ai_helper=None) -> None:
        # ai_helper is optional — standard actions work without it
        self._ai = ai_helper

    async def execute(self, params: dict) -> ToolResult:
        action   = params.get("action", "")
        raw_path = params.get("path", "")
        path     = _check_path(raw_path)

        if path is None:
            return ToolResult(
                success=False,
                output=(
                    f"Access denied: '{raw_path}' is outside allowed paths "
                    f"({settings.fs_allowed_paths}). "
                    "Set FS_ALLOWED_PATHS=* in .env to allow all paths."
                ),
                error="access_denied",
            )

        try:
            match action:
                case "read":      return self._read(path, params)
                case "write":     return self._write(path, params)
                case "append":    return self._append(path, params)
                case "list":      return self._list(path, params)
                case "find":      return self._find(path, params)
                case "find_up":   return self._find_up(path, params)
                case "info":      return self._info(path)
                case "move":      return self._move(path, params)
                case "delete":    return self._delete(path)
                case "exists":    return ToolResult(success=True, output="true" if path.exists() else "false")
                case "query":     return await self._query(path, params)
                case "smart_edit": return await self._smart_edit(path, params)
                case _:
                    return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action")

        except PermissionError as e:
            return ToolResult(success=False, output=f"Permission denied: {e}", error=str(e))
        except Exception as e:
            return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e))

    # ── Standard action handlers ──────────────────────────────────────────────

    def _read(self, path: Path, params: dict) -> ToolResult:
        if not path.exists():
            return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
        if path.is_dir():
            return ToolResult(success=False, output=f"Path is a directory, use 'list': {path}", error="is_directory")

        file_size = path.stat().st_size
        offset = params.get("offset")
        limit  = params.get("limit")

        if file_size > _READ_HARD_BYTES and offset is None and limit is None:
            return ToolResult(
                success=False,
                output=(
                    f"File too large to read in full: {_fmt_size(file_size)} — {path}\n"
                    "Use offset/limit to read specific line ranges "
                    "(e.g. offset=1, limit=100), or use 'query' to ask a question about it."
                ),
                error="file_too_large",
            )

        text  = path.read_text(encoding="utf-8", errors="replace")
        lines = text.splitlines(keepends=True)
        total_lines = len(lines)

        if offset is not None or limit is not None:
            start    = max(0, (offset or 1) - 1)
            end      = (start + limit) if limit is not None else total_lines
            selected = lines[start:end]
            actual_end = min(end, total_lines)
            header = (
                f"[{path}  |  lines {start + 1}–{actual_end} of {total_lines}"
                f"  |  {_fmt_size(file_size)}]\n"
            )
            return ToolResult(success=True, output=header + "".join(selected))

        warn = (
            f"⚠ Large file ({_fmt_size(file_size)}) — consider offset/limit next time.\n"
            if file_size > _READ_WARN_BYTES else ""
        )
        header = f"[{path}  |  {total_lines} lines  |  {_fmt_size(file_size)}]\n"
        return ToolResult(success=True, output=header + warn + text)

    def _write(self, path: Path, params: dict) -> ToolResult:
        content = params.get("content", "")
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
        lines = len(content.splitlines())
        return ToolResult(success=True, output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}")

    def _append(self, path: Path, params: dict) -> ToolResult:
        content = params.get("content", "")
        if not content:
            return ToolResult(success=False, output="'content' is required for append", error="missing_content")
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as f:
            f.write(content)
        return ToolResult(success=True, output=f"Appended {_fmt_size(len(content.encode()))} to {path} (file now {_fmt_size(path.stat().st_size)})")

    def _list(self, path: Path, params: dict) -> ToolResult:
        if not path.exists():
            return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")
        if path.is_file():
            return self._info(path)

        recursive   = params.get("recursive", False)
        raw_entries = list(path.rglob("*") if recursive else path.iterdir())
        raw_entries.sort(key=lambda e: (e.is_file(), str(e).lower()))

        truncated = len(raw_entries) > _LIST_MAX_ENTRIES
        entries   = raw_entries[:_LIST_MAX_ENTRIES]
        lines     = []
        for e in entries:
            try:
                s   = e.stat()
                rel = e.relative_to(path)
                if e.is_dir():
                    if not recursive:
                        try:
                            n = sum(1 for _ in e.iterdir())
                            lines.append(f"d  {rel}/  ({n} items)")
                        except PermissionError:
                            lines.append(f"d  {rel}/")
                    else:
                        lines.append(f"d  {rel}/")
                else:
                    lines.append(f"   {str(rel):<48} {_fmt_size(s.st_size):>10}  {_fmt_time(s.st_mtime)}")
            except Exception:
                lines.append(f"?  {e.name}")

        note   = "  ⚠ truncated" if truncated else ""
        header = f"[{path}  |  {len(entries)} entries{note}]\n"
        return ToolResult(success=True, output=header + ("\n".join(lines) or "(empty directory)"))

    def _find(self, path: Path, params: dict) -> ToolResult:
        pattern = params.get("pattern")
        if not pattern:
            return ToolResult(success=False, output="'pattern' is required for find", error="missing_pattern")
        if not path.exists():
            return ToolResult(success=False, output=f"Path not found: {path}", error="not_found")

        matches: list[Path] = []
        try:
            for p in path.rglob(pattern):
                matches.append(p)
                if len(matches) >= _FIND_MAX_RESULTS:
                    break
        except Exception as e:
            return ToolResult(success=False, output=f"Find error: {e}", error=str(e))

        if not matches:
            return ToolResult(success=True, output=f"No matches for '{pattern}' in {path}")

        matches.sort()
        lines = []
        for m in matches:
            try:
                size = _fmt_size(m.stat().st_size) if m.is_file() else "<dir>"
                lines.append(f"{m}  ({size})")
            except Exception:
                lines.append(str(m))

        extra  = f"  ⚠ showing first {_FIND_MAX_RESULTS}" if len(matches) == _FIND_MAX_RESULTS else ""
        header = f"[{len(matches)} matches for '{pattern}' in {path}{extra}]\n"
        return ToolResult(success=True, output=header + "\n".join(lines))

    def _find_up(self, path: Path, params: dict) -> ToolResult:
        filename = params.get("pattern", "NAVI.md")
        current = path if path.is_dir() else path.parent
        checked = []
        while True:
            target = current / filename
            checked.append(str(target))
            if target.exists():
                return ToolResult(success=True, output=str(target))
            parent = current.parent
            if parent == current:
                return ToolResult(success=True, output=f"not found (searched: {', '.join(checked)})")
            current = parent

    def _info(self, path: Path) -> ToolResult:
        if not path.exists():
            return ToolResult(success=False, output=f"Not found: {path}", error="not_found")

        s    = path.stat()
        kind = "symlink" if path.is_symlink() else ("directory" if path.is_dir() else "file")
        lines = [
            f"path:     {path}",
            f"type:     {kind}",
            f"size:     {_fmt_size(s.st_size)}",
            f"modified: {_fmt_time(s.st_mtime)}",
            f"created:  {_fmt_time(s.st_ctime)}",
            f"mode:     {stat.filemode(s.st_mode)}",
        ]
        if path.is_file():
            try:
                text = path.read_text(encoding="utf-8", errors="replace")
                lines.append(f"lines:    {len(text.splitlines())}")
            except Exception:
                lines.append("lines:    (binary or unreadable)")
        elif path.is_dir():
            try:
                children = list(path.iterdir())
                lines.append(f"contents: {sum(c.is_file() for c in children)} files, {sum(c.is_dir() for c in children)} dirs (top level)")
            except Exception:
                pass
        return ToolResult(success=True, output="\n".join(lines))

    def _move(self, path: Path, params: dict) -> ToolResult:
        dest_raw = params.get("destination")
        if not dest_raw:
            return ToolResult(success=False, output="'destination' is required for move", error="missing_destination")
        dest = _check_path(dest_raw)
        if dest is None:
            return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied")
        if not path.exists():
            return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(path), str(dest))
        return ToolResult(success=True, output=f"Moved: {path} → {dest}")

    def _delete(self, path: Path) -> ToolResult:
        if not path.exists():
            return ToolResult(success=False, output=f"Not found: {path}", error="not_found")
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()
        return ToolResult(success=True, output=f"Deleted: {path}")

    # ── AI action handlers ────────────────────────────────────────────────────

    def _require_ai(self) -> ToolResult | None:
        if not self._ai:
            return ToolResult(
                success=False,
                output="AI helper is not available for this action.",
                error="no_ai_helper",
            )
        return None

    async def _query(self, path: Path, params: dict) -> ToolResult:
        if (err := self._require_ai()) is not None:
            return err

        question = params.get("question", "").strip()
        if not question:
            return ToolResult(success=False, output="'question' is required for query.", error="missing_question")
        if not path.exists():
            return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
        if path.is_dir():
            return ToolResult(success=False, output="query works on files, not directories.", error="is_directory")

        text   = path.read_text(encoding="utf-8", errors="replace")
        lines  = text.splitlines()
        total  = len(lines)
        chunks = _make_chunks(lines, _AI_CHUNK_CHARS, _AI_OVERLAP_LINES)

        if len(chunks) == 1:
            s, e = chunks[0]
            numbered = _number_lines(lines[s:e], s + 1)
            answer = await self._ai.ask(
                _QUERY_SINGLE_SYSTEM,
                f"File: {path}\n\nQuestion: {question}\n\nContent:\n{numbered}",
            )
            return ToolResult(success=True, output=answer)

        # Multi-chunk: accumulate partial answers
        partials: list[str] = []
        for s, e in chunks:
            numbered = _number_lines(lines[s:e], s + 1)
            partial  = await self._ai.ask(
                _QUERY_CHUNK_SYSTEM,
                f"File: {path}  (lines {s + 1}–{e} of {total})\nQuestion: {question}\n\nContent:\n{numbered}",
            )
            if partial and "NOT_FOUND" not in partial.upper():
                partials.append(f"[lines {s + 1}–{e}] {partial}")

        if not partials:
            return ToolResult(success=True, output=f"No information found in '{path.name}' relevant to: {question}")

        if len(partials) == 1:
            # Single finding — strip range prefix, return directly
            answer = partials[0].split("] ", 1)[-1]
            return ToolResult(success=True, output=answer)

        answer = await self._ai.ask(
            _QUERY_SYNTHESIS_SYSTEM,
            f"Question: {question}\n\nFindings from {len(partials)} sections:\n\n" + "\n\n".join(partials),
        )
        return ToolResult(success=True, output=answer)

    async def _smart_edit(self, path: Path, params: dict) -> ToolResult:
        if (err := self._require_ai()) is not None:
            return err

        instruction = params.get("instruction", "").strip()
        if not instruction:
            return ToolResult(success=False, output="'instruction' is required for smart_edit.", error="missing_instruction")
        if not path.exists():
            return ToolResult(success=False, output=f"File not found: {path}", error="not_found")
        if path.is_dir():
            return ToolResult(success=False, output="smart_edit works on files, not directories.", error="is_directory")

        text = path.read_text(encoding="utf-8", errors="replace")
        if len(text) > _AI_EDIT_MAX_CHARS:
            return ToolResult(
                success=False,
                output=(
                    f"File too large for smart_edit ({_fmt_size(len(text.encode()))}, "
                    f"limit {_fmt_size(_AI_EDIT_MAX_CHARS)}). "
                    "Use read with offset/limit to locate the relevant section, then write it back."
                ),
                error="file_too_large",
            )

        lines    = text.splitlines()
        numbered = _number_lines(lines, 1)

        raw_ops = await self._ai.ask_json(
            _EDIT_SYSTEM,
            f"File: {path}\nTotal lines: {len(lines)}\n\nInstruction: {instruction}\n\nContent:\n{numbered}",
        )

        if raw_ops is None:
            return ToolResult(
                success=False,
                output="AI could not produce valid edit operations. Try rephrasing the instruction.",
                error="invalid_ai_response",
            )

        if not isinstance(raw_ops, list):
            raw_ops = [raw_ops] if isinstance(raw_ops, dict) else []

        if not raw_ops:
            return ToolResult(success=True, output="No changes needed — file unchanged.")

        errors = _validate_ops(raw_ops, len(lines))
        if errors:
            return ToolResult(
                success=False,
                output="AI returned invalid operations:\n" + "\n".join(f"  • {e}" for e in errors),
                error="invalid_ops",
            )

        new_lines = _apply_ops(lines, raw_ops)
        diff      = _unified_diff(lines, new_lines, path)

        # Preserve trailing newline — write atomically to avoid partial writes on failure
        import os
        new_text = "\n".join(new_lines) + ("\n" if text.endswith("\n") else "")
        tmp = path.with_suffix(path.suffix + ".tmp")
        try:
            tmp.write_text(new_text, encoding="utf-8")
            os.replace(tmp, path)
        finally:
            if tmp.exists():
                tmp.unlink(missing_ok=True)

        summary = (
            f"Applied {len(raw_ops)} operation(s) to {path.name} "
            f"({len(lines)} → {len(new_lines)} lines)."
        )
        return ToolResult(success=True, output=f"{summary}\n\n{diff}" if diff else summary)