from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Any
from .inventory import InventoryRepository
KNOWN_RELATION_FIELDS = {
"hardware": {"runs_hosts"},
"virtual-machines": {"hypervisor_host", "runs_services"},
"hosts": {"hardware_node"},
"services": {"host", "domains"},
"domains": {"points_to", "used_by"},
"traffic-routes": {"source", "entrypoint", "path", "destination", "used_by"},
"databases": {"host", "used_by", "backup_policy"},
"backups": {"target"},
"endpoints": {"domain", "service", "owner_host"},
"integrations": {"source", "target", "via"},
"projects": {"related_services", "related_hosts", "related_domains"},
}
@dataclass(frozen=True)
class RelationshipEdge:
source: str
target: str
relation: str
@dataclass(frozen=True)
class UnresolvedReference:
source: str
field: str
target: str
def build_relationships(repo: InventoryRepository) -> dict[str, object]:
records_by_type = _read_inventory_records(repo)
node_by_raw_id = _build_node_lookup(records_by_type)
nodes = _build_nodes(records_by_type)
edges: list[RelationshipEdge] = []
unresolved: list[UnresolvedReference] = []
_add_hardware_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_vm_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_host_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_service_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_domain_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_traffic_route_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_database_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_backup_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_endpoint_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_integration_edges(records_by_type, node_by_raw_id, edges, unresolved)
_add_project_edges(records_by_type, node_by_raw_id, edges, unresolved)
return {
"nodes": nodes,
"edges": _serialize_edges(edges),
"unresolved_references": _serialize_unresolved(unresolved),
"summary": {
"node_count": len(nodes),
"edge_count": len(edges),
"unresolved_reference_count": len(unresolved),
},
}
def _read_inventory_records(repo: InventoryRepository) -> dict[str, list[dict[str, Any]]]:
records_by_type: dict[str, list[dict[str, Any]]] = {}
for inventory_type in repo.list_types():
parsed = repo.read_parsed(inventory_type)
if not isinstance(parsed, list):
records_by_type[inventory_type] = []
continue
records_by_type[inventory_type] = [item for item in parsed if isinstance(item, dict)]
return records_by_type
def _build_node_lookup(records_by_type: dict[str, list[dict[str, Any]]]) -> dict[str, str]:
lookup: dict[str, str] = {}
for inventory_type, records in records_by_type.items():
for record in records:
item_id = record.get("id")
if isinstance(item_id, str) and item_id:
lookup.setdefault(item_id, _node_id(inventory_type, item_id))
return lookup
def _build_nodes(records_by_type: dict[str, list[dict[str, Any]]]) -> list[dict[str, object]]:
nodes: list[dict[str, object]] = []
for inventory_type, records in sorted(records_by_type.items()):
for record in records:
item_id = record.get("id")
if not isinstance(item_id, str) or not item_id:
continue
nodes.append(
{
"id": _node_id(inventory_type, item_id),
"inventory_type": inventory_type,
"item_id": item_id,
"label": record.get("name") or record.get("fqdn") or item_id,
"status": record.get("status", "unknown"),
"docs": record.get("docs"),
"last_reviewed": _normalize_scalar(record.get("last_reviewed")),
}
)
return nodes
def _add_hardware_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("hardware", []):
source = _source_node("hardware", record)
for target in _as_list(record.get("runs_hosts")):
_append_reference(source, target, "runs_host", "runs_hosts", node_by_raw_id, edges, unresolved)
def _add_vm_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("virtual-machines", []):
source = _source_node("virtual-machines", record)
_append_reference(
source,
record.get("hypervisor_host"),
"runs_on",
"hypervisor_host",
node_by_raw_id,
edges,
unresolved,
)
for target in _as_list(record.get("runs_services")):
_append_reference(source, target, "runs_service", "runs_services", node_by_raw_id, edges, unresolved)
def _add_host_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("hosts", []):
source = _source_node("hosts", record)
_append_reference(
source,
record.get("hardware_node"),
"hardware_node",
"hardware_node",
node_by_raw_id,
edges,
unresolved,
)
def _add_service_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("services", []):
source = _source_node("services", record)
_append_reference(source, record.get("host"), "hosted_on", "host", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("domains")):
_append_reference(source, target, "uses_domain", "domains", node_by_raw_id, edges, unresolved)
def _add_domain_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("domains", []):
source = _source_node("domains", record)
for target in _as_list(record.get("points_to")):
_append_reference(source, target, "points_to", "points_to", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("used_by")):
_append_reference(source, target, "used_by", "used_by", node_by_raw_id, edges, unresolved)
def _add_traffic_route_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("traffic-routes", []):
source = _source_node("traffic-routes", record)
for field in ("source", "entrypoint", "destination"):
_append_reference(source, record.get(field), field, field, node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("path")):
_append_reference(source, target, "path_step", "path", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("used_by")):
_append_reference(source, target, "used_by", "used_by", node_by_raw_id, edges, unresolved)
def _add_database_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("databases", []):
source = _source_node("databases", record)
_append_reference(source, record.get("host"), "hosted_on", "host", node_by_raw_id, edges, unresolved)
_append_reference(
source,
record.get("backup_policy"),
"backup_policy",
"backup_policy",
node_by_raw_id,
edges,
unresolved,
)
for target in _as_list(record.get("used_by")):
_append_reference(source, target, "used_by", "used_by", node_by_raw_id, edges, unresolved)
def _add_backup_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("backups", []):
source = _source_node("backups", record)
_append_reference(source, record.get("target"), "backs_up", "target", node_by_raw_id, edges, unresolved)
def _add_endpoint_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("endpoints", []):
source = _source_node("endpoints", record)
_append_reference(source, record.get("domain"), "endpoint_for_domain", "domain", node_by_raw_id, edges, unresolved)
_append_reference(source, record.get("service"), "endpoint_for_service", "service", node_by_raw_id, edges, unresolved)
_append_reference(source, record.get("owner_host"), "owned_by_host", "owner_host", node_by_raw_id, edges, unresolved)
def _add_integration_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("integrations", []):
source = _source_node("integrations", record)
_append_reference(source, record.get("source"), "integration_source", "source", node_by_raw_id, edges, unresolved)
_append_reference(source, record.get("target"), "integration_target", "target", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("via")):
_append_reference(source, target, "integration_via", "via", node_by_raw_id, edges, unresolved)
def _add_project_edges(
records_by_type: dict[str, list[dict[str, Any]]],
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
for record in records_by_type.get("projects", []):
source = _source_node("projects", record)
for target in _as_list(record.get("related_services")):
_append_reference(source, target, "project_service", "related_services", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("related_hosts")):
_append_reference(source, target, "project_host", "related_hosts", node_by_raw_id, edges, unresolved)
for target in _as_list(record.get("related_domains")):
_append_reference(source, target, "project_domain", "related_domains", node_by_raw_id, edges, unresolved)
def _append_reference(
source: str,
raw_target: object,
relation: str,
field: str,
node_by_raw_id: dict[str, str],
edges: list[RelationshipEdge],
unresolved: list[UnresolvedReference],
) -> None:
if not isinstance(raw_target, str) or not raw_target:
return
target = node_by_raw_id.get(raw_target)
if target:
edges.append(RelationshipEdge(source=source, target=target, relation=relation))
else:
unresolved.append(UnresolvedReference(source=source, field=field, target=raw_target))
def _source_node(inventory_type: str, record: dict[str, Any]) -> str:
return _node_id(inventory_type, str(record["id"]))
def _node_id(inventory_type: str, item_id: str) -> str:
return f"{inventory_type}/{item_id}"
def _as_list(value: object) -> list[object]:
if isinstance(value, list):
return value
if value is None:
return []
return [value]
def _normalize_scalar(value: object) -> object:
if isinstance(value, date):
return value.isoformat()
return value
def _serialize_edges(edges: list[RelationshipEdge]) -> list[dict[str, str]]:
unique = {(edge.source, edge.target, edge.relation) for edge in edges}
return [
{"source": source, "target": target, "relation": relation}
for source, target, relation in sorted(unique)
]
def _serialize_unresolved(unresolved: list[UnresolvedReference]) -> list[dict[str, str]]:
unique = {(item.source, item.field, item.target) for item in unresolved}
return [
{"source": source, "field": field, "target": target}
for source, field, target in sorted(unique)
]