"""Runtime API client for gnexus-gauth."""
import httpx
from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.contracts import RuntimeUserProviderInterface
from gnexus_gauth.dto import AuthenticatedUser, ClientAccess
from gnexus_gauth.exceptions import RuntimeApiException, TransportException
class HttpRuntimeUserProvider(RuntimeUserProviderInterface):
    """Runtime user provider that loads the current user over HTTP.

    The provider sends the access token as a bearer credential to
    ``config.user_info_url`` and maps the JSON object in the response onto
    an ``AuthenticatedUser``.

    An ``httpx.Client`` passed in by the caller is used as-is and is never
    closed by this class. When no client is passed, the provider creates
    its own and closes it in ``close()``. Prefer calling ``close()`` (or
    using the provider in a ``with`` statement) over relying on garbage
    collection to release the connection pool.
    """

    def __init__(
        self,
        config: GAuthConfig,
        http_client: httpx.Client | None = None,
    ) -> None:
        """Store the configuration and pick the HTTP client to use.

        Args:
            config: Supplies ``user_info_url`` and the optional
                ``user_agent`` used by ``fetch_user``.
            http_client: Client to send requests with. When ``None`` a new
                ``httpx.Client`` is created and owned by this provider.
        """
        self._config = config
        # Use the same ``is None`` test for both attributes so ownership and
        # client selection can never disagree.
        self._http = http_client if http_client is not None else httpx.Client()
        self._own_client = http_client is None

    def close(self) -> None:
        """Close the HTTP client if, and only if, this provider created it."""
        # getattr/hasattr guard against a partially constructed instance
        # (e.g. ``__init__`` raised before the attributes were assigned).
        if getattr(self, "_own_client", False) and hasattr(self, "_http"):
            self._http.close()

    def __enter__(self) -> "HttpRuntimeUserProvider":
        """Return the provider itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Release the owned HTTP client when the ``with`` block ends."""
        self.close()

    def __del__(self) -> None:
        # Fallback only: finalizers are not guaranteed to run promptly, so
        # callers should use ``close()`` or a ``with`` block instead.
        self.close()

    def fetch_user(self, access_token: str) -> AuthenticatedUser:
        """Fetch the user that ``access_token`` belongs to.

        Args:
            access_token: Token sent in the ``Authorization: Bearer`` header.

        Returns:
            An ``AuthenticatedUser`` built from the response payload. The
            unmodified payload is kept in ``raw_payload``.

        Raises:
            TransportException: The request failed with an
                ``httpx.TransportError``.
            RuntimeApiException: The response status is 400 or above, the
                body is not valid JSON, or the JSON value is not an object.
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
        }
        if self._config.user_agent:
            headers["User-Agent"] = self._config.user_agent

        try:
            response = self._http.get(self._config.user_info_url, headers=headers)
        except httpx.TransportError as exc:
            raise TransportException("Request to gnexus-auth runtime API failed.") from exc

        if response.status_code >= 400:
            raise RuntimeApiException("gnexus-auth runtime API returned an error response.")

        try:
            payload = response.json()
        except ValueError as exc:
            raise RuntimeApiException("gnexus-auth runtime API returned malformed JSON.") from exc
        if not isinstance(payload, dict):
            raise RuntimeApiException("gnexus-auth runtime API returned malformed JSON.")

        # A ``client`` object with a string ``client_id`` becomes a single
        # access entry; anything else leaves the list empty.
        client_access_list: list[ClientAccess] = []
        client = payload.get("client")
        if isinstance(client, dict) and isinstance(client.get("client_id"), str):
            client_access_list.append(
                ClientAccess(
                    client_id=client["client_id"],
                    access_status="granted",
                    role_ids=_string_list(client.get("roles")),
                    permission_ids=_string_list(client.get("permissions")),
                )
            )

        # JSON ``null`` is treated like a missing key throughout, so it never
        # turns into the literal string "None".
        user_id = payload.get("sub")
        if user_id is None:
            user_id = payload.get("id")
        email = payload.get("email")
        profile = payload.get("profile")

        return AuthenticatedUser(
            user_id="" if user_id is None else str(user_id),
            email="" if email is None else str(email),
            email_verified=bool(payload.get("email_verified", False)),
            system_role=self._optional_str(payload.get("system_role")),
            status=self._optional_str(payload.get("status")),
            profile=profile if isinstance(profile, dict) else {},
            client_access_list=client_access_list,
            raw_payload=payload,
        )

    @staticmethod
    def _optional_str(value: object) -> str | None:
        """Return ``str(value)``, or ``None`` when ``value`` is ``None``."""
        return None if value is None else str(value)
def _string_list(value: object) -> list[str]:
    """Coerce a list into a list of strings.

    Any value that is not a ``list`` yields an empty list. Every element of
    a list is converted with ``str()``.
    """
    if isinstance(value, list):
        return list(map(str, value))
    return []