Newer
Older
gnexus-auth-client-py / src / gnexus_gauth / support.py
"""Support implementations for contracts."""

from datetime import datetime, timezone

from gnexus_gauth.contracts import (
    ClockInterface,
    PkceStoreInterface,
    StateStoreInterface,
    TokenStoreInterface,
)
from gnexus_gauth.dto import TokenSet


class SystemClock(ClockInterface):
    """System clock using UTC."""

    def now(self) -> datetime:
        return datetime.now(timezone.utc)


class InMemoryStateStore(StateStoreInterface):
    """In-memory authorization state store with TTL enforcement."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}

    def put(self, state: str, expires_at: datetime, context: dict | None = None) -> None:
        self._items[state] = {"expires_at": expires_at, "context": context or {}}

    def has(self, state: str) -> bool:
        record = self._items.get(state)
        if record is None:
            return False
        if record["expires_at"] < datetime.now(timezone.utc):
            del self._items[state]
            return False
        return True

    def get_context(self, state: str) -> dict:
        if not self.has(state):
            return {}
        return self._items[state].get("context", {})

    def forget(self, state: str) -> None:
        self._items.pop(state, None)


class InMemoryPkceStore(PkceStoreInterface):
    """In-memory PKCE verifier store with TTL enforcement."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}

    def put(self, state: str, verifier: str, expires_at: datetime) -> None:
        self._items[state] = {"verifier": verifier, "expires_at": expires_at}

    def get(self, state: str) -> str | None:
        record = self._items.get(state)
        if record is None:
            return None
        if record["expires_at"] < datetime.now(timezone.utc):
            del self._items[state]
            return None
        return record["verifier"]

    def forget(self, state: str) -> None:
        self._items.pop(state, None)


class InMemoryTokenStore(TokenStoreInterface):
    """In-memory token store."""

    def __init__(self) -> None:
        self._items: dict[str, TokenSet] = {}

    def put(self, key: str, token_set: TokenSet) -> None:
        self._items[key] = token_set

    def get(self, key: str) -> TokenSet | None:
        return self._items.get(key)

    def forget(self, key: str) -> None:
        self._items.pop(key, None)