Newer
Older
gnexus-auth-client-py / tests / unit / test_oauth.py
"""Tests for OAuth components."""

from datetime import datetime, timedelta, timezone

import httpx
import pytest
import respx
from respx import mock

from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.dto import TokenSet
from gnexus_gauth.exceptions import TokenExchangeException, TransportException
from gnexus_gauth.oauth import (
    AuthorizationUrlBuilder,
    HttpTokenEndpoint,
    PkceGenerator,
)


class TestPkceGenerator:
    def test_generate_verifier(self) -> None:
        verifier = PkceGenerator.generate_verifier()
        assert verifier
        assert "=" not in verifier
        assert "+" not in verifier
        assert "/" not in verifier

    def test_generate_challenge(self) -> None:
        verifier = PkceGenerator.generate_verifier()
        challenge = PkceGenerator.generate_challenge(verifier)
        assert challenge
        assert "=" not in challenge
        assert "+" not in challenge
        assert "/" not in challenge

    def test_generate_state(self) -> None:
        state = PkceGenerator.generate_state()
        assert state
        assert "=" not in state


class TestAuthorizationUrlBuilder:
    def test_build_basic(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )
        builder = AuthorizationUrlBuilder(config)
        url = builder.build(state="abc123", pkce_challenge="xyz789")
        assert url.startswith("https://auth.example.test/oauth/authorize?")
        assert "response_type=code" in url
        assert "client_id=billing" in url
        assert "state=abc123" in url
        assert "code_challenge=xyz789" in url
        assert "code_challenge_method=S256" in url

    def test_build_with_scopes(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )
        builder = AuthorizationUrlBuilder(config)
        url = builder.build(state="s", pkce_challenge="c", scopes=["openid", "profile"])
        assert "scope=openid%20profile" in url

    def test_build_with_return_to(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )
        builder = AuthorizationUrlBuilder(config)
        url = builder.build(state="s", pkce_challenge="c", return_to="/dashboard")
        assert "return_to=%2Fdashboard" in url


class TestHttpTokenEndpoint:
    def test_exchange_authorization_code_success(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )

        with respx.mock:
            route = respx.post("https://auth.example.test/oauth/token").mock(
                return_value=httpx.Response(
                    200,
                    json={
                        "access_token": "access_123",
                        "refresh_token": "refresh_123",
                        "token_type": "Bearer",
                        "expires_in": 900,
                        "scope": "openid profile",
                    },
                )
            )
            client = httpx.Client()
            endpoint = HttpTokenEndpoint(config, http_client=client)
            token_set = endpoint.exchange_authorization_code("code_123", "verifier_123")

        assert token_set.access_token == "access_123"
        assert token_set.refresh_token == "refresh_123"
        assert token_set.token_type == "Bearer"
        assert token_set.expires_in == 900
        assert token_set.scopes == ["openid", "profile"]
        assert token_set.expires_at is not None
        assert route.called
        request = route.calls.last.request
        assert b"grant_type=authorization_code" in request.content
        assert b"code_verifier=verifier_123" in request.content

    def test_exchange_authorization_code_error(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )

        with respx.mock:
            respx.post("https://auth.example.test/oauth/token").mock(
                return_value=httpx.Response(
                    400,
                    json={"error": "invalid_grant", "error_description": "Code expired."},
                )
            )
            client = httpx.Client()
            endpoint = HttpTokenEndpoint(config, http_client=client)
            with pytest.raises(TokenExchangeException, match="Code expired"):
                endpoint.exchange_authorization_code("code_123", "verifier_123")

    def test_refresh_token_success(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )

        with respx.mock:
            respx.post("https://auth.example.test/oauth/refresh").mock(
                return_value=httpx.Response(
                    200,
                    json={
                        "access_token": "new_access",
                        "refresh_token": "new_refresh",
                        "token_type": "Bearer",
                        "expires_in": 900,
                    },
                )
            )
            client = httpx.Client()
            endpoint = HttpTokenEndpoint(config, http_client=client)
            token_set = endpoint.refresh_token("old_refresh")

        assert token_set.access_token == "new_access"
        assert token_set.refresh_token == "new_refresh"

    def test_revoke_token(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )

        with respx.mock:
            respx.post("https://auth.example.test/oauth/revoke").mock(
                return_value=httpx.Response(200, text="OK")
            )
            client = httpx.Client()
            endpoint = HttpTokenEndpoint(config, http_client=client)
            endpoint.revoke_token("token_123", "access_token")

    def test_transport_error(self) -> None:
        config = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
        )

        with respx.mock:
            respx.post("https://auth.example.test/oauth/token").mock(side_effect=httpx.ConnectError("Connection refused"))
            client = httpx.Client()
            endpoint = HttpTokenEndpoint(config, http_client=client)
            with pytest.raises(TransportException, match="Request to gnexus-auth failed"):
                endpoint.exchange_authorization_code("code", "verifier")