diff --git a/navi/core/agent.py b/navi/core/agent.py index 7d7b6c6..3df77af 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -251,10 +251,11 @@ import uuid as _uuid # Give each sub-agent its own scratchpad namespace so parallel or # sequential sub-agents don't clobber each other's working notes. - from navi.tools.base import current_session_id as _sid_var + from navi.tools.base import current_session_id as _sid_var, current_model as _model_var _sid_var.set(f"subagent_{_uuid.uuid4().hex[:12]}") profile = self._profiles.get(profile_id) + _model_var.set(profile.model) exclude = set(exclude_tools or []) tools = [t for t in self._tool_list(profile.enabled_tools) if t.name not in exclude] tool_schemas = [t.schema() for t in tools] @@ -402,9 +403,10 @@ mem = await self._memory_msg() - # Expose session_id to tools (e.g. SSH connection pool) via ContextVar - from navi.tools.base import current_session_id as _sid_var + # Expose session_id and model to tools via ContextVar + from navi.tools.base import current_session_id as _sid_var, current_model as _model_var _sid_token = _sid_var.set(session_id) + _model_var.set(profile.model) # Pre-turn compression: if the last turn filled the context past the # threshold, compress NOW before calling the LLM. This prevents the diff --git a/navi/core/ai_helper.py b/navi/core/ai_helper.py new file mode 100644 index 0000000..b14b735 --- /dev/null +++ b/navi/core/ai_helper.py @@ -0,0 +1,113 @@ +""" +AIHelper — reusable LLM utility for AI-enhanced tools. + +Provides a simple ask() / ask_json() interface over a LLMBackend. +Model selection: reads current_model ContextVar (set by run_stream/run_ephemeral +before each tool turn), falls back to default_model. + +Usage in any tool: + class MyTool(Tool): + def __init__(self, ai_helper: AIHelper) -> None: + self._ai = ai_helper + + async def execute(self, params: dict) -> ToolResult: + answer = await self._ai.ask("You are ...", "Question: ...") + data = await self._ai.ask_json("...", "Return JSON: ...") +""" + +import json +import re +import structlog + +log = structlog.get_logger() + + +class AIHelper: + """ + Thin, reusable wrapper over LLMBackend for single-turn AI calls. + + Parameters + ---------- + backend : LLMBackend + The LLM backend to use (e.g. OllamaBackend). + default_model : str + Fallback model name when current_model ContextVar is not set. + temperature : float + Sampling temperature for all calls (default 0.1 for determinism). + """ + + def __init__(self, backend, default_model: str, temperature: float = 0.1) -> None: + self._backend = backend + self._default_model = default_model + self._temperature = temperature + + def _active_model(self) -> str: + """Return current session model or fall back to default.""" + from navi.tools.base import current_model + return current_model.get() or self._default_model + + async def ask(self, system: str, prompt: str) -> str: + """Single non-streaming LLM call. Returns the response text.""" + from navi.llm.base import Message + messages = [ + Message(role="system", content=system), + Message(role="user", content=prompt), + ] + response = await self._backend.complete( + messages, + tools=None, + temperature=self._temperature, + model=self._active_model(), + think=False, + ) + return (response.content or "").strip() + + async def ask_json(self, system: str, prompt: str) -> list | dict | None: + """ + Single LLM call expecting JSON output. + Returns parsed list/dict, or None if the response cannot be parsed. + Handles markdown code fences automatically. + """ + raw = await self.ask(system, prompt) + result = _extract_json(raw) + if result is None: + log.warning("ai_helper.json_parse_failed", raw_preview=raw[:300]) + return result + + +# ─── JSON extraction ─────────────────────────────────────────────────────── + +def _extract_json(text: str) -> list | dict | None: + """ + Extract the first valid JSON array or object from text. + Handles markdown code fences (```json ... ```) and inline JSON. + Uses bracket-matching to find the outermost structure. + """ + # Strip markdown code fences + cleaned = re.sub(r"```(?:json)?\s*", "", text) + cleaned = re.sub(r"```", "", cleaned).strip() + + # Try direct parse first + try: + return json.loads(cleaned) + except json.JSONDecodeError: + pass + + # Bracket-match: find outermost [ ] or { } + for open_c, close_c in (("[", "]"), ("{", "}")): + start = cleaned.find(open_c) + if start == -1: + continue + depth = 0 + for i, c in enumerate(cleaned[start:], start): + if c == open_c: + depth += 1 + elif c == close_c: + depth -= 1 + if depth == 0: + try: + return json.loads(cleaned[start : i + 1]) + except json.JSONDecodeError: + break + + return None diff --git a/navi/core/registry.py b/navi/core/registry.py index 2e6e308..fee1fed 100644 --- a/navi/core/registry.py +++ b/navi/core/registry.py @@ -105,6 +105,17 @@ session_store=None, ) -> tuple[ToolRegistry, ProfileRegistry, BackendRegistry]: """Build and populate registries with all built-in components.""" + from navi.core.ai_helper import AIHelper + + # Backends are needed by AIHelper — build early + ollama_backend = OllamaBackend( + model=settings.ollama_default_model, + host=settings.ollama_host, + ) + ai_helper = AIHelper( + backend=ollama_backend, + default_model=settings.ollama_default_model, + ) tools = ToolRegistry() reload_tool = ReloadToolsTool(registry=tools) @@ -113,7 +124,7 @@ list_tool = ListToolsTool(registry=tools) manual_tool = ToolManualTool(registry=tools) memory_tool = MemoryTool(memory_store) if memory_store else None - builtins = [WebSearchTool(), FilesystemTool(), HttpRequestTool(), WebViewTool(), + builtins = [WebSearchTool(), FilesystemTool(ai_helper=ai_helper), HttpRequestTool(), WebViewTool(), CodeExecTool(), TerminalTool(), SshExecTool(), ImageViewTool(), ShareFileTool(), TestToolTool(), TodoTool(), ScratchpadTool(), reload_tool, write_tool, delete_tool, list_tool, manual_tool] @@ -151,13 +162,7 @@ tools.register(list_profiles_tool, builtin=True) backends = BackendRegistry() - backends.register( - "ollama", - OllamaBackend( - model=settings.ollama_default_model, - host=settings.ollama_host, - ), - ) + backends.register("ollama", ollama_backend) # Patch backend registry into spawn_tool now that it's available spawn_tool._backend_registry = backends diff --git a/navi/tools/base.py b/navi/tools/base.py index a2b3dc6..9197b49 100644 --- a/navi/tools/base.py +++ b/navi/tools/base.py @@ -27,6 +27,10 @@ # model stays in VRAM). Never use task.cancel() for stopping generation. current_stop_event: ContextVar[asyncio.Event | None] = ContextVar("current_stop_event", default=None) +# Set by run_stream() / run_ephemeral() to expose the current profile's model name +# to tools that need to make their own LLM calls (e.g. AIHelper-powered tools). +current_model: ContextVar[str | None] = ContextVar("current_model", default=None) + @dataclass class ToolResult: diff --git a/navi/tools/filesystem.py b/navi/tools/filesystem.py index 2c516ba..85a7478 100644 --- a/navi/tools/filesystem.py +++ b/navi/tools/filesystem.py @@ -1,10 +1,11 @@ -"""Filesystem tool — read/write/append/list/find/info/move/delete/exists. +"""Filesystem tool — read/write/append/list/find/info/move/delete/exists + AI query/smart_edit. If FS_ALLOWED_PATHS=* (default), any path is accessible. Otherwise set a comma-separated list of allowed root paths, e.g.: FS_ALLOWED_PATHS=/home/user,/var/www """ +import difflib import shutil import stat from datetime import datetime @@ -14,11 +15,57 @@ from .base import Tool, ToolResult -_READ_WARN_BYTES = 100_000 # 100 KB — add size warning in output -_READ_HARD_BYTES = 1_000_000 # 1 MB — refuse full read without offset/limit +_READ_WARN_BYTES = 100_000 # 100 KB — add size warning in output +_READ_HARD_BYTES = 1_000_000 # 1 MB — refuse full read without offset/limit _LIST_MAX_ENTRIES = 500 _FIND_MAX_RESULTS = 200 +# AI actions: ~20k tokens of file content per chunk (4 chars ≈ 1 token) +_AI_CHUNK_CHARS = 80_000 +_AI_OVERLAP_LINES = 30 +# smart_edit: refuse files larger than ~50k tokens (full file must fit in one call) +_AI_EDIT_MAX_CHARS = 200_000 + +# ── System prompts ──────────────────────────────────────────────────────────── + +_QUERY_SINGLE_SYSTEM = ( + "You are a precise file analysis assistant. " + "Answer the question based strictly on the file content shown. " + "Be specific and concise. Include line numbers when relevant." +) + +_QUERY_CHUNK_SYSTEM = ( + "You are analyzing one section of a larger file. " + "Answer the question using only the lines shown. " + "If the answer is not present in this section, respond with exactly: NOT_FOUND\n" + "Otherwise be specific and include line numbers." +) + +_QUERY_SYNTHESIS_SYSTEM = ( + "Combine these partial findings from different sections of a file into one clear answer. " + "Remove duplicates. Be direct and concise." +) + +_EDIT_SYSTEM = ( + "You are a precise file editor. " + "Given file content with line numbers and an instruction, output ONLY a JSON array of edit operations. " + "No explanation, no markdown — just the JSON.\n\n" + "Format:\n" + "[\n" + ' {"op": "replace", "start": LINE, "end": LINE, "content": "new text\\nmore lines"},\n' + ' {"op": "delete", "start": LINE, "end": LINE},\n' + ' {"op": "insert", "after": LINE, "content": "text to insert"}\n' + "]\n\n" + "Rules:\n" + "- Line numbers are 1-based and inclusive\n" + "- Use \\n in content strings for embedded newlines\n" + "- Make MINIMAL changes to accomplish the instruction\n" + "- 'insert' after=0 inserts before the first line\n" + "- If no changes are needed, return []" +) + + +# ── Path helpers ────────────────────────────────────────────────────────────── def _check_path(path_str: str) -> Path | None: """Return resolved Path if access is allowed, else None.""" @@ -41,12 +88,9 @@ def _fmt_size(n: int) -> str: - if n < 1024: - return f"{n} B" - if n < 1024 ** 2: - return f"{n / 1024:.1f} KB" - if n < 1024 ** 3: - return f"{n / 1024 ** 2:.1f} MB" + if n < 1024: return f"{n} B" + if n < 1024 ** 2: return f"{n / 1024:.1f} KB" + if n < 1024 ** 3: return f"{n / 1024 ** 2:.1f} MB" return f"{n / 1024 ** 3:.1f} GB" @@ -54,66 +98,183 @@ return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") +# ── AI helpers (module-level, no self) ──────────────────────────────────────── + +def _number_lines(lines: list[str], start: int = 1) -> str: + """Return file lines with 1-based line numbers, right-aligned.""" + width = len(str(start + len(lines) - 1)) + return "\n".join(f"{start + i:>{width}}: {line}" for i, line in enumerate(lines)) + + +def _make_chunks(lines: list[str], target_chars: int, overlap: int) -> list[tuple[int, int]]: + """ + Split lines into (start_idx, end_idx) chunks of at most target_chars characters. + Consecutive chunks overlap by `overlap` lines to preserve boundary context. + """ + if not lines: + return [(0, 0)] + total = len(lines) + total_chars = sum(len(l) + 1 for l in lines) + if total_chars <= target_chars: + return [(0, total)] + + chunks: list[tuple[int, int]] = [] + start = 0 + while start < total: + chars = 0 + end = start + while end < total and chars < target_chars: + chars += len(lines[end]) + 1 + end += 1 + chunks.append((start, end)) + if end >= total: + break + start = max(start + 1, end - overlap) # always make progress + + return chunks or [(0, total)] + + +def _validate_ops(ops: list, total_lines: int) -> list[str]: + errors: list[str] = [] + for i, op in enumerate(ops): + if not isinstance(op, dict): + errors.append(f"op[{i}] is not a dict"); continue + kind = op.get("op") + if kind not in ("replace", "delete", "insert"): + errors.append(f"op[{i}] unknown type {kind!r}"); continue + if kind in ("replace", "delete"): + s, e = op.get("start"), op.get("end") + if not isinstance(s, int) or not isinstance(e, int): + errors.append(f"op[{i}] start/end must be integers") + elif s < 1 or e > total_lines or s > e: + errors.append(f"op[{i}] range {s}-{e} out of bounds (file has {total_lines} lines)") + elif kind == "insert": + after = op.get("after") + if not isinstance(after, int): + errors.append(f"op[{i}] 'after' must be integer") + elif after < 0 or after > total_lines: + errors.append(f"op[{i}] 'after'={after} out of bounds (0–{total_lines})") + return errors + + +def _apply_ops(lines: list[str], ops: list[dict]) -> list[str]: + """Apply edit operations bottom-up (highest line first) to preserve line numbers.""" + sorted_ops = sorted( + ops, + key=lambda o: o.get("start", o.get("after", 0)), + reverse=True, + ) + result = list(lines) + for op in sorted_ops: + kind = op["op"] + if kind == "replace": + s = op["start"] - 1 # 0-based + e = op["end"] # exclusive (1-based end = exclusive 0-based end) + new = op.get("content", "").split("\n") + result[s:e] = new + elif kind == "delete": + s = op["start"] - 1 + e = op["end"] + del result[s:e] + elif kind == "insert": + after = op["after"] # insert after this 1-based line (0 = before line 1) + new = op.get("content", "").split("\n") + result[after:after] = new + return result + + +def _unified_diff(original: list[str], modified: list[str], path: Path) -> str: + diff = list(difflib.unified_diff( + [l + "\n" for l in original], + [l + "\n" for l in modified], + fromfile=f"a/{path.name}", + tofile=f"b/{path.name}", + lineterm="", + )) + return "\n".join(diff) + + +# ── Tool class ──────────────────────────────────────────────────────────────── + class FilesystemTool(Tool): name = "filesystem" description = ( - "Operate on the local filesystem. " - "Actions: " - "read — get file text; use offset+limit for large files to avoid flooding context; " - "write — create/overwrite file (creates parent dirs); " - "append — add text to end of file (creates if missing); " - "list — directory contents with sizes and dates, optional recursive; " - "find — search files by glob pattern, e.g. '*.py' or '**/*.conf'; " - "info — file metadata: size, line count, modified date, permissions; " - "move — rename or move a file/directory; " - "delete — remove file or directory tree; " - "exists — check if path exists. " - "Tip: call info before reading an unknown file to check its size first." + "Operate on the local filesystem.\n" + "Standard actions: " + "read — file text (offset+limit for large files); " + "write — create/overwrite; " + "append — add to end; " + "list — directory contents; " + "find — glob pattern search; " + "info — size, lines, dates, permissions; " + "move — rename or move; " + "delete — remove file or directory; " + "exists — check if path exists.\n" + "AI actions (require 'question' / 'instruction'): " + "query — ask a natural language question about a file's content, works on any size file; " + "smart_edit — apply a natural language edit instruction precisely, returns a diff of changes.\n" + "Tip: call info before reading an unknown file to check its size." ) parameters = { "type": "object", "properties": { "action": { "type": "string", - "enum": ["read", "write", "append", "list", "find", "info", "move", "delete", "exists"], - "description": "Operation to perform", + "enum": [ + "read", "write", "append", "list", "find", + "info", "move", "delete", "exists", + "query", "smart_edit", + ], + "description": "Operation to perform.", }, "path": { "type": "string", - "description": "Absolute or relative file/directory path (~ is expanded)", + "description": "Absolute or relative file/directory path (~ is expanded).", }, "content": { "type": "string", - "description": "Text to write or append (required for write/append)", + "description": "Text to write or append (required for write/append).", }, "destination": { "type": "string", - "description": "Target path for move action", + "description": "Target path for move action.", }, "pattern": { "type": "string", - "description": "Glob pattern for find action, e.g. '*.log' or '**/*.py'", + "description": "Glob pattern for find, e.g. '*.log' or '**/*.py'.", }, "offset": { "type": "integer", - "description": "First line to read, 1-based (for read action)", + "description": "First line to read, 1-based (for read action).", }, "limit": { "type": "integer", - "description": "Maximum number of lines to return (for read action)", + "description": "Max lines to return (for read action).", }, "recursive": { "type": "boolean", - "description": "List the full directory tree recursively (for list action, default false)", + "description": "Full recursive directory tree (for list, default false).", + }, + "question": { + "type": "string", + "description": "Natural language question about the file's content (for query).", + }, + "instruction": { + "type": "string", + "description": "Natural language edit instruction (for smart_edit), e.g. 'rename function foo to bar'.", }, }, "required": ["action", "path"], } + def __init__(self, ai_helper=None) -> None: + # ai_helper is optional — standard actions work without it + self._ai = ai_helper + async def execute(self, params: dict) -> ToolResult: - action = params["action"] - raw_path = params["path"] - path = _check_path(raw_path) + action = params.get("action", "") + raw_path = params.get("path", "") + path = _check_path(raw_path) if path is None: return ToolResult( @@ -128,24 +289,17 @@ try: match action: - case "read": - return self._read(path, params) - case "write": - return self._write(path, params) - case "append": - return self._append(path, params) - case "list": - return self._list(path, params) - case "find": - return self._find(path, params) - case "info": - return self._info(path) - case "move": - return self._move(path, params) - case "delete": - return self._delete(path) - case "exists": - return ToolResult(success=True, output="true" if path.exists() else "false") + case "read": return self._read(path, params) + case "write": return self._write(path, params) + case "append": return self._append(path, params) + case "list": return self._list(path, params) + case "find": return self._find(path, params) + case "info": return self._info(path) + case "move": return self._move(path, params) + case "delete": return self._delete(path) + case "exists": return ToolResult(success=True, output="true" if path.exists() else "false") + case "query": return await self._query(path, params) + case "smart_edit": return await self._smart_edit(path, params) case _: return ToolResult(success=False, output=f"Unknown action: {action}", error="invalid_action") @@ -154,7 +308,7 @@ except Exception as e: return ToolResult(success=False, output=f"Filesystem error: {e}", error=str(e)) - # ── action handlers ─────────────────────────────────────────────────────── + # ── Standard action handlers ────────────────────────────────────────────── def _read(self, path: Path, params: dict) -> ToolResult: if not path.exists(): @@ -163,28 +317,27 @@ return ToolResult(success=False, output=f"Path is a directory, use 'list': {path}", error="is_directory") file_size = path.stat().st_size - offset = params.get("offset") # 1-based - limit = params.get("limit") + offset = params.get("offset") + limit = params.get("limit") - # Refuse to dump files over 1 MB without an explicit range if file_size > _READ_HARD_BYTES and offset is None and limit is None: return ToolResult( success=False, output=( f"File too large to read in full: {_fmt_size(file_size)} — {path}\n" "Use offset/limit to read specific line ranges " - "(e.g. offset=1, limit=100), or call info first to see the total line count." + "(e.g. offset=1, limit=100), or use 'query' to ask a question about it." ), error="file_too_large", ) - text = path.read_text(encoding="utf-8", errors="replace") + text = path.read_text(encoding="utf-8", errors="replace") lines = text.splitlines(keepends=True) total_lines = len(lines) if offset is not None or limit is not None: - start = max(0, (offset or 1) - 1) # convert 1-based → 0-based - end = (start + limit) if limit is not None else total_lines + start = max(0, (offset or 1) - 1) + end = (start + limit) if limit is not None else total_lines selected = lines[start:end] actual_end = min(end, total_lines) header = ( @@ -194,10 +347,8 @@ return ToolResult(success=True, output=header + "".join(selected)) warn = ( - f"⚠ Large file ({_fmt_size(file_size)}) — " - "consider offset/limit for targeted reads next time.\n" - if file_size > _READ_WARN_BYTES - else "" + f"⚠ Large file ({_fmt_size(file_size)}) — consider offset/limit next time.\n" + if file_size > _READ_WARN_BYTES else "" ) header = f"[{path} | {total_lines} lines | {_fmt_size(file_size)}]\n" return ToolResult(success=True, output=header + warn + text) @@ -207,10 +358,7 @@ path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") lines = len(content.splitlines()) - return ToolResult( - success=True, - output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}", - ) + return ToolResult(success=True, output=f"Written {_fmt_size(len(content.encode()))} ({lines} lines) → {path}") def _append(self, path: Path, params: dict) -> ToolResult: content = params.get("content", "") @@ -219,14 +367,7 @@ path.parent.mkdir(parents=True, exist_ok=True) with path.open("a", encoding="utf-8") as f: f.write(content) - total_size = path.stat().st_size - return ToolResult( - success=True, - output=( - f"Appended {_fmt_size(len(content.encode()))} to {path} " - f"(file now {_fmt_size(total_size)})" - ), - ) + return ToolResult(success=True, output=f"Appended {_fmt_size(len(content.encode()))} to {path} (file now {_fmt_size(path.stat().st_size)})") def _list(self, path: Path, params: dict) -> ToolResult: if not path.exists(): @@ -234,20 +375,18 @@ if path.is_file(): return self._info(path) - recursive = params.get("recursive", False) + recursive = params.get("recursive", False) raw_entries = list(path.rglob("*") if recursive else path.iterdir()) raw_entries.sort(key=lambda e: (e.is_file(), str(e).lower())) truncated = len(raw_entries) > _LIST_MAX_ENTRIES - entries = raw_entries[:_LIST_MAX_ENTRIES] - - lines = [] + entries = raw_entries[:_LIST_MAX_ENTRIES] + lines = [] for e in entries: try: - s = e.stat() + s = e.stat() rel = e.relative_to(path) if e.is_dir(): - # child count only in non-recursive mode (cheap) if not recursive: try: n = sum(1 for _ in e.iterdir()) @@ -257,20 +396,18 @@ else: lines.append(f"d {rel}/") else: - lines.append( - f" {str(rel):<48} {_fmt_size(s.st_size):>10} {_fmt_time(s.st_mtime)}" - ) + lines.append(f" {str(rel):<48} {_fmt_size(s.st_size):>10} {_fmt_time(s.st_mtime)}") except Exception: lines.append(f"? {e.name}") - note = " ⚠ truncated" if truncated else "" + note = " ⚠ truncated" if truncated else "" header = f"[{path} | {len(entries)} entries{note}]\n" return ToolResult(success=True, output=header + ("\n".join(lines) or "(empty directory)")) def _find(self, path: Path, params: dict) -> ToolResult: pattern = params.get("pattern") if not pattern: - return ToolResult(success=False, output="'pattern' is required for find action", error="missing_pattern") + return ToolResult(success=False, output="'pattern' is required for find", error="missing_pattern") if not path.exists(): return ToolResult(success=False, output=f"Path not found: {path}", error="not_found") @@ -295,7 +432,7 @@ except Exception: lines.append(str(m)) - extra = f" ⚠ showing first {_FIND_MAX_RESULTS}, more exist" if len(matches) == _FIND_MAX_RESULTS else "" + extra = f" ⚠ showing first {_FIND_MAX_RESULTS}" if len(matches) == _FIND_MAX_RESULTS else "" header = f"[{len(matches)} matches for '{pattern}' in {path}{extra}]\n" return ToolResult(success=True, output=header + "\n".join(lines)) @@ -303,7 +440,7 @@ if not path.exists(): return ToolResult(success=False, output=f"Not found: {path}", error="not_found") - s = path.stat() + s = path.stat() kind = "symlink" if path.is_symlink() else ("directory" if path.is_dir() else "file") lines = [ f"path: {path}", @@ -322,25 +459,18 @@ elif path.is_dir(): try: children = list(path.iterdir()) - n_dirs = sum(1 for c in children if c.is_dir()) - n_files = sum(1 for c in children if c.is_file()) - lines.append(f"contents: {n_files} files, {n_dirs} dirs (top level)") + lines.append(f"contents: {sum(c.is_file() for c in children)} files, {sum(c.is_dir() for c in children)} dirs (top level)") except Exception: pass - return ToolResult(success=True, output="\n".join(lines)) def _move(self, path: Path, params: dict) -> ToolResult: dest_raw = params.get("destination") if not dest_raw: - return ToolResult(success=False, output="'destination' is required for move action", error="missing_destination") + return ToolResult(success=False, output="'destination' is required for move", error="missing_destination") dest = _check_path(dest_raw) if dest is None: - return ToolResult( - success=False, - output=f"Access denied: destination '{dest_raw}' is outside allowed paths.", - error="access_denied", - ) + return ToolResult(success=False, output=f"Access denied: destination '{dest_raw}' outside allowed paths.", error="access_denied") if not path.exists(): return ToolResult(success=False, output=f"Not found: {path}", error="not_found") dest.parent.mkdir(parents=True, exist_ok=True) @@ -355,3 +485,131 @@ else: path.unlink() return ToolResult(success=True, output=f"Deleted: {path}") + + # ── AI action handlers ──────────────────────────────────────────────────── + + def _require_ai(self) -> ToolResult | None: + if not self._ai: + return ToolResult( + success=False, + output="AI helper is not available for this action.", + error="no_ai_helper", + ) + return None + + async def _query(self, path: Path, params: dict) -> ToolResult: + if (err := self._require_ai()) is not None: + return err + + question = params.get("question", "").strip() + if not question: + return ToolResult(success=False, output="'question' is required for query.", error="missing_question") + if not path.exists(): + return ToolResult(success=False, output=f"File not found: {path}", error="not_found") + if path.is_dir(): + return ToolResult(success=False, output="query works on files, not directories.", error="is_directory") + + text = path.read_text(encoding="utf-8", errors="replace") + lines = text.splitlines() + total = len(lines) + chunks = _make_chunks(lines, _AI_CHUNK_CHARS, _AI_OVERLAP_LINES) + + if len(chunks) == 1: + s, e = chunks[0] + numbered = _number_lines(lines[s:e], s + 1) + answer = await self._ai.ask( + _QUERY_SINGLE_SYSTEM, + f"File: {path}\n\nQuestion: {question}\n\nContent:\n{numbered}", + ) + return ToolResult(success=True, output=answer) + + # Multi-chunk: accumulate partial answers + partials: list[str] = [] + for s, e in chunks: + numbered = _number_lines(lines[s:e], s + 1) + partial = await self._ai.ask( + _QUERY_CHUNK_SYSTEM, + f"File: {path} (lines {s + 1}–{e} of {total})\nQuestion: {question}\n\nContent:\n{numbered}", + ) + if partial and "NOT_FOUND" not in partial.upper(): + partials.append(f"[lines {s + 1}–{e}] {partial}") + + if not partials: + return ToolResult(success=True, output=f"No information found in '{path.name}' relevant to: {question}") + + if len(partials) == 1: + # Single finding — strip range prefix, return directly + answer = partials[0].split("] ", 1)[-1] + return ToolResult(success=True, output=answer) + + answer = await self._ai.ask( + _QUERY_SYNTHESIS_SYSTEM, + f"Question: {question}\n\nFindings from {len(partials)} sections:\n\n" + "\n\n".join(partials), + ) + return ToolResult(success=True, output=answer) + + async def _smart_edit(self, path: Path, params: dict) -> ToolResult: + if (err := self._require_ai()) is not None: + return err + + instruction = params.get("instruction", "").strip() + if not instruction: + return ToolResult(success=False, output="'instruction' is required for smart_edit.", error="missing_instruction") + if not path.exists(): + return ToolResult(success=False, output=f"File not found: {path}", error="not_found") + if path.is_dir(): + return ToolResult(success=False, output="smart_edit works on files, not directories.", error="is_directory") + + text = path.read_text(encoding="utf-8", errors="replace") + if len(text) > _AI_EDIT_MAX_CHARS: + return ToolResult( + success=False, + output=( + f"File too large for smart_edit ({_fmt_size(len(text.encode()))}, " + f"limit {_fmt_size(_AI_EDIT_MAX_CHARS)}). " + "Use read with offset/limit to locate the relevant section, then write it back." + ), + error="file_too_large", + ) + + lines = text.splitlines() + numbered = _number_lines(lines, 1) + + raw_ops = await self._ai.ask_json( + _EDIT_SYSTEM, + f"File: {path}\nTotal lines: {len(lines)}\n\nInstruction: {instruction}\n\nContent:\n{numbered}", + ) + + if raw_ops is None: + return ToolResult( + success=False, + output="AI could not produce valid edit operations. Try rephrasing the instruction.", + error="invalid_ai_response", + ) + + if not isinstance(raw_ops, list): + raw_ops = [raw_ops] if isinstance(raw_ops, dict) else [] + + if not raw_ops: + return ToolResult(success=True, output="No changes needed — file unchanged.") + + errors = _validate_ops(raw_ops, len(lines)) + if errors: + return ToolResult( + success=False, + output="AI returned invalid operations:\n" + "\n".join(f" • {e}" for e in errors), + error="invalid_ops", + ) + + new_lines = _apply_ops(lines, raw_ops) + diff = _unified_diff(lines, new_lines, path) + + # Preserve trailing newline + new_text = "\n".join(new_lines) + ("\n" if text.endswith("\n") else "") + path.write_text(new_text, encoding="utf-8") + + summary = ( + f"Applied {len(raw_ops)} operation(s) to {path.name} " + f"({len(lines)} → {len(new_lines)} lines)." + ) + return ToolResult(success=True, output=f"{summary}\n\n{diff}" if diff else summary)