"""Auth endpoints for gnexus-auth OAuth integration."""
import asyncio
from datetime import datetime, timezone
import structlog
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from gnexus_gauth.exceptions import (
PkceException,
StateValidationException,
TokenExchangeException,
)
from navi.auth.client import get_gauth_client
from navi.auth.deps import get_current_user, require_user
from navi.auth.encrypt import get_encryptor
from navi.auth import User
from datetime import datetime, timedelta, timezone
from navi.config import settings
# Module-wide structured logger used by every endpoint in this file.
log = structlog.get_logger()
# Every route declared on this router is served under the /auth prefix.
router = APIRouter(prefix="/auth", tags=["auth"])
# Tracks extra per-state metadata (platform, return_to) that gnexus-auth
# does not store — keyed by the OAuth state param. An entry is removed when
# the callback for that state completes; leftovers older than
# _MOBILE_AUTH_TTL are purged by _cleanup_mobile_auth_states().
# NOTE: this is a plain module-level dict, so entries are only visible to the
# process that created them.
_mobile_auth_states: dict[str, dict] = {}
# Maximum age of a _mobile_auth_states entry before it counts as expired.
_MOBILE_AUTH_TTL = timedelta(minutes=10)
def _cleanup_mobile_auth_states() -> None:
    """Evict stale entries from the in-memory mobile auth state table.

    An entry is stale when its ``created_at`` timestamp is older than
    ``_MOBILE_AUTH_TTL``. Entries that carry no timestamp are left in place.
    """
    oldest_allowed = datetime.now(timezone.utc) - _MOBILE_AUTH_TTL
    # Walk a snapshot so entries can be dropped from the dict while scanning.
    for state_key, meta in list(_mobile_auth_states.items()):
        if meta.get("created_at", oldest_allowed) < oldest_allowed:
            _mobile_auth_states.pop(state_key, None)
def _get_redirect_uri() -> str:
    """Return the OAuth redirect URI taken from application settings."""
    # Deliberately not derived from the incoming request: behind a reverse
    # proxy, request.base_url would yield the internal address rather than
    # the public one.
    configured_uri = settings.gnauth_redirect_uri
    return configured_uri
def _auth_configured() -> bool:
    """Tell whether both OAuth client credentials are present in settings."""
    if not settings.gnauth_client_id:
        return False
    return bool(settings.gnauth_client_secret)
def _sanitize_return_to(return_to: str) -> str:
    """Prevent open redirects by allowing only same-origin relative paths.

    Args:
        return_to: Caller-supplied post-login destination (untrusted input).

    Returns:
        ``return_to`` unchanged when it is a path on this origin, otherwise
        the safe default ``"/"``.
    """
    if not return_to or not return_to.startswith("/"):
        return "/"
    # Browsers drop tabs/newlines while parsing URLs (so "/<TAB>/host" turns
    # into "//host"), and CR/LF must never reach a Location header.
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in return_to):
        return "/"
    # "//host" is a protocol-relative URL, and browsers treat a backslash
    # like a forward slash, so both forms would leave this origin.
    if return_to[1:2] in ("/", "\\"):
        return "/"
    if "://" in return_to:
        return "/"
    return return_to
@router.get("/login")
async def auth_login(
    request: Request,
    return_to: str = "/",
    platform: str | None = None,
) -> Response:
    """Start the OAuth flow by redirecting to the gnexus-auth authorization URL.

    The sanitized ``return_to`` and the detected client platform are stored
    under the generated OAuth state so the callback can look them up later.

    Raises:
        HTTPException: 503 when the OAuth client credentials are not set.
    """
    if not _auth_configured():
        raise HTTPException(status_code=503, detail="OAuth is not configured. Set GNAUTH_CLIENT_ID and GNAUTH_CLIENT_SECRET in .env")

    callback_uri = _get_redirect_uri()
    gauth = get_gauth_client(redirect_uri=callback_uri)
    target_path = _sanitize_return_to(return_to)
    authz = gauth.build_authorization_request(
        return_to=target_path,
        scopes=["openid", "email", "profile", "roles", "permissions"],
    )

    # An explicit ?platform= value wins; otherwise the Android WebView is
    # recognised by its User-Agent marker (more reliable than the JS-side
    # navigator.userAgent).
    user_agent = request.headers.get("user-agent", "")
    if platform:
        resolved_platform = platform
    elif "NaviAndroid" in user_agent:
        resolved_platform = "android"
    else:
        resolved_platform = None

    # Purge expired entries first, then record what the callback needs in
    # order to choose between the cookie flow and the bridge page.
    _cleanup_mobile_auth_states()
    state_meta = {
        "platform": resolved_platform,
        "return_to": target_path,
        "created_at": datetime.now(timezone.utc),
    }
    _mobile_auth_states[authz.state] = state_meta

    # Only a short prefix of the state value is written to the log.
    state_preview = authz.state[:8] + "..."
    log.info(
        "auth.login_redirect",
        state=state_preview,
        redirect_uri=callback_uri,
        platform=resolved_platform,
        return_to=target_path,
    )

    redirect_headers = {"Location": authz.authorization_url}
    return Response(status_code=302, headers=redirect_headers)
