# gnexus-creds / tests / test_api.py
import pytest
from httpx import ASGITransport, AsyncClient

from gnexus_creds import crypto
from gnexus_creds.models import ApiToken, AuditEvent
from gnexus_creds.schemas import SecretCreate, SecretFieldIn
from gnexus_creds.services import Actor, create_secret


@pytest.mark.anyio
async def test_rest_create_list_reveal(app):
    """Create a secret over REST, search for it, and reveal its encrypted field.

    Expectations pinned by this test:

    * searching for the plaintext field value ``deploy`` finds the secret;
    * searching for the encrypted field value ``pass123`` finds nothing;
    * searching for the field name ``password`` finds the secret;
    * the list endpoint returns ``None`` as the value of the encrypted field;
    * the reveal endpoint returns the encrypted field's real value.
    """
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        response = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Server",
                "purpose": "db01",
                "category": "infra",
                "tags": ["ssh"],
                "fields": [
                    {"name": "username", "value": "deploy", "encrypted": False, "position": 1},
                    {
                        "name": "password",
                        "value": "pass123",
                        "encrypted": True,
                        "masked": True,
                        "position": 2,
                    },
                ],
            },
        )
        assert response.status_code == 200, response.text
        secret_id = response.json()["id"]

        # (search term, expected number of matching secrets)
        search_expectations = (
            ("deploy", 1),
            ("pass123", 0),
            ("password", 1),
        )
        for term, expected_total in search_expectations:
            response = await client.get("/api/v1/secrets", params={"q": term})
            assert response.status_code == 200, response.text
            assert response.json()["total"] == expected_total, term

        response = await client.get("/api/v1/secrets", params={"q": "deploy"})
        # Check the status before indexing into the body so a failed request is
        # reported as such rather than as a KeyError/IndexError.
        assert response.status_code == 200, response.text
        fields = response.json()["items"][0]["fields"]
        listed = {field["name"]: field["value"] for field in fields}
        assert listed["username"] == "deploy"
        assert listed["password"] is None

        response = await client.post(f"/api/v1/secrets/{secret_id}/reveal")
        assert response.status_code == 200, response.text
        revealed = {field["name"]: field["value"] for field in response.json()["fields"]}
        assert revealed["password"] == "pass123"


@pytest.mark.anyio
async def test_export_import_round_trip(app):
    """Export a secret, wipe the account data, and import the export back.

    The export payload must carry the ``gnexus-creds-export`` format marker and
    the value ``1234`` in the second field of the first secret.  After the
    account data is deleted the secret list is empty; importing the payload
    reports one created secret whose ``pin`` field reveals as ``1234``.
    """

    def values_by_name(fields):
        # Map each field's name to its value for direct lookup.
        return {entry["name"]: entry["value"] for entry in fields}

    new_secret = {
        "title": "Card PIN",
        "purpose": "bank card",
        "category": "finance",
        "tags": ["card"],
        "fields": [
            {"name": "card", "value": "1111", "encrypted": False},
            {"name": "pin", "value": "1234", "encrypted": True, "masked": True},
        ],
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        created = await client.post("/api/v1/secrets", json=new_secret)
        assert created.status_code == 200

        exported = await client.post("/api/v1/export")
        assert exported.status_code == 200
        payload = exported.json()
        assert payload["format"] == "gnexus-creds-export"
        first_secret = payload["secrets"][0]
        assert first_secret["fields"][1]["value"] == "1234"

        wiped = await client.delete("/api/v1/account-data")
        assert wiped.status_code == 204
        after_wipe = await client.get("/api/v1/secrets")
        assert after_wipe.json()["total"] == 0

        imported = await client.post("/api/v1/import", json=payload)
        assert imported.status_code == 200
        assert imported.json()["created"] == 1

        # Look up the id assigned to the re-imported secret, then reveal it.
        relisted = await client.get("/api/v1/secrets")
        reveal_id = relisted.json()["items"][0]["id"]
        revealed = await client.post(f"/api/v1/secrets/{reveal_id}/reveal")
        assert values_by_name(revealed.json()["fields"])["pin"] == "1234"


@pytest.mark.anyio
async def test_rest_get_version_masks_encrypted_values(app):
    """Fetching a single secret version must not expose encrypted field values.

    A secret is created and then patched with a new password.  The first entry
    of the versions listing is fetched by id; its ``username`` value is
    returned as stored while its ``password`` value is ``None``.
    """

    def credential_fields(password):
        # Field payload shared by the create and update requests.
        return [
            {"name": "username", "value": "version-user", "encrypted": False},
            {"name": "password", "value": password, "encrypted": True, "masked": True},
        ]

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        created = await client.post(
            "/api/v1/secrets",
            json={"title": "Versioned", "fields": credential_fields("first-pass")},
        )
        secret_id = created.json()["id"]
        secret_url = f"/api/v1/secrets/{secret_id}"

        updated = await client.patch(secret_url, json={"fields": credential_fields("second-pass")})
        assert updated.status_code == 200

        listing = await client.get(f"{secret_url}/versions")
        version_id = listing.json()[0]["id"]
        version_response = await client.get(f"{secret_url}/versions/{version_id}")

    assert version_response.status_code == 200
    values = {}
    for entry in version_response.json()["fields"]:
        values[entry["name"]] = entry["value"]
    assert values["username"] == "version-user"
    assert values["password"] is None


@pytest.mark.anyio
async def test_rest_search_total_deduplicates_versions_and_tags(app):
    """A secret that matches a search in several places is counted once.

    The secret carries two tags containing ``deploy`` and a ``username`` field
    whose value is ``deploy`` and is later patched to ``deploy-v2``.  Searching
    for ``deploy`` must report ``total == 1`` and return exactly one item.
    """
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        create_response = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Deploy target",
                "tags": ["deploy", "deploy-alt"],
                "fields": [{"name": "username", "value": "deploy", "encrypted": False}],
            },
        )
        # Fail here, with the server's message, if setup did not succeed;
        # otherwise the failure shows up later as a misleading count mismatch.
        assert create_response.status_code == 200, create_response.text
        secret_id = create_response.json()["id"]

        update_response = await client.patch(
            f"/api/v1/secrets/{secret_id}",
            json={"fields": [{"name": "username", "value": "deploy-v2", "encrypted": False}]},
        )
        assert update_response.status_code == 200, update_response.text

        response = await client.get("/api/v1/secrets", params={"q": "deploy"})

    assert response.status_code == 200, response.text
    body = response.json()
    assert body["total"] == 1
    assert len(body["items"]) == 1


