# gnexus-book / server / app / pending_changes.py
from __future__ import annotations

import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal
from uuid import uuid4

import yaml
from pydantic import BaseModel, Field

from .config import Settings
from .validation import validate_repository


# Kinds of change a proposal may describe. Note that PendingChangeRepository.apply
# only handles "doc" and "inventory-item"; "inventory" is accepted but not applicable yet.
ChangeKind = Literal["doc", "inventory", "inventory-item"]
# Lifecycle states of a stored change record. Only "pending" and "applied" are
# assigned anywhere in this module.
ChangeStatus = Literal["pending", "applied", "rejected"]


class ProposedChangeRequest(BaseModel):
    """Input model describing a change to be queued by ``PendingChangeRepository.create``."""

    # Category of the change; see ChangeKind.
    kind: ChangeKind
    # What the change points at (non-empty). When applied, a "doc" target is a
    # repo-relative ``.md`` path and an "inventory-item" target is
    # ``<inventory-type>/<id>``.
    target: str = Field(min_length=1)
    # Short human-readable description, 1 to 200 characters.
    summary: str = Field(min_length=1, max_length=200)
    # Optional justification, up to 2000 characters; defaults to an empty string.
    reason: str = Field(default="", max_length=2000)
    # Kind-specific data. When applied, "doc" changes read ``content`` (non-empty
    # string); "inventory-item" changes read an optional ``mode`` ("update" or
    # "create") and the field values, either under ``patch`` or as the payload itself.
    payload: dict[str, Any]


class PendingChange(BaseModel):
    """Stored representation of a proposed change, as written to its JSON record file."""

    # Identifier generated by the repository; also the record's file stem.
    id: str
    # The next five fields are copied unchanged from the ProposedChangeRequest.
    kind: ChangeKind
    target: str
    summary: str
    reason: str
    payload: dict[str, Any]
    # Lifecycle state; records are created as "pending".
    status: ChangeStatus
    # UTC timestamps formatted as ISO-8601 with a trailing "Z", whole seconds.
    created_at: str
    updated_at: str


class PendingChangeError(ValueError):
    """Raised when a pending change is missing, malformed, or cannot be applied."""