@router.get("/callback")
async def auth_callback(
    request: Request,
    code: str | None = None,
    state: str | None = None,
    error: str | None = None,
    error_description: str | None = None,
) -> Response:
    """Handle the OAuth callback from gnexus-auth and establish a session.

    Steps: validate the callback parameters, exchange the authorization code
    for tokens, fetch the user, upsert the local ``navi_users`` row (best
    effort), persist an auth session with encrypted tokens, then redirect
    either to the Android bridge page or back to the browser with the
    session cookie set.

    Args:
        request: Incoming request; only used to log the query string when no
            authorization code was supplied.
        code: Authorization code issued by the authorization server.
        state: OAuth state parameter echoed back by the authorization server.
        error: OAuth error code, present when authorization failed.
        error_description: Optional human-readable explanation of ``error``.

    Returns:
        A 302 redirect: to ``/auth/mobile-done`` for Android logins, otherwise
        to the stored ``return_to`` path with the session cookie attached.

    Raises:
        HTTPException: 503 when OAuth is not configured; 400 for an OAuth
            error response, a missing code, an invalid/expired state, a
            failed token exchange or a failed user lookup; 500 when the auth
            session could not be persisted.
    """
    # Function-scope imports, in line with how this module already imports
    # these helpers lazily.
    import json
    import uuid

    if not _auth_configured():
        raise HTTPException(status_code=503, detail="OAuth is not configured. Set GNAUTH_CLIENT_ID and GNAUTH_CLIENT_SECRET in .env")

    # Only a short prefix of the state value is ever written to the log.
    state_prefix = state[:8] if state else None

    # OAuth error response from the authorization server (e.g. user denied
    # consent, or the requested scope is not allowed for this client).
    if error:
        log.warning(
            "auth.oauth_error",
            error=error,
            error_description=error_description,
            state=state_prefix,
        )
        raise HTTPException(
            status_code=400,
            detail=f"OAuth authorization failed: {error_description or error}",
        )

    if not code:
        log.warning("auth.missing_code", query=str(request.query_params))
        raise HTTPException(status_code=400, detail="Missing authorization code")

    redirect_uri = _get_redirect_uri()
    client = get_gauth_client(redirect_uri=redirect_uri)
    encryptor = get_encryptor()

    # Exchange code for tokens (sync IO wrapped in a thread).
    try:
        token_set = await asyncio.to_thread(client.exchange_authorization_code, code, state)
    except (StateValidationException, PkceException) as e:
        log.warning("auth.invalid_state", state=state_prefix, error=str(e))
        raise HTTPException(status_code=400, detail="Invalid or expired state") from e
    except TokenExchangeException as e:
        log.warning("auth.token_exchange_failed", error=str(e))
        raise HTTPException(status_code=400, detail="Token exchange failed") from e

    # Fetch user info (sync IO wrapped in a thread).
    try:
        auth_user = await asyncio.to_thread(client.fetch_user, token_set.access_token)
    except Exception as e:
        log.warning("auth.fetch_user_failed", error=str(e))
        raise HTTPException(status_code=400, detail="Failed to fetch user info") from e

    # Determine role and permissions from the access entry of *this* client.
    role = "user"
    permissions: list[str] = []
    for access in auth_user.client_access_list or []:
        if access.client_id == settings.gnauth_client_id:
            if settings.gnauth_admin_role_slug in (access.role_ids or []):
                role = "admin"
            permissions = list(access.permission_ids or [])
            break

    # Imported here rather than at module level, as in the original code
    # (presumably to avoid an import cycle — TODO confirm).
    from navi.api.deps import get_session_store

    now = datetime.now(timezone.utc)

    # Upsert navi_user. Best effort: a failure is logged and the login goes on.
    try:
        store = get_session_store()
        pool = await store._get_pool()
        # Tolerate a missing profile so the row is still written.
        profile = auth_user.profile or {}
        async with pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO navi_users (
                       id, email, display_name, username, first_name, last_name,
                       phone, birth_date, country, city, locale,
                       role, permissions, created_at, updated_at
                   )
                   VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $14)
                   ON CONFLICT (id) DO UPDATE
                   SET email = EXCLUDED.email,
                       display_name = EXCLUDED.display_name,
                       username = EXCLUDED.username,
                       first_name = EXCLUDED.first_name,
                       last_name = EXCLUDED.last_name,
                       phone = EXCLUDED.phone,
                       birth_date = EXCLUDED.birth_date,
                       country = EXCLUDED.country,
                       city = EXCLUDED.city,
                       locale = EXCLUDED.locale,
                       role = EXCLUDED.role,
                       permissions = EXCLUDED.permissions,
                       updated_at = EXCLUDED.updated_at""",
                auth_user.user_id,
                auth_user.email,
                profile.get("display_name") or auth_user.email,
                profile.get("username"),
                profile.get("first_name"),
                profile.get("last_name"),
                profile.get("phone"),
                profile.get("birth_date"),
                profile.get("country"),
                profile.get("city"),
                profile.get("locale"),
                role,
                json.dumps(permissions),
                now,
            )
    except Exception:
        log.warning("auth.upsert_user_failed", user_id=auth_user.user_id, exc_info=True)

    # Create auth session. The store is looked up again here so this step
    # does not depend on a name bound inside the best-effort block above.
    session_id = uuid.uuid4().hex
    try:
        store = get_session_store()
        pool = await store._get_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO user_auth_sessions
                       (id, user_id, access_token_enc, refresh_token_enc, expires_at, created_at, last_used_at)
                   VALUES ($1, $2, $3, $4, $5, $6, $6)""",
                session_id,
                auth_user.user_id,
                encryptor.encrypt(token_set.access_token),
                encryptor.encrypt(token_set.refresh_token or ""),
                token_set.expires_at or now,
                now,
            )
    except Exception as e:
        log.warning("auth.create_session_failed", user_id=auth_user.user_id, exc_info=True)
        # Without a stored session the id is worthless: handing it out would
        # report a successful login that every later request rejects.
        raise HTTPException(status_code=500, detail="Failed to create auth session") from e

    # Retrieve platform/return_to info recorded by the login endpoint.
    state_info = _mobile_auth_states.pop(state, {}) if state else {}
    is_mobile = state_info.get("platform") == "android"
    return_to = state_info.get("return_to", "/")

    log.info("auth.login_success", user_id=auth_user.user_id, role=role, is_mobile=is_mobile)

    if is_mobile:
        # Mobile: redirect to bridge page that will deep-link back into the app.
        return Response(
            status_code=302,
            headers={"Location": f"/auth/mobile-done?sid={session_id}"},
        )

    # Browser: set httpOnly cookie and redirect.
    max_age = settings.navi_auth_cookie_max_age_days * 86400
    cookie_str = (
        f"{settings.navi_auth_cookie_name}={session_id}; "
        f"Max-Age={max_age}; "
        f"HttpOnly; "
        f"Path=/; "
        f"SameSite={settings.navi_auth_cookie_samesite}"
    )
    if settings.navi_auth_cookie_secure:
        cookie_str += "; Secure"
    return Response(
        status_code=302,
        headers={
            "Location": return_to,
            "Set-Cookie": cookie_str,
        },
    )
