# Newer
# Older
# navi-1 / navi / api / routes / api_tokens.py
# @Eugene Sukhodolskiy Eugene Sukhodolskiy on 24 May 3 KB Apply review fixes to API token auth system
"""API token management endpoints for headless client authentication.

SECURITY NOTES:
- Tokens are stored as SHA-256 hashes; plain text is shown only once on creation.
- No rate limiting yet: a user could create many tokens quickly.
- No scopes or expiration for MVP; all tokens grant full user access.
"""

import hashlib
import secrets
from datetime import datetime, timezone
from typing import Annotated

import structlog
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from navi.api.deps import require_user
from navi.auth import User

# Module-level structlog logger; the endpoints below emit "api_token.*" events through it.
log = structlog.get_logger()
# Every route declared in this module is registered under the "/api-tokens" prefix
# and carries the "api_tokens" tag.
router = APIRouter(prefix="/api-tokens", tags=["api_tokens"])


class CreateApiTokenRequest(BaseModel):
    """Request body accepted by the token-creation endpoint."""
    # Label stored in the ``name`` column of the new ``api_tokens`` row.
    # NOTE(review): no emptiness or length constraint is declared here —
    # confirm whether the database column enforces one.
    name: str


class ApiTokenItem(BaseModel):
    """Metadata describing one API token.

    This model has no field for the plain-text token or for its hash.
    """
    # Value of the ``id`` column of the ``api_tokens`` row.
    id: int
    # Label given to the token when it was created.
    name: str
    # Short display form of the token: its leading characters followed by an ellipsis.
    token_prefix: str
    # Creation timestamp as stored in the ``created_at`` column.
    created_at: datetime
    # Value of the ``last_used_at`` column; defaults to None.
    # Presumably None means the token has never been used — confirm against
    # the code that updates this column.
    last_used_at: datetime | None = None


class CreateApiTokenResponse(ApiTokenItem):
    """Token metadata plus the plain-text token, as returned by ``create_api_token``."""
    # The full plain-text token value.
    token: str


@router.post("")
async def create_api_token(
    payload: CreateApiTokenRequest,
    user: Annotated[User, Depends(require_user)],
) -> CreateApiTokenResponse:
    """Issue a new API token owned by the current user.

    The plain token is included in this response **only once**; the database
    row keeps just its SHA-256 hex digest and a short display prefix.

    Args:
        payload: Request body carrying the name to store with the token.
        user: The authenticated caller, resolved by ``require_user``.

    Returns:
        The new row's id and creation time, the name, the display prefix and
        the plain-text token.
    """
    from navi.api.deps import get_session_store

    # "nav_" marker followed by a URL-safe random string from the secrets module.
    secret_value = "nav_" + secrets.token_urlsafe(32)
    digest = hashlib.sha256(secret_value.encode()).hexdigest()
    # First 12 characters plus an ellipsis: enough to recognise the token in a list.
    display_prefix = secret_value[:12] + "…"
    issued_at = datetime.now(timezone.utc)

    insert_sql = (
        "INSERT INTO api_tokens (user_id, name, token_hash, token_prefix, created_at) "
        "VALUES ($1, $2, $3, $4, $5) RETURNING id, created_at"
    )
    insert_args = (user.id, payload.name, digest, display_prefix, issued_at)

    pool = await get_session_store()._get_pool()
    async with pool.acquire() as conn:
        inserted = await conn.fetchrow(insert_sql, *insert_args)

    new_id = inserted["id"]
    # Only identifiers are logged; the token value and its hash are not.
    log.info("api_token.created", user_id=user.id, token_id=new_id)

    return CreateApiTokenResponse(
        id=new_id,
        name=payload.name,
        token=secret_value,
        token_prefix=display_prefix,
        created_at=inserted["created_at"],
        last_used_at=None,
    )


@router.get("")
async def list_api_tokens(
    user: Annotated[User, Depends(require_user)],
) -> dict[str, list[ApiTokenItem]]:
    """Return the current user's API tokens that have not been revoked.

    Tokens are ordered newest first by creation time.

    Args:
        user: The authenticated caller, resolved by ``require_user``.

    Returns:
        A mapping with a single ``"items"`` key holding the token metadata.
    """
    from navi.api.deps import get_session_store

    select_sql = (
        "SELECT id, name, token_prefix, created_at, last_used_at "
        "FROM api_tokens WHERE user_id = $1 AND revoked_at IS NULL "
        "ORDER BY created_at DESC"
    )
    # Columns copied one-to-one from each result row into ApiTokenItem.
    columns = ("id", "name", "token_prefix", "created_at", "last_used_at")

    pool = await get_session_store()._get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(select_sql, user.id)

    tokens = [
        ApiTokenItem(**{column: record[column] for column in columns})
        for record in records
    ]
    return {"items": tokens}


@router.delete("/{token_id}", status_code=204)
async def revoke_api_token(
    token_id: int,
    user: Annotated[User, Depends(require_user)],
) -> None:
    """Revoke (soft-delete) an active API token belonging to the current user.

    The row is kept and its ``revoked_at`` column is set to the current UTC
    time. Only a token that is not yet revoked is matched, so the original
    revocation timestamp is never overwritten by a repeated request.

    Args:
        token_id: Id of the token to revoke, taken from the URL path.
        user: The authenticated caller, resolved by ``require_user``.

    Raises:
        HTTPException: 404 when no active token with this id belongs to the
            caller (unknown id, another user's token, or already revoked).
    """
    from navi.api.deps import get_session_store
    store = get_session_store()
    pool = await store._get_pool()
    now = datetime.now(timezone.utc)

    async with pool.acquire() as conn:
        # ``revoked_at IS NULL`` mirrors the filter used when listing tokens:
        # an already-revoked token is treated as gone rather than re-stamped.
        row = await conn.fetchrow(
            "UPDATE api_tokens SET revoked_at = $1 "
            "WHERE id = $2 AND user_id = $3 AND revoked_at IS NULL "
            "RETURNING id",
            now,
            token_id,
            user.id,
        )

    # No row returned means nothing was updated.
    if row is None:
        raise HTTPException(status_code=404, detail="Token not found")

    log.info("api_token.revoked", user_id=user.id, token_id=token_id)