diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..716f155 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,117 @@ +from datetime import timedelta +from types import SimpleNamespace + +import pytest +from httpx import ASGITransport, AsyncClient + +from gnexus_creds.models import OAuthState, SessionRecord, User, utcnow + + +@pytest.mark.anyio +async def test_oauth_login_creates_state(auth_app, db_session): + async with AsyncClient( + transport=ASGITransport(app=auth_app), base_url="http://test" + ) as client: + response = await client.get("/auth/login", params={"return_to": "/secrets"}) + + assert response.status_code == 307 + assert "state=" in response.headers["location"] + state = db_session.query(OAuthState).one() + assert state.return_to == "/secrets" + assert state.scopes == ["openid", "email", "profile"] + + +@pytest.mark.anyio +async def test_oauth_callback_creates_session(auth_app, db_session, monkeypatch): + state = OAuthState( + state="oauth-state", + pkce_verifier="pkce-verifier", + return_to="/", + scopes=["openid", "email", "profile"], + expires_at=utcnow() + timedelta(minutes=10), + ) + db_session.add(state) + db_session.commit() + + class FakeTokenEndpoint: + def __init__(self, config): + self.config = config + + def exchange_authorization_code(self, code, verifier): + assert code == "oauth-code" + assert verifier == "pkce-verifier" + return SimpleNamespace(access_token="access-token") + + class FakeRuntimeUserProvider: + def __init__(self, config): + self.config = config + + def fetch_user(self, access_token): + assert access_token == "access-token" + return SimpleNamespace( + user_id="auth-subject", + email="auth@example.test", + status="enabled", + system_role="admin", + profile={"display_name": "Auth User", "locale": "en"}, + ) + + monkeypatch.setattr("gnexus_creds.oauth.HttpTokenEndpoint", FakeTokenEndpoint) + monkeypatch.setattr("gnexus_creds.oauth.HttpRuntimeUserProvider", FakeRuntimeUserProvider) + + async with AsyncClient( + transport=ASGITransport(app=auth_app), base_url="http://test" + ) as client: + response = await client.get( + "/auth/callback", params={"code": "oauth-code", "state": "oauth-state"} + ) + assert response.status_code == 307 + assert response.headers["location"] == "/" + me_response = await client.get("/api/v1/me") + + assert me_response.status_code == 200 + assert me_response.json()["email"] == "auth@example.test" + assert me_response.json()["role"] == "admin" + db_session.expire_all() + assert db_session.query(User).filter(User.auth_subject == "auth-subject").count() == 1 + assert db_session.query(SessionRecord).count() == 1 + assert db_session.get(OAuthState, "oauth-state") is None + + +@pytest.mark.anyio +async def test_gnexus_auth_webhook_updates_user(auth_app, db_session, user, monkeypatch): + class FakeVerifier: + def __init__(self, config): + self.config = config + + def verify(self, raw, headers, secret): + assert "auth-user-1" in raw + assert secret + + class FakeParser: + def parse(self, raw): + return SimpleNamespace( + target_identifiers={"sub": "auth-user-1"}, + metadata={ + "status": "disabled", + "profile": {"display_name": "Disabled User", "locale": "uk"}, + }, + ) + + monkeypatch.setattr("gnexus_creds.oauth.HmacWebhookVerifier", FakeVerifier) + monkeypatch.setattr("gnexus_creds.oauth.JsonWebhookParser", FakeParser) + + async with AsyncClient( + transport=ASGITransport(app=auth_app), base_url="http://test" + ) as client: + response = await client.post( + "/webhooks/gnexus-auth", + json={"sub": "auth-user-1"}, + headers={"X-Test-Signature": "ok"}, + ) + + assert response.status_code == 200 + db_session.refresh(user) + assert user.status == "disabled" + assert user.display_name == "Disabled User" + assert user.locale == "uk"