@router.get("/mobile-done")
async def auth_mobile_done(sid: str) -> Response:
    """Bridge page for Android: attempts an automatic deep-link back into
    the native app via Chrome Intent URL, and falls back to a manual button
    for browsers that block automatic scheme navigation.

    Args:
        sid: Session id to hand over to the app. It arrives as a query
            parameter and is therefore treated as untrusted input.

    Returns:
        An HTML page that navigates to the ``intent://`` deep link and
        reveals a manual "Open Navi App" button after 1.5 seconds.
    """
    from urllib.parse import quote

    # Percent-encode everything outside [A-Za-z0-9_.~-]. Session ids created
    # by this module are hex strings and pass through unchanged, while a
    # crafted value can no longer break out of the HTML attribute, the JS
    # string literal, or the "#Intent;...;end" part of the intent URL.
    safe_sid = quote(sid, safe="")
    intent_url = f"intent://auth/callback?sid={safe_sid}#Intent;scheme=navi;package=com.navi.client;end"
    html = (
        "<!DOCTYPE html>"
        '<html lang="en">'
        '<head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1">'
        '<title>Authentication complete</title>'
        '<style>'
        ':root{'
        '--page:#16161E;--panel:rgba(192,202,245,0.045);--panel-strong:rgba(192,202,245,0.085);'
        '--text-light:#C0CAF5;--text-medium:#A9B1D6;--text-dark:#787C99;'
        '--primary:#C0CAF5;--success:#9ECE6A;--accent:#FF9E64;--border-muted:rgba(192,202,245,0.24);'
        '--font:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,Helvetica,Arial,sans-serif}'
        'body{font-family:var(--font);display:flex;flex-direction:column;align-items:center;'
        'justify-content:center;min-height:100vh;margin:0;background:var(--page);}'
        '.card{background:var(--panel);border:2px solid var(--border-muted);border-left:6px solid var(--success);'
        'padding:34px 22px;text-align:center;max-width:340px;width:90%;}'
        '.check{font-size:48px;margin-bottom:12px;color:var(--success);}'
        'h1{margin:0 0 12px;font-size:18px;font-weight:600;color:var(--text-light);text-transform:uppercase;letter-spacing:0.08em}'
        'p{color:var(--text-medium);margin:0 0 26px;font-size:14px;line-height:1.5;min-height:42px}'
        '.btn{display:inline-flex;align-items:center;justify-content:center;min-height:46px;'
        'font-family:var(--font);font-size:14px;font-weight:600;text-transform:uppercase;letter-spacing:0.08em;'
        'color:var(--success);text-decoration:none;padding:12px 22px;border:2px solid var(--success);'
        'border-left-width:6px;transition:background-color .2s,color .2s;}'
        '.btn:hover,.btn:active{background:var(--success);color:var(--page)}'
        '.hidden{display:none}'
        '.spinner{width:20px;height:20px;border:2px solid var(--border-muted);'
        'border-top-color:var(--success);border-radius:50%;animation:spin 1s linear infinite;'
        'margin:0 auto 12px}'
        '@keyframes spin{to{transform:rotate(360deg)}}'
        '</style></head>'
        '<body>'
        '<div class="card">'
        '<div class="check">✓</div>'
        '<h1>Login successful</h1>'
        '<p id="status"><span class="spinner"></span>Opening Navi…</p>'
        '<a id="btn" class="btn hidden" href="' + intent_url + '">Open Navi App</a>'
        '</div>'
        '<script>'
        '(function(){'
        # Try the deep link right away; the timeout below reveals the manual
        # button in case the browser blocked the automatic navigation.
        f'window.location.href="{intent_url}";'
        'setTimeout(function(){'
        'document.getElementById("status").innerHTML="Tap below to continue back to the app";'
        'document.getElementById("btn").classList.remove("hidden");'
        '},1500);'
        '})();'
        '</script>'
        '</body></html>'
    )
    return Response(content=html, media_type="text/html")
