"""Resolve @path references inside user input.
Supported forms:
@path/to/file.py → file content wrapped in code fence
@dir → contents of the files directly inside the directory; @dir/ (trailing slash) → also recurse into subdirectories
@tests/**/*.py → glob expansion, files only
Size limits apply per-file and in total to avoid flooding the LLM context.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
# Per-file cap: a file longer than this many bytes is cut down to this length.
MAX_FILE_BYTES = 64_000
# Cap, in bytes, on the combined size of all attachments in one result.
MAX_TOTAL_BYTES = 128_000
# Marker appended to the text of any attachment that was cut short.
TRUNCATED_NOTICE = "\n... [truncated by Navi Code]"
@dataclass
class ResolvedFile:
    """One file that an @ reference expanded to.

    Attributes:
        path: Filesystem location of the file.
        display_path: Path string used when labelling the file in output.
        content: Decoded text of the file.
        truncated: Whether ``content`` was shortened to fit a size limit.
    """

    path: Path
    display_path: str
    content: str
    truncated: bool = False
@dataclass
class FileRefResult:
    """Result of resolving @ references in a prompt.

    Attributes:
        prompt: Prompt text; always the first part of ``to_message()``.
        attachments: Files that were resolved, in the order they were added.
        errors: Human-readable problems met while resolving references.
        total_bytes: Running count of attached bytes.
    """

    prompt: str
    attachments: list[ResolvedFile] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    total_bytes: int = 0

    def is_empty(self) -> bool:
        """Return True when there are neither attachments nor errors."""
        return not self.attachments and not self.errors

    @staticmethod
    def _fence_for(content: str) -> str:
        """Return a backtick fence that *content* cannot close prematurely.

        A fenced block is only closed by a run of backticks at least as long
        as the opening fence, so the fence is made one backtick longer than
        the longest run found in the content (never shorter than three).
        """
        longest = run = 0
        for ch in content:
            run = run + 1 if ch == "`" else 0
            if run > longest:
                longest = run
        return "`" * max(3, longest + 1)

    def to_message(self) -> str:
        """Build the full message to send to the backend.

        Returns:
            The prompt alone when nothing was attached and nothing failed;
            otherwise the prompt followed by one fenced code block per
            attachment and a bullet list of errors.
        """
        if self.is_empty():
            return self.prompt
        parts = [self.prompt]
        if self.attachments:
            parts.append("")
            parts.append("--- attached files ---")
            for f in self.attachments:
                lang = _guess_language(f.path)
                label = f"file: {f.display_path}"
                if f.truncated:
                    label += " (truncated)"
                # Attached Markdown (or any text containing ```) must not be
                # able to terminate its own fence.
                fence = self._fence_for(f.content)
                parts.append(f"{fence}{lang} {label}")
                parts.append(f.content)
                parts.append(fence)
        if self.errors:
            parts.append("")
            parts.append("--- attachment errors ---")
            for err in self.errors:
                parts.append(f"- {err}")
        return "\n".join(parts)
# An @ reference is "@" followed by path characters or backslash-escaped
# whitespace. The lookbehind rejects an "@" glued to a preceding ASCII word
# character so e-mail/SSH style addresses (bob@example.com, git@github.com)
# are not mistaken for file references; it is ASCII-only so that text in
# scripts written without spaces can still be followed directly by "@".
_ref_pattern = re.compile(r"(?<![A-Za-z0-9_])@((?:[A-Za-z0-9_\-\.~/$*?]|\\\s)+)")
# Matches exactly the escape form the reference pattern accepts.
_escaped_whitespace = re.compile(r"\\(\s)")


def find_refs(text: str) -> list[str]:
    """Return all @path tokens found in text, in order, without duplicates.

    Args:
        text: Raw user input.

    Returns:
        The referenced paths with the leading "@" removed and every
        backslash-escaped whitespace character un-escaped. Duplicates are
        dropped after un-escaping; the first occurrence keeps its position.
    """
    seen: set[str] = set()
    refs: list[str] = []
    for raw in _ref_pattern.findall(text):
        # Un-escape every whitespace escape the pattern let through, not only
        # backslash-space, so no stray backslash is left in the path.
        ref = _escaped_whitespace.sub(r"\1", raw)
        if ref not in seen:
            seen.add(ref)
            refs.append(ref)
    return refs
class OrderedRefs(list):
    """List subclass that adds nothing; a plain ``list`` behaves the same."""
class FileRefResolver:
    """Resolve @ references relative to a base directory.

    Each reference is tried, in order, as an existing directory, an existing
    file, and finally (when it contains glob metacharacters) as a glob
    pattern. Problems are recorded on the result instead of being raised.
    """

    def __init__(self, base_dir: Path | str | None = None) -> None:
        """Store the absolute base directory.

        Args:
            base_dir: Directory that relative references are resolved
                against. Defaults to the current working directory.
        """
        self.base_dir = Path(base_dir or Path.cwd()).expanduser().resolve()

    def resolve(self, text: str) -> FileRefResult:
        """Resolve every @ reference in *text*.

        Args:
            text: Raw user input; it becomes the result's prompt unchanged.

        Returns:
            A result holding the attachments and errors gathered. Resolution
            stops once the total size limit has been reached.
        """
        result = FileRefResult(prompt=text)
        for ref in find_refs(text):
            self._resolve_ref(ref, result)
            if result.total_bytes >= MAX_TOTAL_BYTES:
                result.errors.append("total attachment size limit reached; remaining files skipped")
                break
        return result

    def _resolve_ref(self, ref: str, result: FileRefResult) -> None:
        """Resolve one reference and record its attachments or an error."""
        path = self._expand_path(ref)
        if path is None:
            result.errors.append(f"could not resolve {ref!r}")
            return

        if path.exists():
            if path.is_dir():
                # A trailing slash on the reference asks for a recursive walk.
                files = sorted(_collect_files(path, recursive=ref.endswith("/")))
                if not files:
                    result.errors.append(f"no files found in {ref}")
                    return
                self._attach_all(files, result, root_dir=path)
            elif path.is_file():
                self._attach_file(path, result)
            else:
                result.errors.append(f"not a file or directory: {ref}")
            return

        # Non-existent path: try glob expansion if it looks like a pattern.
        if not _is_glob(ref):
            result.errors.append(f"not found: {ref}")
            return
        try:
            matches = self._glob_files(ref)
        except (ValueError, NotImplementedError, RuntimeError, OSError) as exc:
            # pathlib rejects some patterns (e.g. a malformed "**"); report
            # that instead of letting it abort the whole resolve() call.
            result.errors.append(f"invalid pattern {ref!r}: {exc}")
            return
        if not matches:
            result.errors.append(f"no matches for {ref}")
            return
        self._attach_all(matches, result)

    def _glob_files(self, ref: str) -> list[Path]:
        """Return the sorted files matching glob pattern *ref*.

        Relative patterns are expanded under ``base_dir``. Absolute and
        "~"-prefixed patterns are expanded from their own filesystem anchor,
        because ``Path.glob`` only accepts relative patterns. Directories
        that match are dropped.
        """
        pattern = Path(ref).expanduser() if ref.startswith("~") else Path(ref)
        if pattern.is_absolute():
            anchor = Path(pattern.anchor)
            candidates = anchor.glob(str(pattern.relative_to(anchor)))
        else:
            candidates = self.base_dir.glob(ref)
        return sorted(p for p in candidates if p.is_file())

    def _attach_all(self, files: Iterable[Path], result: FileRefResult, root_dir: Path | None = None) -> None:
        """Attach *files* in order, stopping once the total limit is reached."""
        for file_path in files:
            self._attach_file(file_path, result, root_dir=root_dir)
            if result.total_bytes >= MAX_TOTAL_BYTES:
                return

    def _attach_file(self, path: Path, result: FileRefResult, root_dir: Path | None = None) -> None:
        """Read *path* and append it to *result*, honouring both size limits.

        Args:
            path: File to read.
            result: Result that receives the attachment or a read error.
            root_dir: When given, the display path is made relative to this
                directory instead of ``base_dir``.
        """
        import codecs

        remaining = MAX_TOTAL_BYTES - result.total_bytes
        if remaining <= 0:
            return
        limit = min(MAX_FILE_BYTES, remaining)

        try:
            # Read one byte past the limit: enough to detect truncation
            # without loading an arbitrarily large file into memory.
            with path.open("rb") as handle:
                data = handle.read(limit + 1)
        except (OSError, ValueError) as exc:
            result.errors.append(f"failed to read {path}: {exc}")
            return

        truncated = len(data) > limit
        if truncated:
            data = data[:limit]

        # With final=False an incomplete multi-byte sequence at the cut point
        # is held back rather than rendered as U+FFFD; invalid bytes elsewhere
        # are still replaced.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        text = decoder.decode(data, final=not truncated)
        if truncated:
            text += TRUNCATED_NOTICE

        display = _relative_or_absolute(path, self.base_dir if root_dir is None else root_dir)
        result.attachments.append(
            ResolvedFile(path=path, display_path=display, content=text, truncated=truncated)
        )
        result.total_bytes += len(data)

    def _expand_path(self, ref: str) -> Path | None:
        """Turn a reference into a concrete path.

        Returns:
            The expanded path, or None when the reference is empty after
            stripping trailing slashes or its "~user" prefix cannot be
            expanded.
        """
        # The trailing slash only matters to the caller (recursive flag).
        clean = ref.rstrip("/")
        if not clean:
            return None
        try:
            if clean.startswith("~"):
                return Path(clean).expanduser()
            return (self.base_dir / clean).resolve()
        except RuntimeError:
            # expanduser() raises for an unknown "~user"; resolve() can raise
            # on symlink loops. Either way the reference is unresolvable.
            return None
def _is_glob(ref: str) -> bool:
"""Return True if ref contains glob metacharacters."""
return "*" in ref or "?" in ref or "[" in ref
def _collect_files(path: Path, recursive: bool = False) -> Iterable[Path]:
"""Yield files inside a directory."""
if recursive:
for p in sorted(path.rglob("*")):
if p.is_file():
yield p
else:
for p in sorted(path.iterdir()):
if p.is_file():
yield p
def _relative_or_absolute(path: Path, base: Path) -> str:
try:
return str(path.relative_to(base))
except ValueError:
return str(path)
def _guess_language(path: Path) -> str:
"""Best-effort language tag for markdown code fence."""
mapping = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".tsx": "tsx",
".jsx": "jsx",
".go": "go",
".rs": "rust",
".c": "c",
".cpp": "cpp",
".h": "c",
".java": "java",
".kt": "kotlin",
".sh": "bash",
".zsh": "bash",
".bash": "bash",
".md": "markdown",
".json": "json",
".yaml": "yaml",
".yml": "yaml",
".toml": "toml",
".html": "html",
".css": "css",
".scss": "scss",
".sql": "sql",
".dockerfile": "dockerfile",
".lock": "text",
".txt": "text",
".env": "bash",
}
return mapping.get(path.suffix.lower(), "text")