from __future__ import annotations
import json
import re
from datetime import date
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
from jsonschema import Draft202012Validator
from jsonschema.exceptions import SchemaError
from .config import Settings
from .docs_repository import DocsRepository, EXCLUDED_PARTS
from .freshness import REQUIRED_FRONTMATTER
# Maps each inventory type (the stem of its ``<type>.yml`` file) to the file
# name of the JSON Schema used to validate it. Iteration order is the order
# in which inventories are validated.
SCHEMA_BY_INVENTORY: dict[str, str] = {
    "backups": "backup.schema.json",
    "databases": "database.schema.json",
    "domains": "domain.schema.json",
    "hardware": "hardware.schema.json",
    "hosts": "host.schema.json",
    "networks": "network.schema.json",
    "services": "service.schema.json",
    "traffic-routes": "traffic-route.schema.json",
    "virtual-machines": "virtual-machine.schema.json",
}
# Matches "<secret-like key> : <value>" or "<secret-like key> = <value>" on a
# single line and captures the right-hand side in the named group ``value``.
#
# The left edge uses ``(?<![^\W_])`` instead of ``\b``: ``\b`` counts "_" as a
# word character, so env-style names such as ``DB_PASSWORD=...`` or
# ``GITHUB_TOKEN=...`` would never match. The lookbehind still rejects keys
# glued to letters or digits (``mypassword``), but accepts "_" as a separator.
SECRET_ASSIGNMENT_RE = re.compile(
    r"""(?ix)
    (?<![^\W_])(
        password|passwd|token|
        api[_-]?(key|token)|
        access[_-]?token|
        auth[_-]?token|
        secret|
        private[_ -]?key|
        session[_ -]?cookie
    )\b
    \s*[:=]\s*
    (?P<value>.+)
    """
)

# Matches the armor header of a PEM private key block.
PRIVATE_KEY_BLOCK_RE = re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----")

# Matches values that are placeholders or pointers to a secret store rather
# than a secret itself.
#
# ``\b`` is applied only to the keyword alternatives. ``ref:`` ends in a
# non-word character, so a trailing ``\b`` would require a word character
# right after the colon and reject the common ``ref: vault/path`` form.
SAFE_SECRET_REFERENCE_RE = re.compile(
    r"""(?ix)
    ^\s*
    (
        (?:
            unknown|none|null|redacted|
            not\s+documented|not\s+stored|
            password\s+manager|vault|
            secret\s+reference|reference
        )\b
        |
        ref:
    )
    """
)

# File suffixes whose contents are scanned for secret-looking assignments.
SECRET_SCAN_SUFFIXES = {".md", ".yml", ".yaml", ".json", ".env"}
@dataclass(frozen=True)
class ValidationIssue:
    """A single immutable finding produced by one of the validators.

    Instances are serialized through their ``__dict__``, so the class must
    keep a per-instance dict (do not add ``slots=True``).
    """

    # Path of the file the finding refers to, as a POSIX-style string.
    path: str
    # Severity label; the validators in this module emit "error".
    severity: str
    # Stable machine-readable identifier, e.g. "invalid-yaml".
    code: str
    # Human-readable description of the problem.
    message: str
def validate_repository(settings: Settings) -> dict[str, object]:
    """Run every repository validator and summarize the findings.

    The validators run in a fixed order (schemas, Markdown frontmatter,
    inventory files, inventory doc links, duplicate inventory ids, secret
    patterns) and their issues are concatenated in that order.

    Returns:
        A dict with ``status`` ("ok" when nothing was found, otherwise
        "issues"), ``issue_count``, and ``issues`` (one plain dict per
        ``ValidationIssue``).
    """
    repo_root = settings.repo_root
    findings: list[ValidationIssue] = [
        *_validate_schema_json(repo_root / "schemas"),
        *_validate_markdown_frontmatter(settings),
        *_validate_inventory(settings),
        *_validate_inventory_doc_links(repo_root),
        *_validate_unique_inventory_ids(settings),
        *_validate_secret_patterns(repo_root),
    ]
    serialized = [vars(finding) for finding in findings]
    status = "issues" if serialized else "ok"
    return {
        "status": status,
        "issue_count": len(serialized),
        "issues": serialized,
    }