@router.post("/logout")
async def auth_logout(response: Response, user: Annotated[User, Depends(require_user)], request: Request) -> dict:
    """End the caller's session: delete the stored row and expire the cookie.

    Deleting the session row is best effort; a failure there is logged and
    the cookie is cleared regardless.
    """
    from navi.api.deps import get_session_store

    name = settings.navi_auth_cookie_name
    current_sid = request.cookies.get(name)

    if current_sid:
        try:
            session_store = get_session_store()
            db_pool = await session_store._get_pool()
            async with db_pool.acquire() as db:
                await db.execute("DELETE FROM user_auth_sessions WHERE id = $1", current_sid)
        except Exception:
            log.warning("auth.logout_cleanup_failed", exc_info=True)

    # Expire the cookie by re-issuing it empty with Max-Age=0.
    attributes = [
        f"{name}=",
        "Max-Age=0",
        "HttpOnly",
        "Path=/",
        f"SameSite={settings.navi_auth_cookie_samesite}",
    ]
    if settings.navi_auth_cookie_secure:
        attributes.append("Secure")
    response.headers["Set-Cookie"] = "; ".join(attributes)

    log.info("auth.logout", user_id=user.id)
    return {"ok": True}
@router.get("/me")
async def auth_me(user: Annotated[User, Depends(require_user)]) -> dict:
    """Describe the authenticated user, including a link to their profile."""
    base_url = settings.gnauth_base_url.rstrip("/")
    profile_link = f"{base_url}{settings.gnauth_profile_path}"

    # User attributes copied into the response one-to-one, in output order.
    exposed_fields = (
        "id",
        "email",
        "display_name",
        "username",
        "first_name",
        "last_name",
        "phone",
        "birth_date",
        "country",
        "city",
        "locale",
        "avatar_url",
        "role",
        "permissions",
    )
    payload = {field: getattr(user, field) for field in exposed_fields}
    payload["profile_url"] = profile_link
    return payload
@router.get("/status")
async def auth_status() -> dict:
    """Report the backend auth state: the feature toggle and OAuth readiness."""
    # OAuth readiness is only evaluated while auth is switched on.
    if settings.navi_auth_enabled:
        oauth_ready = _auth_configured()
    else:
        oauth_ready = False
    return {
        "enabled": settings.navi_auth_enabled,
        "configured": oauth_ready,
    }