Newer
Older
navi-1 / clients / terminal / tui / permissions.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy 2 days ago 4 KB Navi Code TUI: complete Phase 4
"""Permission engine for destructive tool operations."""

from __future__ import annotations

import fnmatch
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass
class PermissionRule:
    """A rule that decides whether a tool call needs confirmation."""

    tool: str
    action: str | None = None  # filesystem action, terminal command pattern, etc.
    pattern: str | None = None  # glob pattern for paths/commands
    message: str = ""


# Default destructive patterns.
DEFAULT_RULES: list[PermissionRule] = [
    PermissionRule(tool="filesystem", action="delete", message="Delete file/directory"),
    PermissionRule(tool="filesystem", action="move", message="Move/overwrite file"),
    PermissionRule(tool="filesystem", action="write", message="Overwrite existing file"),
    PermissionRule(tool="terminal", pattern="rm *", message="Remove files/directories"),
    PermissionRule(tool="terminal", pattern="*format*", message="Format operation"),
    PermissionRule(tool="terminal", pattern="*drop*", message="Drop database/table"),
]


PermissionCallback = Callable[[bool], None]


class PermissionEngine:
    """Check whether a tool call requires user confirmation."""

    def __init__(self, store_path: Path | None = None, rules: list[PermissionRule] | None = None) -> None:
        self._rules = rules or list(DEFAULT_RULES)
        self._store_path = store_path or (Path.home() / ".navi_code" / "permissions.json")
        self._always_allow: set[str] = set()
        self._always_deny: set[str] = set()
        self._load()

    def _load(self) -> None:
        if not self._store_path.exists():
            return
        try:
            data = json.loads(self._store_path.read_text())
            self._always_allow = set(data.get("allow", []))
            self._always_deny = set(data.get("deny", []))
        except Exception:
            self._always_allow = set()
            self._always_deny = set()

    def _save(self) -> None:
        self._store_path.parent.mkdir(parents=True, exist_ok=True)
        data = {"allow": sorted(self._always_allow), "deny": sorted(self._always_deny)}
        self._store_path.write_text(json.dumps(data, indent=2))

    def check(self, tool: str, args: dict) -> PermissionRule | None:
        """Return matching rule if confirmation is needed, else None."""
        rule_key = self._rule_key(tool, args)
        if rule_key in self._always_allow:
            return None
        if rule_key in self._always_deny:
            # Always denied — treat as matched, the caller will reject.
            return PermissionRule(tool=tool, message="always denied by user policy")

        for rule in self._rules:
            if rule.tool != tool:
                continue
            if rule.action is not None:
                if args.get("action") != rule.action:
                    continue
            if rule.pattern is not None:
                target = self._extract_target(tool, args)
                if target is None or not fnmatch.fnmatch(target, rule.pattern):
                    continue
            return rule
        return None

    def set_always_allow(self, tool: str, args: dict) -> None:
        self._always_allow.add(self._rule_key(tool, args))
        self._always_deny.discard(self._rule_key(tool, args))
        self._save()

    def set_always_deny(self, tool: str, args: dict) -> None:
        self._always_deny.add(self._rule_key(tool, args))
        self._always_allow.discard(self._rule_key(tool, args))
        self._save()

    @staticmethod
    def _rule_key(tool: str, args: dict) -> str:
        action = args.get("action", "")
        target = PermissionEngine._extract_target(tool, args)
        if target:
            return f"{tool}:{action}:{target}"
        return f"{tool}:{action}"

    @staticmethod
    def _extract_target(tool: str, args: dict) -> str:
        if tool == "filesystem":
            return args.get("path", "") or args.get("destination", "")
        if tool == "terminal":
            return args.get("command", "") or args.get("action", "")
        if tool == "code_exec":
            return args.get("language", "") or args.get("code", "")[:40]
        if tool == "ssh_exec":
            return args.get("host", "") or args.get("command", "")
        return ""