Newer
Older
gnexus-auth-client-py / src / gnexus_gauth / runtime.py
"""Runtime API client for gnexus-gauth."""

import httpx

from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.contracts import RuntimeUserProviderInterface
from gnexus_gauth.dto import AuthenticatedUser, ClientAccess
from gnexus_gauth.exceptions import RuntimeApiException, TransportException


class HttpRuntimeUserProvider(RuntimeUserProviderInterface):
    """HTTP runtime user provider."""

    def __init__(
        self,
        config: GAuthConfig,
        http_client: httpx.Client | None = None,
    ) -> None:
        self._config = config
        self._http = http_client or httpx.Client()
        self._own_client = http_client is None

    def __del__(self) -> None:
        if getattr(self, "_own_client", False) and hasattr(self, "_http"):
            self._http.close()

    def fetch_user(self, access_token: str) -> AuthenticatedUser:
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
        }
        if self._config.user_agent:
            headers["User-Agent"] = self._config.user_agent

        try:
            response = self._http.get(self._config.user_info_url, headers=headers)
        except httpx.TransportError as exc:
            raise TransportException("Request to gnexus-auth runtime API failed.") from exc

        if response.status_code >= 400:
            raise RuntimeApiException("gnexus-auth runtime API returned an error response.")

        try:
            payload = response.json()
        except ValueError:
            raise RuntimeApiException("gnexus-auth runtime API returned malformed JSON.")

        if not isinstance(payload, dict):
            raise RuntimeApiException("gnexus-auth runtime API returned malformed JSON.")

        client_access_list: list[ClientAccess] = []
        client = payload.get("client")
        if isinstance(client, dict) and isinstance(client.get("client_id"), str):
            client_access_list.append(
                ClientAccess(
                    client_id=client["client_id"],
                    access_status="granted",
                    role_ids=_string_list(client.get("roles")),
                    permission_ids=_string_list(client.get("permissions")),
                )
            )

        return AuthenticatedUser(
            user_id=str(payload.get("sub", payload.get("id", ""))),
            email=str(payload.get("email", "")),
            email_verified=bool(payload.get("email_verified", False)),
            system_role=str(payload["system_role"]) if "system_role" in payload else None,
            status=str(payload["status"]) if "status" in payload else None,
            profile=payload["profile"] if isinstance(payload.get("profile"), dict) else {},
            client_access_list=client_access_list,
            raw_payload=payload,
        )


def _string_list(value: object) -> list[str]:
    if not isinstance(value, list):
        return []
    return [str(item) for item in value]