# navi-1 / tests / unit / auth / test_api_tokens.py
# Eugene Sukhodolskiy on 24 May · 7 KB · Apply review fixes to API token auth system
"""Unit tests for API token auth and CRUD endpoints."""

import hashlib
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

import pytest
from fastapi import HTTPException

from navi.auth import User
from navi.auth.deps import _resolve_user_from_api_token
from navi.api.routes.api_tokens import (
    create_api_token,
    list_api_tokens,
    revoke_api_token,
)
from tests.conftest_factory import FakeConnection, FakePool


class FakeRequest:
    """Lightweight double for a FastAPI ``Request`` or ``WebSocket``.

    Exposes only ``headers``, ``query_params`` and ``state``; ``state`` is a
    ``MagicMock`` whose ``user`` attribute starts out as ``None``.
    """

    def __init__(self, headers=None, query_params=None):
        # A falsy argument (None or an empty mapping) is swapped for a fresh dict.
        self.headers = headers if headers else {}
        self.query_params = query_params if query_params else {}

        state = MagicMock()
        state.user = None
        self.state = state


class FakeSessionStore:
    """Session-store double whose ``_get_pool()`` coroutine hands back a ``FakePool``."""

    def __init__(self, conn=None):
        # The pool is built once, wrapping whatever connection the test supplies.
        pool = FakePool(conn)
        self._pool = pool

    async def _get_pool(self):
        """Return the pool created at construction time."""
        pool = self._pool
        return pool


def _make_api_token_conn(
    token_exists=True,
    revoked=False,
    user_exists=True,
    token_id=42,
    user_id="u1",
):
    """Return a ``FakeConnection`` primed for an api_tokens + navi_users lookup.

    Two results are queued, in order:

    1. the joined token/user row, or ``None`` when either ``token_exists`` or
       ``user_exists`` is false;
    2. the string ``"OK"``, standing in for the ``last_used_at`` update.

    ``revoked`` controls whether the row's ``revoked_at`` is a current UTC
    timestamp or ``None``; ``token_id`` and ``user_id`` fill the row's
    ``token_id`` and ``id`` fields.
    """
    fake_conn = FakeConnection()

    joined_row = None
    if token_exists and user_exists:
        revoked_at = datetime.now(timezone.utc) if revoked else None
        joined_row = {
            "id": user_id,
            "email": "u@test.com",
            "display_name": "Test User",
            "username": "testuser",
            "first_name": "Test",
            "last_name": "User",
            "phone": None,
            "birth_date": None,
            "country": None,
            "city": None,
            "locale": None,
            "role": "user",
            "permissions": "[]",
            "token_id": token_id,
            "revoked_at": revoked_at,
        }
    fake_conn.enqueue(joined_row)

    # Result for the last_used_at update (execute); queued in every scenario.
    fake_conn.enqueue("OK")
    return fake_conn


@pytest.fixture(autouse=True)
def _patch_store():
    """Patch ``navi.api.deps.get_session_store`` and yield the replacement mock.

    Declared ``autouse``, so it is active for every test in this module; the
    patch is undone when the test finishes.
    """
    store_patcher = patch("navi.api.deps.get_session_store")
    with store_patcher as mock_get_store:
        yield mock_get_store


# ── _resolve_user_from_api_token tests ──────────────────────────────────────


@pytest.mark.asyncio
async def test_resolve_user_from_header(_patch_store):
    """Resolve a user when the plain token arrives in the ``X-Api-Token`` header."""
    plain = "nav_testtoken123"
    # No token hash is computed here: the previous version built one but never
    # compared it against anything. The user fields asserted below come from
    # the row queued by _make_api_token_conn().
    # NOTE(review): if the lookup should be checked for the hashed token,
    # assert on conn.calls rather than reintroducing an unused local.
    conn = _make_api_token_conn()
    _patch_store.return_value = FakeSessionStore(conn)

    req = FakeRequest(headers={"X-Api-Token": plain})
    user = await _resolve_user_from_api_token(req)

    assert user is not None
    assert user.id == "u1"
    assert user.email == "u@test.com"
    assert user.display_name == "Test User"


@pytest.mark.asyncio
async def test_resolve_user_from_query_param(_patch_store):
    """Resolve a user when the plain token arrives as the ``api_token`` query parameter."""
    plain = "nav_testtoken456"
    # No token hash is computed here: the previous version built one but never
    # compared it against anything. The user id asserted below comes from the
    # row queued by _make_api_token_conn().
    conn = _make_api_token_conn()
    _patch_store.return_value = FakeSessionStore(conn)

    req = FakeRequest(query_params={"api_token": plain})
    user = await _resolve_user_from_api_token(req)

    assert user is not None
    assert user.id == "u1"


@pytest.mark.asyncio
async def test_resolve_user_no_token_returns_none(_patch_store):
    """A request with empty headers and query params resolves to ``None``."""
    bare_request = FakeRequest()
    resolved = await _resolve_user_from_api_token(bare_request)
    assert resolved is None


