"""Tests for GAuthClient."""
from datetime import datetime, timezone

from gnexus_gauth.client import GAuthClient
from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.contracts import (
    RuntimeUserProviderInterface,
    TokenEndpointInterface,
    WebhookParserInterface,
    WebhookVerifierInterface,
)
from gnexus_gauth.dto import (
    AuthenticatedUser,
    AuthorizationRequest,
    TokenSet,
    VerifiedWebhook,
    WebhookEvent,
)
from gnexus_gauth.support import InMemoryPkceStore, InMemoryStateStore
class FakeTokenEndpoint(TokenEndpointInterface):
    """Token endpoint test double that records which operations were invoked.

    Every method sets a boolean flag on the instance; the methods that must
    return something hand back a fixed ``TokenSet``. No argument is inspected.
    """

    def __init__(self) -> None:
        # All call-tracking flags start out False.
        self.exchanged = self.refreshed = self.revoked = False

    def exchange_authorization_code(self, code: str, pkce_verifier: str) -> TokenSet:
        """Set ``exchanged`` and return a fixed token set."""
        self.exchanged = True
        issued = TokenSet("access", "refresh", "Bearer", 900)
        return issued

    def refresh_token(self, refresh_token: str) -> TokenSet:
        """Set ``refreshed`` and return a fixed, distinct token set."""
        self.refreshed = True
        renewed = TokenSet("new_access", "new_refresh", "Bearer", 900)
        return renewed

    def revoke_token(self, token: str, token_type_hint: str | None = None) -> None:
        """Set ``revoked``; nothing else happens."""
        self.revoked = True
class FakeRuntimeUserProvider(RuntimeUserProviderInterface):
    """User provider test double that always yields the same user."""

    def fetch_user(self, access_token: str) -> AuthenticatedUser:
        """Return a fixed ``AuthenticatedUser``; ``access_token`` is ignored."""
        canned_user = AuthenticatedUser("1", "user@example.test", True)
        return canned_user
class FakeWebhookVerifier(WebhookVerifierInterface):
    """Webhook verifier test double that accepts every payload unchecked."""

    def verify(self, raw_body: str, headers: dict, secret: str) -> VerifiedWebhook:
        """Return a ``VerifiedWebhook`` built from ``raw_body`` without verifying it.

        ``headers`` and ``secret`` are ignored. The result is constructed from
        the raw body, an empty dict, the fixed id ``"evt_1"`` and the current
        time as a timezone-aware UTC ``datetime``.
        """
        # Uses the regular module-level datetime import rather than inline
        # ``__import__`` calls; the value produced is the same.
        return VerifiedWebhook(raw_body, {}, "evt_1", datetime.now(timezone.utc))
class FakeWebhookParser(WebhookParserInterface):
    """Webhook parser test double that ignores the body it is given."""

    def parse(self, raw_body: str) -> WebhookEvent:
        """Return a fixed ``WebhookEvent``; ``raw_body`` is not read."""
        canned_event = WebhookEvent("evt_1", "webhook.test")
        return canned_event
class TestGAuthClient:
    """Exercise ``GAuthClient`` against in-memory stores and fake collaborators."""

    def _make_client(self) -> tuple[GAuthClient, FakeTokenEndpoint, InMemoryStateStore, InMemoryPkceStore]:
        """Build a client wired to fresh fakes.

        Returns the client together with the token endpoint, state store and
        PKCE store it was given, so tests can inspect them afterwards.
        """
        states = InMemoryStateStore()
        verifiers = InMemoryPkceStore()
        endpoint = FakeTokenEndpoint()
        settings = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )
        gauth = GAuthClient(
            config=settings,
            token_endpoint=endpoint,
            runtime_user_provider=FakeRuntimeUserProvider(),
            webhook_verifier=FakeWebhookVerifier(),
            webhook_parser=FakeWebhookParser(),
            state_store=states,
            pkce_store=verifiers,
        )
        return gauth, endpoint, states, verifiers

    def test_build_authorization_request(self) -> None:
        """The request carries state/PKCE values, stores them, and encodes the URL."""
        gauth, _, states, verifiers = self._make_client()

        request = gauth.build_authorization_request("/dashboard", ["openid", "profile"])

        assert isinstance(request, AuthorizationRequest)
        assert request.state
        assert request.pkce_verifier
        assert request.pkce_challenge
        # The generated state and verifier must be persisted in the stores.
        assert states.has(request.state)
        assert verifiers.get(request.state) == request.pkce_verifier
        # Query-string fragments expected in the authorization URL.
        url = request.authorization_url
        for fragment in (
            "response_type=code",
            "client_id=billing",
            "scope=openid%20profile",
            "return_to=%2Fdashboard",
        ):
            assert fragment in url

    def test_exchange_authorization_code(self) -> None:
        """Exchanging a code hits the endpoint and clears the stored state/PKCE."""
        gauth, endpoint, states, verifiers = self._make_client()
        pending = gauth.build_authorization_request()

        tokens = gauth.exchange_authorization_code("code_123", pending.state)

        assert tokens.access_token == "access"
        assert endpoint.exchanged
        assert not states.has(pending.state)
        assert verifiers.get(pending.state) is None

    def test_refresh_token(self) -> None:
        """Refreshing delegates to the endpoint and returns its token set."""
        gauth, endpoint, *_ = self._make_client()

        tokens = gauth.refresh_token("old_refresh")

        assert tokens.access_token == "new_access"
        assert endpoint.refreshed

    def test_revoke_token(self) -> None:
        """Revoking delegates to the endpoint."""
        gauth, endpoint, *_ = self._make_client()

        gauth.revoke_token("token_123")

        assert endpoint.revoked

    def test_fetch_user(self) -> None:
        """Fetching a user returns what the runtime user provider supplies."""
        gauth, *_ = self._make_client()

        user = gauth.fetch_user("access_token")

        assert user.user_id == "1"
        assert user.email == "user@example.test"

    def test_verify_and_parse_webhook(self) -> None:
        """Verify-and-parse returns the event produced by the parser."""
        gauth, *_ = self._make_client()

        event = gauth.verify_and_parse_webhook('{"type":"test"}', {}, "secret")

        assert event.event_type == "webhook.test"