class PendingChangeRepository:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.directory = settings.repo_root / "90-maintenance" / "pending-changes"
        self.directory.mkdir(parents=True, exist_ok=True)

    def list(self) -> list[dict[str, Any]]:
        changes = [self._read_file(path) for path in sorted(self.directory.glob("*.json"))]
        return sorted(changes, key=lambda item: item["created_at"], reverse=True)

    def get(self, change_id: str) -> dict[str, Any]:
        path = self._path_for_id(change_id)
        if not path.exists():
            raise PendingChangeError("Pending change not found")
        return self._read_file(path)

    def create(self, request: ProposedChangeRequest) -> dict[str, Any]:
        now = datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        change = PendingChange(
            id=self._new_id(),
            kind=request.kind,
            target=request.target,
            summary=request.summary,
            reason=request.reason,
            payload=request.payload,
            status="pending",
            created_at=now,
            updated_at=now,
        )
        path = self._path_for_id(change.id)
        path.write_text(
            json.dumps(change.model_dump(), indent=2, ensure_ascii=False) + "\n",
            encoding="utf-8",
        )
        return change.model_dump()

    def apply(self, change_id: str) -> dict[str, Any]:
        change = self.get(change_id)
        if change["status"] != "pending":
            raise PendingChangeError("Only pending changes can be applied")

        if change["kind"] == "doc":
            return self._apply_doc_change(change)
        if change["kind"] == "inventory-item":
            return self._apply_inventory_item_change(change)
        raise PendingChangeError("Only doc and inventory-item changes can be applied in the current MVP")

    def _apply_doc_change(self, change: dict[str, Any]) -> dict[str, Any]:
        target = self._resolve_doc_target(change["target"])
        content = self._extract_doc_content(change)
        previous_content = target.read_text(encoding="utf-8") if target.exists() else None

        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")

        validation = validate_repository(self.settings)
        if validation["status"] != "ok":
            if previous_content is None:
                target.unlink(missing_ok=True)
            else:
                target.write_text(previous_content, encoding="utf-8")
            raise PendingChangeError("Applied change failed validation and was rolled back")

        now = datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        change["status"] = "applied"
        change["updated_at"] = now
        change["applied_at"] = now
        change["validation"] = validation
        self._write_change(change)
        return change

    def _apply_inventory_item_change(self, change: dict[str, Any]) -> dict[str, Any]:
        inventory_type, item_id = self._parse_inventory_item_target(change["target"])
        target = self._resolve_inventory_target(inventory_type)
        previous_content = target.read_text(encoding="utf-8")

        data = yaml.safe_load(previous_content)
        if data is None:
            data = []
        if not isinstance(data, list):
            raise PendingChangeError("Inventory file must contain a list")

        mode = self._extract_inventory_mode(change)
        patch = self._extract_inventory_patch(change)
        found = False
        for item in data:
            if isinstance(item, dict) and item.get("id") == item_id:
                if mode == "create":
                    raise PendingChangeError("Inventory item already exists")
                item.update(patch)
                found = True
                break
        if not found and mode == "update":
            raise PendingChangeError("Inventory item not found")
        if not found and mode == "create":
            data.append({"id": item_id, **patch})

        target.write_text("---\n" + yaml.safe_dump(data, sort_keys=False, allow_unicode=True), encoding="utf-8")

        validation = validate_repository(self.settings)
        if validation["status"] != "ok":
            target.write_text(previous_content, encoding="utf-8")
            raise PendingChangeError("Applied change failed validation and was rolled back")

        now = datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        change["status"] = "applied"
        change["updated_at"] = now
        change["applied_at"] = now
        change["validation"] = validation
        self._write_change(change)
        return change

    def _new_id(self) -> str:
        return datetime.now(UTC).strftime("%Y%m%d%H%M%S") + "-" + uuid4().hex[:8]

    def _path_for_id(self, change_id: str) -> Path:
        if "/" in change_id or "\\" in change_id or change_id.startswith("."):
            raise PendingChangeError("Invalid pending change id")
        return self.directory / f"{change_id}.json"

    def _resolve_doc_target(self, target: str) -> Path:
        if not target.endswith(".md"):
            raise PendingChangeError("Doc change target must be a Markdown file")
        path = (self.settings.repo_root / target).resolve()
        repo_root = self.settings.repo_root.resolve()
        if repo_root not in path.parents and path != repo_root:
            raise PendingChangeError("Doc change target escapes repository root")
        if any(part in {".git", ".venv", "server/.venv", "node_modules"} for part in path.parts):
            raise PendingChangeError("Doc change target is not allowed")
        return path

    def _resolve_inventory_target(self, inventory_type: str) -> Path:
        if "/" in inventory_type or "\\" in inventory_type or inventory_type.startswith("."):
            raise PendingChangeError("Invalid inventory type")
        path = (self.settings.inventory_dir / f"{inventory_type}.yml").resolve()
        inventory_dir = self.settings.inventory_dir.resolve()
        if inventory_dir not in path.parents and path.parent != inventory_dir:
            raise PendingChangeError("Inventory target escapes inventory directory")
        if not path.exists():
            raise PendingChangeError("Inventory type not found")
        return path

    @staticmethod
    def _parse_inventory_item_target(target: str) -> tuple[str, str]:
        parts = target.split("/")
        if len(parts) != 2 or not parts[0] or not parts[1]:
            raise PendingChangeError("Inventory item target must use '<inventory-type>/<id>'")
        return parts[0], parts[1]

    @staticmethod
    def _extract_doc_content(change: dict[str, Any]) -> str:
        payload = change.get("payload")
        if not isinstance(payload, dict):
            raise PendingChangeError("Doc change payload must be an object")
        content = payload.get("content")
        if not isinstance(content, str) or not content.strip():
            raise PendingChangeError("Doc change payload.content must be a non-empty string")
        if not content.endswith("\n"):
            content += "\n"
        return content

    @staticmethod
    def _extract_inventory_patch(change: dict[str, Any]) -> dict[str, Any]:
        payload = change.get("payload")
        if not isinstance(payload, dict):
            raise PendingChangeError("Inventory change payload must be an object")
        patch = payload.get("patch", payload)
        if not isinstance(patch, dict) or not patch:
            raise PendingChangeError("Inventory change patch must be a non-empty object")
        if "id" in patch:
            raise PendingChangeError("Inventory item id cannot be changed")
        return patch

    @staticmethod
    def _extract_inventory_mode(change: dict[str, Any]) -> str:
        payload = change.get("payload")
        if not isinstance(payload, dict):
            raise PendingChangeError("Inventory change payload must be an object")
        mode = payload.get("mode", "update")
        if mode not in {"update", "create"}:
            raise PendingChangeError("Inventory change mode must be 'update' or 'create'")
        return mode

    @staticmethod
    def _read_file(path: Path) -> dict[str, Any]:
        return json.loads(path.read_text(encoding="utf-8"))

    def _write_change(self, change: dict[str, Any]) -> None:
        path = self._path_for_id(str(change["id"]))
        path.write_text(json.dumps(change, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")