"""Base classes for UI components served by navi_ui."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ValidationError
def _resolve_ref(ref: str, defs: dict[str, Any]) -> dict[str, Any]:
"""Resolve a JSON Schema $ref against the $defs table."""
key = ref.split("/")[-1]
return defs.get(key, {})
def _type_label(schema: dict[str, Any], defs: dict[str, Any]) -> str:
"""Return a compact type label for a property schema."""
if "$ref" in schema:
resolved = _resolve_ref(schema["$ref"], defs)
return resolved.get("title") or schema["$ref"].split("/")[-1]
if "anyOf" in schema:
parts = []
for part in schema["anyOf"]:
if part.get("type") == "null":
parts.append("null")
else:
parts.append(_type_label(part, defs))
return " | ".join(sorted(set(parts), key=lambda x: (x != "null", x)))
if "allOf" in schema and len(schema["allOf"]) == 1:
return _type_label(schema["allOf"][0], defs)
typ = schema.get("type")
if typ == "array":
item_type = _type_label(schema.get("items", {}), defs)
return f"array of {item_type}"
if typ == "object":
return "object"
if typ:
return str(typ)
return "any"
def _describe_property(
name: str,
schema: dict[str, Any],
defs: dict[str, Any],
required: bool,
indent: int = 0,
) -> list[str]:
"""Return markdown lines describing one property."""
lines: list[str] = []
prefix = " " * indent
if "$ref" in schema:
schema = _resolve_ref(schema["$ref"], defs)
if "anyOf" in schema:
# Filter out the null branch so we can describe the real type.
real_branches = [b for b in schema["anyOf"] if b.get("type") != "null"]
if real_branches:
schema = real_branches[0]
else:
schema = {"type": "null"}
if "allOf" in schema and len(schema["allOf"]) == 1:
schema = schema["allOf"][0]
typ = _type_label(schema, defs)
marker = "required" if required else "optional"
description = schema.get("description", "")
constraints: list[str] = []
if "minLength" in schema:
constraints.append(f"min {schema['minLength']} chars")
if "maxLength" in schema:
constraints.append(f"max {schema['maxLength']} chars")
if "minItems" in schema:
constraints.append(f"min {schema['minItems']} items")
if "maxItems" in schema:
constraints.append(f"max {schema['maxItems']} items")
if "minimum" in schema:
constraints.append(f"≥ {schema['minimum']}")
if "maximum" in schema:
constraints.append(f"≤ {schema['maximum']}")
if "pattern" in schema:
constraints.append(f"matches `{schema['pattern']}`")
if "enum" in schema:
constraints.append(f"one of {schema['enum']}")
meta = ", ".join(constraints)
head = f"{prefix}- `{name}`: `{typ}` ({marker})"
if description:
head += f" — {description}"
if meta:
head += f" *[{meta}]*"
lines.append(head)
# Recurse into inline object properties.
if schema.get("type") == "object" and "properties" in schema:
sub_required = set(schema.get("required", []))
for sub_name, sub_schema in schema["properties"].items():
lines.extend(
_describe_property(sub_name, sub_schema, defs, sub_name in sub_required, indent + 1)
)
# Recurse into array item object properties.
if schema.get("type") == "array" and "properties" in schema.get("items", {}):
items = schema["items"]
sub_required = set(items.get("required", []))
for sub_name, sub_schema in items["properties"].items():
lines.extend(
_describe_property(sub_name, sub_schema, defs, sub_name in sub_required, indent + 1)
)
return lines
def _compact_schema_markdown(schema: dict[str, Any]) -> str:
"""Render a JSON Schema as compact LLM-friendly markdown."""
defs = schema.get("$defs", {})
required = set(schema.get("required", []))
properties = schema.get("properties", {})
if not properties:
return "Payload accepts any JSON object."
lines: list[str] = []
for name, prop_schema in properties.items():
lines.extend(_describe_property(name, prop_schema, defs, name in required))
# Append inline definitions only when they are actually referenced.
referenced_defs: set[str] = set()
def _collect_refs(s: dict[str, Any]) -> None:
if "$ref" in s:
referenced_defs.add(s["$ref"].split("/")[-1])
for v in s.values():
if isinstance(v, dict):
_collect_refs(v)
elif isinstance(v, list):
for item in v:
if isinstance(item, dict):
_collect_refs(item)
_collect_refs(schema)
for def_name in sorted(referenced_defs):
if def_name not in defs:
continue
def_schema = defs[def_name]
if "properties" not in def_schema:
continue
lines.append(f"\n**{def_name}**:")
sub_required = set(def_schema.get("required", []))
for sub_name, sub_schema in def_schema["properties"].items():
lines.extend(
_describe_property(sub_name, sub_schema, defs, sub_name in sub_required)
)
return "\n".join(lines)
class UIComponent:
"""Base class for a renderable UI component.
Subclasses must define:
- ``name`` (str): snake_case identifier used in ``render_component``.
- ``description`` (str): Short human-readable description.
- ``schema`` (type[BaseModel]): Pydantic model describing the payload.
"""
name: str = ""
description: str = ""
schema: type[BaseModel] | None = None
@classmethod
def validate(
cls, payload: dict[str, Any]
) -> tuple[bool, str, dict[str, Any] | None]:
"""Validate *payload* against the component schema.
Returns ``(ok, error_message, validated_payload)``. If validation
succeeds, *validated_payload* contains the normalized dict.
"""
if cls.schema is None:
return True, "", payload
try:
validated = cls.schema.model_validate(payload)
except ValidationError as exc:
errors = []
for err in exc.errors():
loc = ".".join(str(part) for part in err["loc"])
errors.append(f" - {loc}: {err['msg']} (got {err['input']!r})")
return (
False,
f"Invalid payload for component {cls.name!r}.\n" + "\n".join(errors),
None,
)
return True, "", validated.model_dump(mode="json")
@classmethod
def instructions(cls) -> str:
"""Return a markdown schema description for the LLM."""
if cls.schema is not None:
schema_body = _compact_schema_markdown(cls.schema.model_json_schema())
else:
schema_body = "Payload accepts any JSON object."
return (
f"### {cls.name}\n\n{cls.description}\n\n"
f"Payload schema:\n\n{schema_body}\n"
)