Newer
Older
gnexus-creds / gnexus_creds / auth.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy 4 days ago 2 KB Harden export import auth and MCP access
"""Authentication dependencies."""

from fastapi import Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy.orm import Session

from gnexus_creds.config import get_settings
from gnexus_creds.db import get_db
from gnexus_creds.errors import AppError
from gnexus_creds.models import User
from gnexus_creds.schemas import Scope
from gnexus_creds.services import (
    Actor,
    authenticate_api_token,
    get_session_user,
    log_failed_access_once,
)

bearer = HTTPBearer(auto_error=False)


async def actor_from_request(
    request: Request,
    db: Session = Depends(get_db),
    credentials: HTTPAuthorizationCredentials | None = Depends(bearer),
) -> Actor:
    actor: Actor | None = None
    if credentials:
        actor = authenticate_api_token(db, credentials.credentials)
        if actor:
            actor.ip_address = request.client.host if request.client else None
            actor.user_agent = request.headers.get("user-agent")
            return actor
    session_id = request.cookies.get(get_settings().session_cookie_name)
    user = get_session_user(db, session_id)
    if user:
        return Actor(
            user=user,
            channel="ui",
            ip_address=request.client.host if request.client else None,
            user_agent=request.headers.get("user-agent"),
        )
    failed_key = "anonymous"
    if credentials:
        failed_key = f"bearer:{credentials.credentials[:12]}"
    elif session_id:
        failed_key = f"session:{session_id[:12]}"
    log_failed_access_once(
        db,
        key=failed_key,
        channel="rest",
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
    )
    db.commit()
    raise AppError("unauthorized", "Authentication required.", status_code=401)


async def require_admin(actor: Actor = Depends(actor_from_request)) -> Actor:
    if actor.user.system_role != "admin":
        actor.require(Scope.admin)
    return actor


def require_enabled_user(user: User) -> None:
    if user.status == "disabled":
        raise AppError("user_disabled", "User is disabled.", status_code=403)