def _validate_schema_json(schema_dir: Path) -> list[ValidationIssue]:
    """Check that every ``*.json`` file in *schema_dir* is a valid JSON Schema.

    Each file is read as UTF-8, parsed as JSON, and checked against the
    Draft 2020-12 meta-schema. Files are visited in sorted order.

    Args:
        schema_dir: Directory holding the schema files.

    Returns:
        One issue per failing file. The issue ``path`` is the file path as
        globbed from *schema_dir* (not made repo-relative). Files that cannot
        be read or decoded are reported as ``unreadable-schema``; malformed
        JSON and meta-schema violations as ``invalid-json-schema``.
    """
    issues: list[ValidationIssue] = []
    for path in sorted(schema_dir.glob("*.json")):
        try:
            schema = json.loads(path.read_text(encoding="utf-8"))
            Draft202012Validator.check_schema(schema)
        except (OSError, UnicodeDecodeError) as exc:
            # A file that cannot be read or is not UTF-8 must not abort the
            # whole validation run; report it like the inventory validator
            # reports an unreadable schema.
            code, message = "unreadable-schema", str(exc)
        except json.JSONDecodeError as exc:
            code = "invalid-json-schema"
            message = f"{exc.msg} at line {exc.lineno}, column {exc.colno}"
        except SchemaError as exc:
            code, message = "invalid-json-schema", exc.message
        else:
            continue
        issues.append(
            ValidationIssue(
                path=path.as_posix(),
                severity="error",
                code=code,
                message=message,
            )
        )
    return issues
def _validate_markdown_frontmatter(settings: Settings) -> list[ValidationIssue]:
    """Report documents whose frontmatter is absent or incomplete.

    ``README.md`` and anything under ``server/`` are exempt. A document with
    no (or empty, or non-mapping) frontmatter yields ``missing-frontmatter``;
    one lacking any key from ``REQUIRED_FRONTMATTER`` yields
    ``missing-frontmatter-fields`` listing the absent keys in sorted order.
    """
    findings: list[ValidationIssue] = []
    for doc in DocsRepository(settings).list_docs():
        doc_path = str(doc["path"])
        if doc_path == "README.md" or doc_path.startswith("server/"):
            continue

        meta = doc.get("frontmatter")
        if isinstance(meta, dict) and meta:
            absent = sorted(REQUIRED_FRONTMATTER - set(meta))
            if not absent:
                continue
            code = "missing-frontmatter-fields"
            message = "Missing frontmatter fields: " + ", ".join(absent)
        else:
            code = "missing-frontmatter"
            message = "Markdown document is missing frontmatter"

        findings.append(
            ValidationIssue(
                path=doc_path,
                severity="error",
                code=code,
                message=message,
            )
        )
    return findings
def _validate_inventory(settings: Settings) -> list[ValidationIssue]:
    """Validate each known inventory file against its JSON Schema.

    For every entry in ``SCHEMA_BY_INVENTORY`` the inventory YAML is loaded
    first and the schema second; the first step that fails is reported and
    the remaining steps for that inventory are skipped:

    * ``missing-inventory-file`` - the ``<type>.yml`` file does not exist
    * ``invalid-yaml`` - the inventory file cannot be parsed as YAML
    * ``unreadable-schema`` - the schema cannot be read or parsed as JSON

    Otherwise the schema validation errors themselves are collected.
    """
    repo_root = settings.repo_root
    schema_dir = repo_root / "schemas"
    collected: list[ValidationIssue] = []

    def _error(location: Path, code: str, message: str) -> ValidationIssue:
        # All issues raised here point at a repo-relative file path.
        return ValidationIssue(
            path=location.relative_to(repo_root).as_posix(),
            severity="error",
            code=code,
            message=message,
        )

    for inventory_type, schema_name in SCHEMA_BY_INVENTORY.items():
        inventory_path = settings.inventory_dir / f"{inventory_type}.yml"
        schema_path = schema_dir / schema_name

        if not inventory_path.exists():
            collected.append(
                _error(
                    inventory_path,
                    "missing-inventory-file",
                    f"Inventory file is missing for type {inventory_type}",
                )
            )
            continue

        try:
            data = yaml.safe_load(inventory_path.read_text(encoding="utf-8"))
        except yaml.YAMLError as exc:
            collected.append(_error(inventory_path, "invalid-yaml", str(exc)))
            continue

        try:
            schema = json.loads(schema_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError) as exc:
            collected.append(_error(schema_path, "unreadable-schema", str(exc)))
            continue

        collected += _validate_against_schema(repo_root, inventory_path, data, schema)
    return collected