@pytest.mark.anyio
async def test_rest_taxonomy_audit_and_token_management(app):
    """Exercise the taxonomy, audit-event, and API-token REST endpoints.

    All requests are issued first, inside the client context; the responses
    are checked afterwards.  The test expects the created secret's category
    and tags to be listed (tags in the order ``server``, ``ssh``), at least two
    audit events overall, ``secret.created`` and ``secret.revealed`` among the
    secret's own events, and a created token whose value starts with ``gcr_``
    and which can be listed and revoked.
    """
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # Seed one secret and reveal it.
        created = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Taxonomy",
                "category": "infra",
                "tags": ["ssh", "server"],
                "fields": [{"name": "username", "value": "ops", "encrypted": False}],
            },
        )
        secret_id = created.json()["id"]
        await client.post(f"/api/v1/secrets/{secret_id}/reveal")

        categories = await client.get("/api/v1/categories")
        tags = await client.get("/api/v1/tags")
        suggestions = await client.get("/api/v1/suggestions", params={"q": "s"})
        audit = await client.get("/api/v1/audit-events")
        secret_audit = await client.get(f"/api/v1/secrets/{secret_id}/audit-events")

        new_token = await client.post(
            "/api/v1/api-tokens",
            json={"name": "REST client", "scopes": ["read", "reveal"]},
        )
        token_id = new_token.json()["id"]
        token_listing = await client.get("/api/v1/api-tokens")
        revoked = await client.patch(f"/api/v1/api-tokens/{token_id}/revoke")

    # Taxonomy endpoints.
    assert categories.status_code == 200
    assert categories.json() == ["infra"]
    assert tags.status_code == 200
    assert tags.json() == ["server", "ssh"]
    assert suggestions.status_code == 200
    assert suggestions.json()["tags"] == ["server", "ssh"]

    # Audit endpoints.
    assert audit.status_code == 200
    assert audit.json()["total"] >= 2
    assert secret_audit.status_code == 200
    recorded_actions = {event["action"] for event in secret_audit.json()["items"]}
    required_actions = {"secret.created", "secret.revealed"}
    assert required_actions.issubset(recorded_actions)

    # Token management endpoints.
    assert new_token.status_code == 200
    assert new_token.json()["token"].startswith("gcr_")
    assert token_listing.status_code == 200
    first_token = token_listing.json()[0]
    assert first_token["name"] == "REST client"
    assert revoked.status_code == 204


