"""Permission engine for destructive tool operations."""
from __future__ import annotations
import fnmatch
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
@dataclass
class PermissionRule:
    """Describe one class of tool call that must be confirmed before it runs.

    A rule is selected by tool name and can be narrowed further by an
    ``action`` and/or a glob ``pattern``; a criterion left as ``None`` is
    not checked.
    """

    # Name of the tool the rule applies to (e.g. "filesystem", "terminal").
    tool: str

    # Value the call's "action" argument must equal, or None for any action.
    action: str | None = None

    # Glob (fnmatch-style) pattern for the call's path/command, or None to
    # skip pattern matching.
    pattern: str | None = None

    # Short human-readable description of the guarded operation
    # (presumably shown when asking for confirmation; confirm with callers).
    message: str = ""
# Built-in rules for operations treated as destructive by default.
# Filesystem rules are selected by action; terminal rules by a glob pattern.
DEFAULT_RULES: list[PermissionRule] = [
    *(
        PermissionRule(tool="filesystem", action=fs_action, message=fs_message)
        for fs_action, fs_message in (
            ("delete", "Delete file/directory"),
            ("move", "Move/overwrite file"),
            ("write", "Overwrite existing file"),
        )
    ),
    *(
        PermissionRule(tool="terminal", pattern=cmd_glob, message=cmd_message)
        for cmd_glob, cmd_message in (
            ("rm *", "Remove files/directories"),
            ("*format*", "Format operation"),
            ("*drop*", "Drop database/table"),
        )
    ),
]

# Callback type taking a single bool and returning nothing
# (presumably the user's confirm/deny decision; verify against callers).
PermissionCallback = Callable[[bool], None]
class PermissionEngine:
    """Check whether a tool call requires user confirmation.

    Rules are tried in order and the first match wins. "Always allow" and
    "always deny" decisions are remembered per rule key
    (``tool:action[:target]``) and persisted as a JSON object of the form
    ``{"allow": [...], "deny": [...]}`` at ``store_path``.
    """

    # Argument names probed, in priority order, to find the target of a call
    # for each known tool. Tools not listed here have no target.
    _TARGET_KEYS: dict[str, tuple[str, ...]] = {
        "filesystem": ("path", "destination"),
        "terminal": ("command", "action"),
        "code_exec": ("language", "code"),
        "ssh_exec": ("host", "command"),
    }

    # A code snippet used as a target is shortened to this many characters.
    _CODE_PREVIEW_LEN = 40

    def __init__(self, store_path: Path | None = None, rules: list[PermissionRule] | None = None) -> None:
        """Create the engine and load any persisted decisions.

        Args:
            store_path: JSON file holding the remembered decisions. Defaults
                to ``~/.navi_code/permissions.json``.
            rules: Rules to evaluate, in order. ``None`` or an empty list
                selects a copy of ``DEFAULT_RULES``.
        """
        self._rules = rules or list(DEFAULT_RULES)
        self._store_path = store_path or (Path.home() / ".navi_code" / "permissions.json")
        self._always_allow: set[str] = set()
        self._always_deny: set[str] = set()
        self._load()

    @staticmethod
    def _read_keys(data: dict, field: str) -> set[str]:
        """Return ``data[field]`` as a set of rule keys.

        A missing field yields an empty set.

        Raises:
            ValueError: If the field is present but is not a list of strings.
        """
        entries = data.get(field, [])
        if not isinstance(entries, list) or not all(isinstance(entry, str) for entry in entries):
            raise ValueError(f"{field!r} must be a list of strings")
        return set(entries)

    def _load(self) -> None:
        """Populate the allow/deny sets from the JSON store.

        A missing store leaves the current decisions untouched. An unreadable
        or malformed store is treated as empty rather than raising, so a
        damaged file never prevents the engine from starting.
        """
        allow: set[str] = set()
        deny: set[str] = set()
        try:
            data = json.loads(self._store_path.read_text(encoding="utf-8"))
            if not isinstance(data, dict):
                raise ValueError("permission store must be a JSON object")
            allow = self._read_keys(data, "allow")
            deny = self._read_keys(data, "deny")
        except FileNotFoundError:
            return
        except (OSError, ValueError, RecursionError):
            # Best-effort: ValueError covers invalid JSON, undecodable bytes
            # and the shape checks above. Discard anything partially read.
            allow, deny = set(), set()
        self._always_deny = deny
        # check() consults the allow set first, so a key listed on both sides
        # of a hand-edited store would be silently allowed; let deny win.
        self._always_allow = allow - deny

    def _save(self) -> None:
        """Write the allow/deny sets to the JSON store, creating its directory."""
        self._store_path.parent.mkdir(parents=True, exist_ok=True)
        data = {"allow": sorted(self._always_allow), "deny": sorted(self._always_deny)}
        self._store_path.write_text(json.dumps(data, indent=2), encoding="utf-8")

    def check(self, tool: str, args: dict) -> PermissionRule | None:
        """Return the matching rule if confirmation is needed, else None.

        A call marked "always allow" returns ``None`` without consulting the
        rules. A call marked "always deny" returns a synthetic rule whose
        message says so; the caller is expected to reject it.
        """
        rule_key = self._rule_key(tool, args)
        if rule_key in self._always_allow:
            return None
        if rule_key in self._always_deny:
            # Always denied: treat as matched, the caller will reject.
            return PermissionRule(tool=tool, message="always denied by user policy")

        # Both values depend only on the call, so compute them once.
        action = args.get("action")
        target = self._extract_target(tool, args)
        for rule in self._rules:
            if rule.tool != tool:
                continue
            if rule.action is not None and rule.action != action:
                continue
            if rule.pattern is not None and not fnmatch.fnmatch(target, rule.pattern):
                continue
            return rule
        return None

    def set_always_allow(self, tool: str, args: dict) -> None:
        """Remember this call as always allowed and persist the decision."""
        rule_key = self._rule_key(tool, args)
        self._always_allow.add(rule_key)
        self._always_deny.discard(rule_key)
        self._save()

    def set_always_deny(self, tool: str, args: dict) -> None:
        """Remember this call as always denied and persist the decision."""
        rule_key = self._rule_key(tool, args)
        self._always_deny.add(rule_key)
        self._always_allow.discard(rule_key)
        self._save()

    @staticmethod
    def _rule_key(tool: str, args: dict) -> str:
        """Build the key under which a decision for this call is stored.

        The key is ``tool:action:target``, or ``tool:action`` when the call
        has no target; a missing action contributes an empty string.
        """
        action = args.get("action", "")
        target = PermissionEngine._extract_target(tool, args)
        if target:
            return f"{tool}:{action}:{target}"
        return f"{tool}:{action}"

    @staticmethod
    def _extract_target(tool: str, args: dict) -> str:
        """Return the first non-empty target argument for ``tool``, or ``""``.

        The arguments listed for the tool in ``_TARGET_KEYS`` are tried in
        order. A ``code`` value is cut to ``_CODE_PREVIEW_LEN`` characters.
        Missing, empty and ``None`` values are skipped, so the result is
        never ``None``.
        """
        for name in PermissionEngine._TARGET_KEYS.get(tool, ()):
            value = args.get(name)
            if not value:
                continue
            if name == "code":
                return value[: PermissionEngine._CODE_PREVIEW_LEN]
            return value
        return ""