Newer
Older
gnexus-creds / tests / test_auth.py
from datetime import timedelta
from types import SimpleNamespace

import pytest
from httpx import ASGITransport, AsyncClient

from gnexus_creds.models import OAuthState, SessionRecord, User, utcnow


@pytest.mark.anyio
async def test_oauth_login_creates_state(auth_app, db_session):
    async with AsyncClient(
        transport=ASGITransport(app=auth_app), base_url="http://test"
    ) as client:
        response = await client.get("/auth/login", params={"return_to": "/secrets"})

    assert response.status_code == 307
    assert "state=" in response.headers["location"]
    state = db_session.query(OAuthState).one()
    assert state.return_to == "/secrets"
    assert state.scopes == ["openid", "email", "profile"]


@pytest.mark.anyio
async def test_oauth_callback_creates_session(auth_app, db_session, monkeypatch):
    state = OAuthState(
        state="oauth-state",
        pkce_verifier="pkce-verifier",
        return_to="/",
        scopes=["openid", "email", "profile"],
        expires_at=utcnow() + timedelta(minutes=10),
    )
    db_session.add(state)
    db_session.commit()

    class FakeTokenEndpoint:
        def __init__(self, config):
            self.config = config

        def exchange_authorization_code(self, code, verifier):
            assert code == "oauth-code"
            assert verifier == "pkce-verifier"
            return SimpleNamespace(access_token="access-token")

    class FakeRuntimeUserProvider:
        def __init__(self, config):
            self.config = config

        def fetch_user(self, access_token):
            assert access_token == "access-token"
            return SimpleNamespace(
                user_id="auth-subject",
                email="auth@example.test",
                status="enabled",
                system_role="admin",
                profile={"display_name": "Auth User", "locale": "en"},
            )

    monkeypatch.setattr("gnexus_creds.oauth.HttpTokenEndpoint", FakeTokenEndpoint)
    monkeypatch.setattr("gnexus_creds.oauth.HttpRuntimeUserProvider", FakeRuntimeUserProvider)

    async with AsyncClient(
        transport=ASGITransport(app=auth_app), base_url="http://test"
    ) as client:
        response = await client.get(
            "/auth/callback", params={"code": "oauth-code", "state": "oauth-state"}
        )
        assert response.status_code == 307
        assert response.headers["location"] == "/"
        me_response = await client.get("/api/v1/me")

    assert me_response.status_code == 200
    assert me_response.json()["email"] == "auth@example.test"
    assert me_response.json()["role"] == "admin"
    db_session.expire_all()
    assert db_session.query(User).filter(User.auth_subject == "auth-subject").count() == 1
    assert db_session.query(SessionRecord).count() == 1


@pytest.mark.anyio
async def test_logout_clears_session_and_cookie(auth_app, db_session, user):
    session = SessionRecord(
        id="test-session-id",
        user_id=user.id,
        data={},
        expires_at=utcnow() + timedelta(days=1),
    )
    db_session.add(session)
    db_session.commit()

    async with AsyncClient(
        transport=ASGITransport(app=auth_app), base_url="http://test"
    ) as client:
        client.cookies.set("gnexus_creds_session", "test-session-id")
        response = await client.post("/auth/logout")

    assert response.status_code == 200
    assert response.json()["status"] == "ok"

    set_cookie = response.headers.get("set-cookie", "")
    assert "gnexus_creds_session" in set_cookie
    assert "Max-Age=0" in set_cookie or "expires=" in set_cookie.lower()

    db_session.expire_all()
    assert db_session.get(SessionRecord, "test-session-id") is None


@pytest.mark.anyio
async def test_gnexus_auth_webhook_updates_user(auth_app, db_session, user, monkeypatch):
    class FakeVerifier:
        def __init__(self, config):
            self.config = config

        def verify(self, raw, headers, secret):
            assert "auth-user-1" in raw
            assert secret

    class FakeParser:
        def parse(self, raw):
            return SimpleNamespace(
                target_identifiers={"sub": "auth-user-1"},
                metadata={
                    "status": "disabled",
                    "profile": {"display_name": "Disabled User", "locale": "uk"},
                },
            )

    monkeypatch.setattr("gnexus_creds.oauth.HmacWebhookVerifier", FakeVerifier)
    monkeypatch.setattr("gnexus_creds.oauth.JsonWebhookParser", FakeParser)

    async with AsyncClient(
        transport=ASGITransport(app=auth_app), base_url="http://test"
    ) as client:
        response = await client.post(
            "/webhooks/gnexus-auth",
            json={"sub": "auth-user-1"},
            headers={"X-Test-Signature": "ok"},
        )

    assert response.status_code == 200
    db_session.refresh(user)
    assert user.status == "disabled"
    assert user.display_name == "Disabled User"
    assert user.locale == "uk"