Newer
Older
gnexus-creds / gnexus_creds / api.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy 3 days ago 12 KB Harden API token tracking and test coverage
"""REST API routes."""

from datetime import UTC, datetime
from uuid import UUID

from fastapi import APIRouter, Depends, Query
from sqlalchemy import distinct, func, select
from sqlalchemy.orm import Session, selectinload

from gnexus_creds.auth import actor_from_request, require_admin
from gnexus_creds.db import get_db
from gnexus_creds.models import ApiToken, AuditEvent, Secret, SecretTag, User
from gnexus_creds.schemas import (
    ApiTokenCreate,
    ApiTokenCreated,
    ApiTokenRead,
    ImportPayload,
    Page,
    Scope,
    SecretCreate,
    SecretFieldIn,
    SecretRead,
    SecretReveal,
    SecretStatus,
    SecretUpdate,
    SecretVersionRead,
)
from gnexus_creds.services import (
    Actor,
    audit,
    check_actor_rate_limit,
    create_api_token,
    create_secret,
    delete_secret,
    get_version,
    list_secrets,
    list_versions,
    reveal_secret,
    serialize_secret,
    update_secret,
)

router = APIRouter(prefix="/api/v1")


def _audit_read(row: AuditEvent) -> dict:
    return {
        "id": row.id,
        "user_id": row.user_id,
        "actor_user_id": row.actor_user_id,
        "api_token_id": row.api_token_id,
        "secret_id": row.secret_id,
        "channel": row.channel,
        "action": row.action,
        "ip_address": row.ip_address,
        "user_agent": row.user_agent,
        "metadata": row.audit_metadata,
        "created_at": row.created_at,
    }


def _export_secret(row: Secret, actor: Actor, db: Session) -> dict:
    revealed = serialize_secret(row, reveal=True, db=db, user=actor.user)
    return SecretCreate(
        title=revealed.title,
        purpose=revealed.purpose,
        category=revealed.category,
        source=revealed.source,
        notes=revealed.notes,
        tags=revealed.tags,
        status=revealed.status,
        archived=revealed.archived,
        allow_ui=revealed.allow_ui,
        allow_rest_api=revealed.allow_rest_api,
        allow_mcp=revealed.allow_mcp,
        fields=[
            SecretFieldIn(
                name=field.name,
                value=field.value or "",
                encrypted=field.encrypted,
                masked=field.masked,
                position=field.position,
            )
            for field in revealed.fields
        ],
    ).model_dump(mode="json")


@router.get("/me")
async def me(actor: Actor = Depends(actor_from_request)) -> dict:
    return {
        "id": actor.user.id,
        "email": actor.user.email,
        "display_name": actor.user.display_name,
        "locale": actor.user.locale,
        "role": actor.user.system_role,
        "status": actor.user.status,
    }