@pytest.mark.anyio
async def test_failed_access_is_aggregated(auth_app, db_session):
    """Two rejected requests must leave a single ``access.failed`` audit event.

    ``/api/v1/me`` is requested twice without credentials; both calls must
    return 401 and the audit table must hold exactly one matching row.
    """
    transport = ASGITransport(app=auth_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        for _ in range(2):
            rejected = await client.get("/api/v1/me")
            assert rejected.status_code == 401

    failed_access = db_session.query(AuditEvent).filter(AuditEvent.action == "access.failed")
    events = failed_access.all()
    assert len(events) == 1


@pytest.mark.anyio
async def test_api_token_scopes(auth_app, db_session, user):
    """API tokens are limited to the scopes stored on their database row.

    A token with only the ``read`` scope may list secrets (and gets its
    ``last_used_at`` set) but receives 403 on reveal; a token with ``read`` and
    ``reveal`` scopes can reveal the encrypted field value.
    """

    def bearer(token):
        # Authorization header carrying the given raw token.
        return {"Authorization": f"Bearer {token}"}

    owner = Actor(user=user, channel="ui")
    secret_in = SecretCreate(
        title="Token scoped",
        fields=[SecretFieldIn(name="password", value="secret", encrypted=True)],
    )
    created = create_secret(db_session, owner, secret_in)

    read_token = "gcr_read_secret"
    reveal_token = "gcr_reveal_secret"
    read_token_row = ApiToken(
        user_id=user.id,
        public_id="read",
        name="read",
        token_hash=crypto.token_hash(read_token),
        scopes=["read"],
    )
    reveal_token_row = ApiToken(
        user_id=user.id,
        public_id="reveal",
        name="reveal",
        token_hash=crypto.token_hash(reveal_token),
        scopes=["read", "reveal"],
    )
    db_session.add_all([read_token_row, reveal_token_row])
    db_session.commit()

    reveal_url = f"/api/v1/secrets/{created.id}/reveal"
    transport = ASGITransport(app=auth_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        listed = await client.get("/api/v1/secrets", headers=bearer(read_token))
        assert listed.status_code == 200
        assert listed.json()["total"] == 1
        # Drop cached ORM state so the row is re-read from the database.
        db_session.expire_all()
        reloaded_row = db_session.get(ApiToken, read_token_row.id)
        assert reloaded_row.last_used_at is not None

        denied = await client.post(reveal_url, headers=bearer(read_token))
        assert denied.status_code == 403

        revealed = await client.post(reveal_url, headers=bearer(reveal_token))
        assert revealed.status_code == 200
        values = {entry["name"]: entry["value"] for entry in revealed.json()["fields"]}
        assert values["password"] == "secret"


@pytest.mark.anyio
async def test_admin_users_requires_admin_role(app, actor):
    """The admin user listing answers 403 once the actor's role is ``user``."""
    # Downgrade the fixture actor before issuing the request.
    actor.user.system_role = "user"
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        forbidden = await client.get("/api/v1/admin/users")
    assert forbidden.status_code == 403


@pytest.mark.anyio
async def test_admin_users_lists_users_for_admin(app):
    """With the unmodified fixture actor the admin user listing succeeds.

    The response must report one user in total, and the first listed item must
    have the email ``user@example.test``.
    """
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        listing = await client.get("/api/v1/admin/users")
    assert listing.status_code == 200
    assert listing.json()["total"] == 1
    first_user = listing.json()["items"][0]
    assert first_user["email"] == "user@example.test"


@pytest.mark.anyio
async def test_stats_returns_user_counts(app):
    """The stats endpoint counts total, active, and MCP-enabled secrets.

    Two secrets are created, one of them with ``allow_mcp`` set; the stats
    response must then report two total, two active, and one MCP-enabled.
    """
    secret_payloads = (
        {
            "title": "Stat Test 1",
            "fields": [{"name": "key", "value": "val", "encrypted": False}],
        },
        {
            "title": "Stat Test 2",
            "allow_mcp": True,
            "fields": [{"name": "key", "value": "val", "encrypted": False}],
        },
    )
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        for payload in secret_payloads:
            create_response = await client.post("/api/v1/secrets", json=payload)
            # Surface a failed setup request directly instead of letting it
            # show up later as an unexplained count mismatch.
            assert create_response.status_code == 200, create_response.text
        stats_response = await client.get("/api/v1/stats")
    assert stats_response.status_code == 200, stats_response.text
    data = stats_response.json()
    assert data["total_secrets"] == 2
    assert data["active_secrets"] == 2
    assert data["mcp_enabled_secrets"] == 1