from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path
from typing import Any

import yaml
from jsonschema import Draft202012Validator
from jsonschema.exceptions import SchemaError

from .config import Settings
from .docs_repository import DocsRepository
from .freshness import REQUIRED_FRONTMATTER
# Maps each inventory file stem (settings.inventory_dir/<key>.yml) to the
# JSON Schema file (schemas/<value>) that validates it. _validate_inventory
# iterates this mapping, so adding a new inventory type only requires a new
# entry here plus the corresponding schema file.
SCHEMA_BY_INVENTORY = {
"backups": "backup.schema.json",
"databases": "database.schema.json",
"domains": "domain.schema.json",
"hardware": "hardware.schema.json",
"hosts": "host.schema.json",
"networks": "network.schema.json",
"services": "service.schema.json",
"traffic-routes": "traffic-route.schema.json",
"virtual-machines": "virtual-machine.schema.json",
}
@dataclass(frozen=True)
class ValidationIssue:
    """One problem found during repository validation.

    Immutable record; validate_repository serializes instances to plain
    dicts for the final report.
    """

    # Path of the offending file (repo-relative POSIX form for inventory
    # and doc files; as produced by Path.as_posix for schema files).
    path: str
    # Severity label; this module currently only ever emits "error".
    severity: str
    # Stable machine-readable identifier, e.g. "invalid-yaml".
    code: str
    # Human-readable description of the problem.
    message: str
def validate_repository(settings: Settings) -> dict[str, object]:
    """Run every repository validation pass and aggregate the results.

    Checks, in order: JSON Schema well-formedness, markdown frontmatter,
    inventory YAML files against their schemas, and inventory ``docs:``
    link targets.

    Args:
        settings: Project settings providing ``repo_root`` and
            ``inventory_dir``.

    Returns:
        A mapping with ``status`` ("ok" when no issues were found,
        "issues" otherwise), ``issue_count``, and ``issues`` — one plain
        dict per ValidationIssue.
    """
    issues: list[ValidationIssue] = []
    issues.extend(_validate_schema_json(settings.repo_root / "schemas"))
    issues.extend(_validate_markdown_frontmatter(settings))
    issues.extend(_validate_inventory(settings))
    issues.extend(_validate_inventory_doc_links(settings.repo_root))
    # asdict() is the documented dataclass serialization API; reading
    # __dict__ directly would silently break if ValidationIssue ever gained
    # slots=True.
    serialized = [asdict(issue) for issue in issues]
    return {
        "status": "ok" if not serialized else "issues",
        "issue_count": len(serialized),
        "issues": serialized,
    }
def _validate_schema_json(schema_dir: Path) -> list[ValidationIssue]:
    """Check every ``*.json`` file under *schema_dir* is a valid Draft
    2020-12 JSON Schema.

    Returns one ``invalid-json-schema`` issue per file that fails to parse
    or fails the meta-schema check, and an ``unreadable-schema`` issue for
    a file that cannot be read at all.
    """
    issues: list[ValidationIssue] = []
    for path in sorted(schema_dir.glob("*.json")):
        try:
            schema = json.loads(path.read_text(encoding="utf-8"))
            Draft202012Validator.check_schema(schema)
        except OSError as exc:
            # Report unreadable files (permissions, filesystem races)
            # instead of crashing the whole validation run; the issue code
            # matches the one _validate_inventory uses for schema read
            # failures.
            issues.append(
                ValidationIssue(
                    path=path.as_posix(),
                    severity="error",
                    code="unreadable-schema",
                    message=str(exc),
                )
            )
        except json.JSONDecodeError as exc:
            issues.append(
                ValidationIssue(
                    path=path.as_posix(),
                    severity="error",
                    code="invalid-json-schema",
                    message=f"{exc.msg} at line {exc.lineno}, column {exc.colno}",
                )
            )
        except SchemaError as exc:
            issues.append(
                ValidationIssue(
                    path=path.as_posix(),
                    severity="error",
                    code="invalid-json-schema",
                    message=exc.message,
                )
            )
    return issues
def _validate_markdown_frontmatter(settings: Settings) -> list[ValidationIssue]:
    """Check that every tracked markdown doc carries complete frontmatter.

    ``README.md`` and anything under ``server/`` are exempt. A doc with no
    frontmatter mapping at all yields ``missing-frontmatter``; one that has
    a mapping but lacks required keys yields ``missing-frontmatter-fields``.
    """
    found: list[ValidationIssue] = []
    for entry in DocsRepository(settings).list_docs():
        doc_path = str(entry["path"])
        exempt = doc_path == "README.md" or doc_path.startswith("server/")
        if exempt:
            continue
        meta = entry.get("frontmatter")
        if not (isinstance(meta, dict) and meta):
            found.append(
                ValidationIssue(
                    path=doc_path,
                    severity="error",
                    code="missing-frontmatter",
                    message="Markdown document is missing frontmatter",
                )
            )
            continue
        absent = sorted(REQUIRED_FRONTMATTER - set(meta))
        if absent:
            found.append(
                ValidationIssue(
                    path=doc_path,
                    severity="error",
                    code="missing-frontmatter-fields",
                    message="Missing frontmatter fields: " + ", ".join(absent),
                )
            )
    return found
