from datetime import timedelta
from types import SimpleNamespace
import pytest
from httpx import ASGITransport, AsyncClient
from gnexus_creds.models import OAuthState, SessionRecord, User, utcnow
@pytest.mark.anyio
async def test_oauth_login_creates_state(auth_app, db_session):
async with AsyncClient(
transport=ASGITransport(app=auth_app), base_url="http://test"
) as client:
response = await client.get("/auth/login", params={"return_to": "/secrets"})
assert response.status_code == 307
assert "state=" in response.headers["location"]
state = db_session.query(OAuthState).one()
assert state.return_to == "/secrets"
assert state.scopes == ["openid", "email", "profile"]
@pytest.mark.anyio
async def test_oauth_callback_creates_session(auth_app, db_session, monkeypatch):
state = OAuthState(
state="oauth-state",
pkce_verifier="pkce-verifier",
return_to="/",
scopes=["openid", "email", "profile"],
expires_at=utcnow() + timedelta(minutes=10),
)
db_session.add(state)
db_session.commit()
class FakeTokenEndpoint:
def __init__(self, config):
self.config = config
def exchange_authorization_code(self, code, verifier):
assert code == "oauth-code"
assert verifier == "pkce-verifier"
return SimpleNamespace(access_token="access-token")
class FakeRuntimeUserProvider:
def __init__(self, config):
self.config = config
def fetch_user(self, access_token):
assert access_token == "access-token"
return SimpleNamespace(
user_id="auth-subject",
email="auth@example.test",
status="enabled",
system_role="admin",
profile={"display_name": "Auth User", "locale": "en"},
)
monkeypatch.setattr("gnexus_creds.oauth.HttpTokenEndpoint", FakeTokenEndpoint)
monkeypatch.setattr("gnexus_creds.oauth.HttpRuntimeUserProvider", FakeRuntimeUserProvider)
async with AsyncClient(
transport=ASGITransport(app=auth_app), base_url="http://test"
) as client:
response = await client.get(
"/auth/callback", params={"code": "oauth-code", "state": "oauth-state"}
)
assert response.status_code == 307
assert response.headers["location"] == "/"
me_response = await client.get("/api/v1/me")
assert me_response.status_code == 200
assert me_response.json()["email"] == "auth@example.test"
assert me_response.json()["role"] == "admin"
db_session.expire_all()
assert db_session.query(User).filter(User.auth_subject == "auth-subject").count() == 1
assert db_session.query(SessionRecord).count() == 1
@pytest.mark.anyio
async def test_logout_clears_session_and_cookie(auth_app, db_session, user):
session = SessionRecord(
id="test-session-id",
user_id=user.id,
data={},
expires_at=utcnow() + timedelta(days=1),
)
db_session.add(session)
db_session.commit()
async with AsyncClient(
transport=ASGITransport(app=auth_app), base_url="http://test"
) as client:
client.cookies.set("gnexus_creds_session", "test-session-id")
response = await client.post("/auth/logout")
assert response.status_code == 200
assert response.json()["status"] == "ok"
set_cookie = response.headers.get("set-cookie", "")
assert "gnexus_creds_session" in set_cookie
assert "Max-Age=0" in set_cookie or "expires=" in set_cookie.lower()
db_session.expire_all()
assert db_session.get(SessionRecord, "test-session-id") is None
@pytest.mark.anyio
async def test_gnexus_auth_webhook_updates_user(auth_app, db_session, user, monkeypatch):
class FakeVerifier:
def __init__(self, config):
self.config = config
def verify(self, raw, headers, secret):
assert "auth-user-1" in raw
assert secret
class FakeParser:
def parse(self, raw):
return SimpleNamespace(
target_identifiers={"sub": "auth-user-1"},
metadata={
"status": "disabled",
"profile": {"display_name": "Disabled User", "locale": "uk"},
},
)
monkeypatch.setattr("gnexus_creds.oauth.HmacWebhookVerifier", FakeVerifier)
monkeypatch.setattr("gnexus_creds.oauth.JsonWebhookParser", FakeParser)
async with AsyncClient(
transport=ASGITransport(app=auth_app), base_url="http://test"
) as client:
response = await client.post(
"/webhooks/gnexus-auth",
json={"sub": "auth-user-1"},
headers={"X-Test-Signature": "ok"},
)
assert response.status_code == 200
db_session.refresh(user)
assert user.status == "disabled"
assert user.display_name == "Disabled User"
assert user.locale == "uk"