@router.get("/secrets", response_model=Page)
async def secrets_list(
    q: str | None = None,
    category: str | None = None,
    status: SecretStatus | None = None,
    include_archived: bool = False,
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> Page:
    items, total = list_secrets(
        db,
        actor,
        q=q,
        category=category,
        status=status,
        include_archived=include_archived,
        offset=offset,
        limit=limit,
    )
    return Page(
        items=[item.model_dump() for item in items], total=total, offset=offset, limit=limit
    )


@router.post("/secrets", response_model=SecretRead)
async def secrets_create(
    payload: SecretCreate,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretRead:
    result = create_secret(db, actor, payload)
    db.commit()
    return result


@router.get("/secrets/{secret_id}", response_model=SecretRead)
async def secrets_get(
    secret_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretRead:
    from gnexus_creds.services import get_secret

    return get_secret(db, actor, secret_id)


@router.patch("/secrets/{secret_id}", response_model=SecretRead)
async def secrets_update(
    secret_id: UUID,
    payload: SecretUpdate,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretRead:
    result = update_secret(db, actor, secret_id, payload)
    db.commit()
    return result


@router.delete("/secrets/{secret_id}", status_code=204)
async def secrets_delete(
    secret_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> None:
    delete_secret(db, actor, secret_id)
    db.commit()


@router.post("/secrets/{secret_id}/reveal", response_model=SecretReveal)
async def secrets_reveal(
    secret_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretReveal:
    result = reveal_secret(db, actor, secret_id)
    db.commit()
    return result


@router.get("/secrets/{secret_id}/versions", response_model=list[SecretVersionRead])
async def versions_list(
    secret_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> list[SecretVersionRead]:
    return list_versions(db, actor, secret_id)


@router.get("/secrets/{secret_id}/versions/{version_id}", response_model=SecretVersionRead)
async def versions_get(
    secret_id: UUID,
    version_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretVersionRead:
    return get_version(db, actor, secret_id, version_id)


@router.post("/secrets/{secret_id}/versions/{version_id}/reveal", response_model=SecretReveal)
async def versions_reveal(
    secret_id: UUID,
    version_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> SecretReveal:
    result = reveal_secret(db, actor, secret_id, version_id=version_id)
    db.commit()
    return result


@router.get("/categories", response_model=list[str])
async def categories(
    db: Session = Depends(get_db), actor: Actor = Depends(actor_from_request)
) -> list[str]:
    actor.require(Scope.read)
    return list(
        db.scalars(
            select(distinct(Secret.category))
            .where(Secret.user_id == actor.user.id, Secret.category.is_not(None))
            .order_by(Secret.category)
        )
    )


@router.get("/tags", response_model=list[str])
async def tags(
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> list[str]:
    actor.require(Scope.read)
    return list(
        db.scalars(
            select(distinct(SecretTag.name))
            .where(SecretTag.user_id == actor.user.id)
            .order_by(SecretTag.name)
        )
    )


@router.get("/suggestions")
async def suggestions(
    q: str = Query("", max_length=120),
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> dict[str, list[str]]:
    actor.require(Scope.read)
    like = f"%{q.lower()}%"
    category_rows = db.scalars(
        select(distinct(Secret.category))
        .where(
            Secret.user_id == actor.user.id,
            Secret.category.is_not(None),
            func.lower(Secret.category).like(like),
        )
        .limit(20)
    )
    tag_rows = db.scalars(
        select(distinct(SecretTag.name))
        .where(SecretTag.user_id == actor.user.id, func.lower(SecretTag.name).like(like))
        .order_by(SecretTag.name)
        .limit(20)
    )
    return {"categories": list(category_rows), "tags": list(tag_rows)}


@router.get("/audit-events", response_model=Page)
async def audit_events(
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> Page:
    actor.require(Scope.admin)
    stmt = select(AuditEvent).where(AuditEvent.user_id == actor.user.id)
    total = db.scalar(select(func.count()).select_from(stmt.subquery())) or 0
    rows = db.scalars(stmt.order_by(AuditEvent.created_at.desc()).offset(offset).limit(limit))
    return Page(
        items=[_audit_read(row) for row in rows],
        total=total,
        offset=offset,
        limit=limit,
    )


@router.get("/secrets/{secret_id}/audit-events", response_model=Page)
async def secret_audit_events(
    secret_id: UUID,
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> Page:
    actor.require(Scope.admin)
    stmt = select(AuditEvent).where(
        AuditEvent.user_id == actor.user.id, AuditEvent.secret_id == secret_id
    )
    total = db.scalar(select(func.count()).select_from(stmt.subquery())) or 0
    rows = db.scalars(stmt.order_by(AuditEvent.created_at.desc()).offset(offset).limit(limit))
    return Page(
        items=[_audit_read(row) for row in rows],
        total=total,
        offset=offset,
        limit=limit,
    )


@router.get("/api-tokens", response_model=list[ApiTokenRead])
async def api_tokens_list(
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
):
    actor.require(Scope.admin)
    rows = db.scalars(
        select(ApiToken)
        .where(ApiToken.user_id == actor.user.id)
        .order_by(ApiToken.created_at.desc())
    )
    return [ApiTokenRead.model_validate(row, from_attributes=True) for row in rows]


@router.post("/api-tokens", response_model=ApiTokenCreated)
async def api_tokens_create(
    payload: ApiTokenCreate,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
):
    row, token = create_api_token(db, actor, payload)
    db.commit()
    return ApiTokenCreated(
        id=row.id,
        public_id=row.public_id,
        name=row.name,
        scopes=row.scopes,
        created_at=row.created_at,
        revoked_at=row.revoked_at,
        last_used_at=row.last_used_at,
        token=token,
    )


@router.delete("/api-tokens/{token_id}", status_code=204)
async def api_tokens_delete(
    token_id: UUID,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> None:
    actor.require(Scope.admin)
    row = db.get(ApiToken, token_id)
    if row and row.user_id == actor.user.id:
        row.revoked_at = datetime.now(UTC)
        audit(db, actor, action="api_token.revoked", metadata={"name": row.name})
    db.commit()


@router.post("/export")
async def export_data(
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> dict:
    actor.require(Scope.reveal)
    check_actor_rate_limit(db, actor, "export")
    rows = db.scalars(
        select(Secret)
        .where(Secret.user_id == actor.user.id)
        .options(selectinload(Secret.versions), selectinload(Secret.tags))
        .order_by(Secret.created_at)
    ).unique()
    exported = [_export_secret(row, actor, db) for row in rows]
    audit(db, actor, action="export.created")
    db.commit()
    return {
        "format": "gnexus-creds-export",
        "version": 1,
        "exported_at": datetime.now(UTC),
        "secrets": exported,
    }


@router.post("/import")
async def import_data(
    payload: ImportPayload,
    db: Session = Depends(get_db),
    actor: Actor = Depends(actor_from_request),
) -> dict[str, int]:
    actor.require(Scope.write)
    check_actor_rate_limit(db, actor, "import")
    if payload.format != "gnexus-creds-export" or payload.version != 1:
        from gnexus_creds.errors import AppError

        raise AppError("unsupported_import_format", "Unsupported import format.", status_code=400)
    created = 0
    for item in payload.secrets:
        create_secret(db, actor, item)
        created += 1
    db.commit()
    return {"created": created}


@router.delete("/account-data", status_code=204)
async def delete_account_data(
    db: Session = Depends(get_db), actor: Actor = Depends(actor_from_request)
) -> None:
    actor.require(Scope.admin)
    db.query(Secret).filter(Secret.user_id == actor.user.id).delete(synchronize_session=False)
    audit(db, actor, action="account_data.deleted")
    db.commit()


admin_router = APIRouter(prefix="/api/v1/admin", dependencies=[Depends(require_admin)])


@admin_router.get("/users", response_model=Page)
async def admin_users(
    offset: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
) -> Page:
    stmt = select(User)
    total = db.scalar(select(func.count()).select_from(stmt.subquery())) or 0
    rows = db.scalars(stmt.order_by(User.created_at.desc()).offset(offset).limit(limit))
    return Page(
        items=[
            {
                "id": row.id,
                "email": row.email,
                "display_name": row.display_name,
                "status": row.status,
                "role": row.system_role,
                "created_at": row.created_at,
                "last_seen_at": row.last_seen_at,
            }
            for row in rows
        ],
        total=total,
        offset=offset,
        limit=limit,
    )