def _validate_against_schema(
    repo_root: Path,
    inventory_path: Path,
    data: Any,
    schema: dict[str, Any],
) -> list[ValidationIssue]:
    """Validate parsed inventory *data* against *schema* (Draft 2020-12).

    Args:
        repo_root: Repository root; issue paths are reported relative to it.
        inventory_path: File the data was loaded from.
        data: Parsed YAML content. ``None`` (an empty file) is treated as an
            empty list; dates are converted to ISO strings before validation.
        schema: The JSON Schema to validate against.

    Returns:
        One ``json-schema-validation-error`` issue per validation error,
        ordered by the error's location in the document. Each message is
        prefixed with a ``$``-rooted path to the offending value.
    """
    rel_path = inventory_path.relative_to(repo_root).as_posix()
    if data is None:
        data = []
    data = _normalize_yaml_scalars(data)
    validator = Draft202012Validator(schema)

    def _location_key(error: Any) -> list[tuple[str, Any]]:
        # Error paths mix list indexes (int) with mapping keys (usually str,
        # but YAML allows other key types). Comparing an int with a str
        # raises TypeError, so group each element by its type name first.
        # Paths whose elements already share types keep their original order.
        return [(type(part).__name__, part) for part in error.path]

    ordered_errors = sorted(validator.iter_errors(data), key=_location_key)
    return [
        ValidationIssue(
            path=rel_path,
            severity="error",
            code="json-schema-validation-error",
            message=f"{_format_json_path(error.path)}: {error.message}",
        )
        for error in ordered_errors
    ]
def _format_json_path(path: Any) -> str:
    """Render an iterable of path elements as a ``$``-rooted path string.

    Integer elements become ``[n]`` subscripts and everything else becomes a
    ``.name`` member access; an empty path is just ``"$"``.
    """
    segments = [
        f"[{step}]" if isinstance(step, int) else f".{step}"
        for step in path
    ]
    return "$" + "".join(segments)
def _normalize_yaml_scalars(value: Any) -> Any:
    """Return *value* with every ``date`` replaced by its ISO-8601 string.

    Lists and dict values are processed recursively and rebuilt as new
    containers; dict keys and all other values are returned unchanged.
    ``datetime`` is a ``date`` subclass, so it is converted as well.
    """
    if isinstance(value, dict):
        return {key: _normalize_yaml_scalars(item) for key, item in value.items()}
    if isinstance(value, list):
        return list(map(_normalize_yaml_scalars, value))
    return value.isoformat() if isinstance(value, date) else value
def _validate_inventory_doc_links(repo_root: Path) -> list[ValidationIssue]:
    """Check that every ``docs:`` path in the inventory files exists.

    The ``*.yml`` files under ``<repo_root>/40-inventory`` are scanned line by
    line (they are not parsed as YAML). A line counts as a link when its key
    is ``docs`` with an inline value, whether the key is written on its own
    (``docs: x``) or opens a list item (``- docs: x``). Surrounding quotes and
    a trailing YAML comment are removed from the value, which is then resolved
    relative to the inventory file's directory.

    Args:
        repo_root: Repository root; issue paths are reported relative to it.

    Returns:
        One ``missing-doc-link-target`` issue per link whose target is absent.
    """
    issues: list[ValidationIssue] = []
    inventory_dir = repo_root / "40-inventory"
    for inventory_file in sorted(inventory_dir.glob("*.yml")):
        for line in inventory_file.read_text(encoding="utf-8").splitlines():
            entry = line.strip()
            # "- docs: x" opens a list item with the docs key; drop the
            # list marker(s) so that form is checked too.
            while entry.startswith("- "):
                entry = entry[2:].lstrip()
            if not entry.startswith("docs: "):
                continue

            raw = entry.split(": ", 1)[1].strip()
            if not raw or raw.startswith("#"):
                # The value is empty and the rest of the line is a comment.
                continue
            if raw[0] in "\"'":
                # Quoted value: anything after the closing quote (such as a
                # comment) is not part of the path.
                closing = raw.find(raw[0], 1)
                if closing != -1:
                    raw = raw[: closing + 1]
            else:
                # Plain value: whitespace followed by "#" starts a comment.
                raw = re.split(r"\s#", raw, maxsplit=1)[0]
            rel = raw.strip().strip("\"'")

            target = (inventory_file.parent / rel).resolve()
            if not target.exists():
                issues.append(
                    ValidationIssue(
                        path=inventory_file.relative_to(repo_root).as_posix(),
                        severity="error",
                        code="missing-doc-link-target",
                        message=f"docs target does not exist: {rel}",
                    )
                )
    return issues
