"""Auth endpoints for gnexus-auth OAuth integration."""
import asyncio
from datetime import datetime, timezone
import structlog
from fastapi import APIRouter, HTTPException, Request, Response
from gnexus_gauth.exceptions import (
PkceException,
StateValidationException,
TokenExchangeException,
)
from navi.auth.client import get_gauth_client
from navi.auth.deps import get_current_user, require_user
from navi.auth.encrypt import get_encryptor
from navi.auth import User
from navi.config import settings
log = structlog.get_logger()
router = APIRouter(prefix="/auth", tags=["auth"])
def _get_redirect_uri(request: Request) -> str:
"""Build redirect_uri from the incoming request, respecting reverse proxies."""
# FastAPI's request.base_url includes scheme+host+port; strip trailing slash
base = str(request.base_url).rstrip("/")
return f"{base}/auth/callback"
@router.get("/login")
async def auth_login(request: Request) -> Response:
"""Redirect to gnexus-auth OAuth authorization endpoint."""
redirect_uri = _get_redirect_uri(request)
client = get_gauth_client(redirect_uri=redirect_uri)
auth_request = client.build_authorization_request(
return_to="/",
scopes=["openid", "email", "profile", "roles", "permissions"],
)
log.info("auth.login_redirect", state=auth_request.state[:8] + "...", redirect_uri=redirect_uri)
return Response(status_code=302, headers={"Location": auth_request.authorization_url})
@router.get("/callback")
async def auth_callback(code: str, state: str, request: Request) -> Response:
"""Handle OAuth callback from gnexus-auth."""
redirect_uri = _get_redirect_uri(request)
client = get_gauth_client(redirect_uri=redirect_uri)
encryptor = get_encryptor()
# Exchange code for tokens (sync IO wrapped in thread)
try:
token_set = await asyncio.to_thread(client.exchange_authorization_code, code, state)
except (StateValidationException, PkceException) as e:
log.warning("auth.invalid_state", state=state[:8], error=str(e))
raise HTTPException(status_code=400, detail="Invalid or expired state") from e
except TokenExchangeException as e:
log.warning("auth.token_exchange_failed", error=str(e))
raise HTTPException(status_code=400, detail="Token exchange failed") from e
# Fetch user info (sync IO wrapped in thread)
try:
auth_user = await asyncio.to_thread(client.fetch_user, token_set.access_token)
except Exception as e:
log.warning("auth.fetch_user_failed", error=str(e))
raise HTTPException(status_code=400, detail="Failed to fetch user info") from e
# Determine role
role = "user"
permissions: list[str] = []
for access in auth_user.client_access_list:
if access.client_id == settings.gnexus_auth_client_id:
if settings.gnexus_auth_admin_role_slug in (access.role_ids or []):
role = "admin"
permissions = list(access.permission_ids or [])
break
# Upsert navi_user
try:
from navi.api.deps import get_session_store
store = get_session_store()
pool = await store._get_pool()
import json
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO navi_users (id, email, display_name, role, permissions, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $6)
ON CONFLICT (id) DO UPDATE
SET email = EXCLUDED.email,
display_name = EXCLUDED.display_name,
role = EXCLUDED.role,
permissions = EXCLUDED.permissions,
updated_at = EXCLUDED.updated_at""",
auth_user.user_id,
auth_user.email,
auth_user.profile.get("display_name") or auth_user.email,
role,
json.dumps(permissions),
datetime.now(timezone.utc),
)
except Exception:
log.warning("auth.upsert_user_failed", user_id=auth_user.user_id, exc_info=True)
# Create auth session
session_id = __import__("uuid").uuid4().hex
try:
pool = await store._get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO user_auth_sessions
(id, user_id, access_token_enc, refresh_token_enc, expires_at, created_at, last_used_at)
VALUES ($1, $2, $3, $4, $5, $6, $6)""",
session_id,
auth_user.user_id,
encryptor.encrypt(token_set.access_token),
encryptor.encrypt(token_set.refresh_token or ""),
token_set.expires_at or datetime.now(timezone.utc),
datetime.now(timezone.utc),
)
except Exception:
log.warning("auth.create_session_failed", user_id=auth_user.user_id, exc_info=True)
# Set cookie
cookie_value = session_id
max_age = settings.navi_auth_cookie_max_age_days * 86400
cookie_str = (
f"{settings.navi_auth_cookie_name}={cookie_value}; "
f"Max-Age={max_age}; "
f"HttpOnly; "
f"Path=/; "
f"SameSite={settings.navi_auth_cookie_samesite}"
)
if settings.navi_auth_cookie_secure:
cookie_str += "; Secure"
log.info("auth.login_success", user_id=auth_user.user_id, role=role)
return Response(
status_code=302,
headers={
"Location": "/",
"Set-Cookie": cookie_str,
},
)
@router.post("/logout")
async def auth_logout(response: Response, user: Annotated[User, Depends(require_user)], request: Request) -> dict:
"""Logout current user."""
cookie_name = settings.navi_auth_cookie_name
session_id = request.cookies.get(cookie_name)
if session_id:
try:
from navi.api.deps import get_session_store
store = get_session_store()
pool = await store._get_pool()
async with pool.acquire() as conn:
await conn.execute("DELETE FROM user_auth_sessions WHERE id = $1", session_id)
except Exception:
log.warning("auth.logout_cleanup_failed", exc_info=True)
# Clear cookie
cookie_str = (
f"{cookie_name}=; "
f"Max-Age=0; "
f"HttpOnly; "
f"Path=/; "
f"SameSite={settings.navi_auth_cookie_samesite}"
)
if settings.navi_auth_cookie_secure:
cookie_str += "; Secure"
response.headers["Set-Cookie"] = cookie_str
log.info("auth.logout", user_id=user.id)
return {"ok": True}
@router.get("/me")
async def auth_me(user: Annotated[User, Depends(require_user)]) -> dict:
"""Return current authenticated user."""
return {
"id": user.id,
"email": user.email,
"display_name": user.display_name,
"role": user.role,
"permissions": user.permissions,
}