def _validate_inventory(settings: Settings) -> list[ValidationIssue]:
    """Validate each inventory YAML file against its JSON Schema.

    For every type in SCHEMA_BY_INVENTORY this reports, in order of
    precedence: a missing inventory file, unparseable YAML, an
    unreadable/corrupt schema file, or — when everything loads — each
    schema-validation error found in the data.
    """
    found: list[ValidationIssue] = []
    schemas_root = settings.repo_root / "schemas"
    for inventory_type, schema_name in SCHEMA_BY_INVENTORY.items():
        source = settings.inventory_dir / f"{inventory_type}.yml"
        if not source.exists():
            found.append(
                ValidationIssue(
                    path=source.relative_to(settings.repo_root).as_posix(),
                    severity="error",
                    code="missing-inventory-file",
                    message=f"Inventory file is missing for type {inventory_type}",
                )
            )
            continue
        try:
            payload = yaml.safe_load(source.read_text(encoding="utf-8"))
        except yaml.YAMLError as exc:
            found.append(
                ValidationIssue(
                    path=source.relative_to(settings.repo_root).as_posix(),
                    severity="error",
                    code="invalid-yaml",
                    message=str(exc),
                )
            )
            continue
        schema_source = schemas_root / schema_name
        try:
            schema_doc = json.loads(schema_source.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError) as exc:
            found.append(
                ValidationIssue(
                    path=schema_source.relative_to(settings.repo_root).as_posix(),
                    severity="error",
                    code="unreadable-schema",
                    message=str(exc),
                )
            )
            continue
        found.extend(
            _validate_against_schema(settings.repo_root, source, payload, schema_doc)
        )
    return found
def _validate_against_schema(
    repo_root: Path,
    inventory_path: Path,
    data: Any,
    schema: dict[str, Any],
) -> list[ValidationIssue]:
    """Validate *data* against *schema*, returning one issue per error.

    ``None`` data (an empty YAML file) is treated as an empty list, and
    YAML-only scalars (dates) are normalized to JSON-compatible strings
    before validation. Errors are reported in deterministic path order.
    """
    rel_path = inventory_path.relative_to(repo_root).as_posix()
    if data is None:
        data = []
    data = _normalize_yaml_scalars(data)
    issues: list[ValidationIssue] = []
    validator = Draft202012Validator(schema)
    # Error paths mix int array indexes and str property names; sorting on
    # the raw parts (the old `list(item.path)` key) raises TypeError on
    # Python 3 when an int and a str meet at the same position. Tagging
    # each part with its type keeps the sort total while preserving
    # numeric ordering for indexes.
    ordered_errors = sorted(
        validator.iter_errors(data),
        key=lambda item: [(isinstance(part, str), part) for part in item.path],
    )
    for error in ordered_errors:
        location = _format_json_path(error.path)
        issues.append(
            ValidationIssue(
                path=rel_path,
                severity="error",
                code="json-schema-validation-error",
                message=f"{location}: {error.message}",
            )
        )
    return issues
def _format_json_path(path: Any) -> str:
parts = list(path)
if not parts:
return "$"
formatted = "$"
for part in parts:
if isinstance(part, int):
formatted += f"[{part}]"
else:
formatted += f".{part}"
return formatted
def _normalize_yaml_scalars(value: Any) -> Any:
if isinstance(value, date):
return value.isoformat()
if isinstance(value, list):
return [_normalize_yaml_scalars(item) for item in value]
if isinstance(value, dict):
return {key: _normalize_yaml_scalars(item) for key, item in value.items()}
return value
def _validate_inventory_doc_links(repo_root: Path) -> list[ValidationIssue]:
issues: list[ValidationIssue] = []
inventory_dir = repo_root / "40-inventory"
for inventory_file in sorted(inventory_dir.glob("*.yml")):
for line in inventory_file.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped.startswith("docs: "):
continue
rel = stripped.split(": ", 1)[1].strip().strip("\"'")
target = (inventory_file.parent / rel).resolve()
if not target.exists():
issues.append(
ValidationIssue(
path=inventory_file.relative_to(repo_root).as_posix(),
severity="error",
code="missing-doc-link-target",
message=f"docs target does not exist: {rel}",
)
)
return issues