def _validate_unique_inventory_ids(settings: Settings) -> list[ValidationIssue]:
    """Report ``id`` values that occur more than once within one inventory file.

    Every ``*.yml`` file in the inventory directory is loaded; files that are
    not valid YAML or whose top level is not a list are skipped silently.
    Only mapping items with a non-empty string ``id`` take part. Each repeat
    is reported against the index of the first occurrence of that id.
    """
    duplicates: list[ValidationIssue] = []
    for inventory_file in sorted(settings.inventory_dir.glob("*.yml")):
        rel_path = inventory_file.relative_to(settings.repo_root).as_posix()
        try:
            entries = yaml.safe_load(inventory_file.read_text(encoding="utf-8"))
        except yaml.YAMLError:
            continue
        # An empty file loads as None, which is not a list either.
        if not isinstance(entries, list):
            continue

        first_seen: dict[str, int] = {}
        for position, entry in enumerate(entries):
            entry_id = entry.get("id") if isinstance(entry, dict) else None
            if not (isinstance(entry_id, str) and entry_id):
                continue
            # setdefault records the first index and returns it on repeats.
            earlier = first_seen.setdefault(entry_id, position)
            if earlier == position:
                continue
            duplicates.append(
                ValidationIssue(
                    path=rel_path,
                    severity="error",
                    code="duplicate-inventory-id",
                    message=f"Duplicate id {entry_id!r} at item indexes {earlier} and {position}",
                )
            )
    return duplicates
def _validate_secret_patterns(repo_root: Path) -> list[ValidationIssue]:
    """Scan text files under *repo_root* for content that looks like a secret.

    A file is scanned when its suffix is in ``SECRET_SCAN_SUFFIXES`` or its
    whole name is (a file named exactly ``.env`` has an empty suffix), unless
    any component of its repo-relative path is in ``EXCLUDED_PARTS``. Files
    that are not valid UTF-8 are skipped.

    Args:
        repo_root: Repository root; issue paths are reported relative to it.

    Returns:
        ``possible-secret`` issues: one per file containing a private key
        block marker, plus one per line that assigns a value to a secret-like
        key, unless that value is empty or matches
        ``SAFE_SECRET_REFERENCE_RE``.
    """
    issues: list[ValidationIssue] = []
    for path in sorted(repo_root.rglob("*")):
        if not path.is_file():
            continue
        # Path(".env").suffix is "", so a bare dotfile has to be matched by
        # name or it would never be scanned.
        if (
            path.suffix not in SECRET_SCAN_SUFFIXES
            and path.name not in SECRET_SCAN_SUFFIXES
        ):
            continue
        # Test exclusions against the repo-relative path only; otherwise a
        # checkout that lives under a directory with an excluded name would
        # have every file skipped.
        relative = path.relative_to(repo_root)
        if any(part in EXCLUDED_PARTS for part in relative.parts):
            continue
        rel_path = relative.as_posix()
        try:
            text = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Not UTF-8 text; nothing to scan line by line.
            continue
        if PRIVATE_KEY_BLOCK_RE.search(text):
            issues.append(
                ValidationIssue(
                    path=rel_path,
                    severity="error",
                    code="possible-secret",
                    message="File contains a private key block marker",
                )
            )
        for line_number, line in enumerate(text.splitlines(), start=1):
            match = SECRET_ASSIGNMENT_RE.search(line)
            if not match:
                continue
            value = match.group("value").strip().strip("\"'")
            # Empty values and placeholders/references are not secrets.
            if not value or SAFE_SECRET_REFERENCE_RE.search(value):
                continue
            issues.append(
                ValidationIssue(
                    path=rel_path,
                    severity="error",
                    code="possible-secret",
                    message=f"Possible raw secret assignment at line {line_number}",
                )
            )
    return issues