"""Support implementations for contracts."""
from datetime import datetime, timezone
from gnexus_gauth.contracts import (
ClockInterface,
PkceStoreInterface,
StateStoreInterface,
TokenStoreInterface,
)
from gnexus_gauth.dto import TokenSet
class SystemClock(ClockInterface):
    """Clock backed by the host's system time, reported in UTC."""

    def now(self) -> datetime:
        """Return the current instant as a timezone-aware UTC datetime."""
        utc = timezone.utc
        current = datetime.now(tz=utc)
        return current
class InMemoryStateStore(StateStoreInterface):
    """In-memory authorization state store with TTL enforcement.

    Records are kept in a dict owned by the instance. Expired records are
    removed lazily: a record is only dropped when it is next looked up
    through ``has`` (or ``get_context``, which delegates to ``has``).
    """

    def __init__(self, clock: ClockInterface | None = None) -> None:
        """Create an empty store.

        Args:
            clock: Time source consulted for expiry checks. When omitted a
                ``SystemClock`` is used, so the default behaviour is a
                comparison against the current UTC time.
        """
        self._items: dict[str, dict] = {}
        # Injecting the clock keeps TTL checks deterministic under test
        # instead of hard-wiring them to the wall clock.
        self._clock: ClockInterface = clock if clock is not None else SystemClock()

    def put(self, state: str, expires_at: datetime, context: dict | None = None) -> None:
        """Store ``state`` together with its expiry and optional context.

        An existing record for the same ``state`` is overwritten. A missing
        (or empty) ``context`` is stored as an empty dict.

        Args:
            state: Authorization state value used as the lookup key.
            expires_at: Moment after which the record is considered expired.
                It must be comparable with the clock's ``now()``; with the
                default clock that means a timezone-aware datetime, because
                comparing a naive datetime with an aware one raises
                ``TypeError``.
            context: Arbitrary data to associate with the state.
        """
        self._items[state] = {"expires_at": expires_at, "context": context or {}}

    def has(self, state: str) -> bool:
        """Return whether ``state`` is stored and has not expired.

        A record whose expiry lies strictly before the clock's current time
        is deleted and reported as absent; a record expiring exactly "now"
        still counts as present.
        """
        record = self._items.get(state)
        if record is None:
            return False
        if record["expires_at"] < self._clock.now():
            # Lazy eviction: drop the stale record on first access.
            del self._items[state]
            return False
        return True

    def get_context(self, state: str) -> dict:
        """Return the context stored for ``state``.

        Returns an empty dict when the state is unknown or expired. The
        returned dict is the stored object itself, not a copy.
        """
        if not self.has(state):
            return {}
        return self._items[state].get("context", {})

    def forget(self, state: str) -> None:
        """Remove ``state`` from the store; unknown states are ignored."""
        self._items.pop(state, None)
class InMemoryPkceStore(PkceStoreInterface):
    """In-memory PKCE verifier store with TTL enforcement.

    Verifiers are kept in a dict owned by the instance, keyed by the
    authorization state. Expired records are removed lazily, when they are
    next requested through ``get``.
    """

    def __init__(self, clock: ClockInterface | None = None) -> None:
        """Create an empty store.

        Args:
            clock: Time source consulted for expiry checks. When omitted a
                ``SystemClock`` is used, so the default behaviour is a
                comparison against the current UTC time.
        """
        self._items: dict[str, dict] = {}
        # Injecting the clock keeps TTL checks deterministic under test
        # instead of hard-wiring them to the wall clock.
        self._clock: ClockInterface = clock if clock is not None else SystemClock()

    def put(self, state: str, verifier: str, expires_at: datetime) -> None:
        """Store ``verifier`` under ``state`` until ``expires_at``.

        An existing record for the same ``state`` is overwritten.

        Args:
            state: Authorization state value used as the lookup key.
            verifier: PKCE code verifier to remember.
            expires_at: Moment after which the record is considered expired.
                It must be comparable with the clock's ``now()``; with the
                default clock that means a timezone-aware datetime, because
                comparing a naive datetime with an aware one raises
                ``TypeError``.
        """
        self._items[state] = {"verifier": verifier, "expires_at": expires_at}

    def get(self, state: str) -> str | None:
        """Return the verifier stored for ``state``, or ``None``.

        ``None`` is returned when the state is unknown or its record has
        expired. A record whose expiry lies strictly before the clock's
        current time is deleted; one expiring exactly "now" is still
        returned.
        """
        record = self._items.get(state)
        if record is None:
            return None
        if record["expires_at"] < self._clock.now():
            # Lazy eviction: drop the stale record on first access.
            del self._items[state]
            return None
        return record["verifier"]

    def forget(self, state: str) -> None:
        """Remove the record for ``state``; unknown states are ignored."""
        self._items.pop(state, None)
class InMemoryTokenStore(TokenStoreInterface):
    """Token store that keeps token sets in a dict owned by the instance."""

    def __init__(self) -> None:
        """Start with no stored token sets."""
        self._items: dict[str, TokenSet] = {}

    def put(self, key: str, token_set: TokenSet) -> None:
        """Save ``token_set`` under ``key``, replacing any previous value."""
        self._items.update({key: token_set})

    def get(self, key: str) -> TokenSet | None:
        """Return the token set saved under ``key``, or ``None`` if absent."""
        try:
            return self._items[key]
        except KeyError:
            return None

    def forget(self, key: str) -> None:
        """Delete the entry for ``key``; a missing key is silently ignored."""
        if key in self._items:
            del self._items[key]