@pytest.mark.asyncio
async def test_resolve_user_revoked_token_returns_none(_patch_store):
    """A token whose row has ``revoked_at`` set must not resolve to a user."""
    revoked_conn = _make_api_token_conn(revoked=True)
    _patch_store.return_value = FakeSessionStore(revoked_conn)
    request = FakeRequest(headers={"X-Api-Token": "nav_revoked123"})

    resolved = await _resolve_user_from_api_token(request)

    assert resolved is None


@pytest.mark.asyncio
async def test_resolve_user_missing_token_row_returns_none(_patch_store):
    """When the token lookup yields no row, no user is resolved."""
    empty_conn = _make_api_token_conn(token_exists=False)
    _patch_store.return_value = FakeSessionStore(empty_conn)
    request = FakeRequest(headers={"X-Api-Token": "nav_missing123"})

    resolved = await _resolve_user_from_api_token(request)

    assert resolved is None


@pytest.mark.asyncio
async def test_resolve_user_orphan_user_returns_none(_patch_store):
    """When the token's user is absent (lookup yields no row), no user is resolved."""
    orphan_conn = _make_api_token_conn(user_exists=False)
    _patch_store.return_value = FakeSessionStore(orphan_conn)
    request = FakeRequest(headers={"X-Api-Token": "nav_orphan123"})

    resolved = await _resolve_user_from_api_token(request)

    assert resolved is None


# ── Endpoint tests ──────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_create_api_token(_patch_store):
    """Creating a token returns its id, name, plain token and prefix."""
    fake_conn = FakeConnection()
    inserted_row = {"id": 1, "created_at": datetime.now(timezone.utc)}
    fake_conn.enqueue(inserted_row)
    _patch_store.return_value = FakeSessionStore(fake_conn)

    owner = User(id="u1", email="u@test.com", display_name="Test")
    # ``name`` is set after construction: MagicMock(name=...) would name the
    # mock itself instead of creating a ``name`` attribute.
    body = MagicMock()
    body.name = "My Device"

    created = await create_api_token(body, owner)

    assert created.id == 1
    assert created.name == "My Device"
    assert created.token.startswith("nav_")
    assert created.token_prefix.startswith("nav_")
    # The plain token is expected to be long.
    assert len(created.token) > 10


@pytest.mark.asyncio
async def test_list_api_tokens(_patch_store):
    """Listing returns one item per stored row and exposes no token secrets."""
    stamp = datetime.now(timezone.utc)
    device_a = {
        "id": 1,
        "name": "Device A",
        "token_prefix": "nav_abc…",
        "created_at": stamp,
        "last_used_at": stamp,
    }
    device_b = {
        "id": 2,
        "name": "Device B",
        "token_prefix": "nav_def…",
        "created_at": stamp,
        "last_used_at": None,
    }

    fake_conn = FakeConnection()
    fake_conn.enqueue([device_a, device_b])
    _patch_store.return_value = FakeSessionStore(fake_conn)

    owner = User(id="u1", email="u@test.com", display_name="Test")
    listing = await list_api_tokens(owner)
    items = listing["items"]

    assert len(items) == 2
    assert items[0].name == "Device A"
    assert items[1].name == "Device B"
    # Neither the plain token nor its hash may appear on a listed item.
    first_item = items[0]
    assert not hasattr(first_item, "token")
    assert not hasattr(first_item, "token_hash")


@pytest.mark.asyncio
async def test_revoke_api_token_success(_patch_store):
    """Revoking an existing token returns ``None`` and issues one scoped UPDATE."""
    fake_conn = FakeConnection()
    fake_conn.enqueue({"id": 1})
    _patch_store.return_value = FakeSessionStore(fake_conn)

    owner = User(id="u1", email="u@test.com", display_name="Test")
    outcome = await revoke_api_token(1, owner)
    assert outcome is None

    # Collect the recorded fetchrow calls whose SQL text contains UPDATE.
    update_calls = []
    for call in fake_conn.calls:
        if call[0] != "fetchrow":
            continue
        if "UPDATE" in call[1]:
            update_calls.append(call)

    assert len(update_calls) == 1
    # The third bound parameter is expected to be the owning user's id.
    bound_params = update_calls[0][2]
    assert bound_params[2] == "u1"


@pytest.mark.asyncio
async def test_revoke_api_token_not_found(_patch_store):
    """Revoking when the query yields no row raises an ``HTTPException`` with status 404."""
    fake_conn = FakeConnection()
    fake_conn.enqueue(None)
    _patch_store.return_value = FakeSessionStore(fake_conn)
    owner = User(id="u1", email="u@test.com", display_name="Test")

    with pytest.raises(HTTPException) as raised:
        await revoke_api_token(99, owner)

    assert raised.value.status_code == 404