import pytest
from httpx import ASGITransport, AsyncClient
from gnexus_creds import crypto
from gnexus_creds.models import ApiToken, AuditEvent
from gnexus_creds.schemas import SecretCreate, SecretFieldIn
from gnexus_creds.services import Actor, create_secret
@pytest.mark.anyio
async def test_rest_create_list_reveal(app):
    """Create a secret over REST, then exercise search, listing and reveal.

    The secret carries a plaintext ``username`` field and an encrypted,
    masked ``password`` field.  The test expects that:

    * searching for the plaintext value ``deploy`` finds the secret;
    * searching for the encrypted value ``pass123`` finds nothing;
    * searching for ``password`` finds the secret (presumably through the
      field name -- confirm against the search implementation);
    * the list payload exposes ``username`` but reports ``None`` for
      ``password``;
    * the reveal endpoint returns the real password.
    """
    new_secret = {
        "title": "Server",
        "purpose": "db01",
        "category": "infra",
        "tags": ["ssh"],
        "fields": [
            {"name": "username", "value": "deploy", "encrypted": False, "position": 1},
            {
                "name": "password",
                "value": "pass123",
                "encrypted": True,
                "masked": True,
                "position": 2,
            },
        ],
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        created = await client.post("/api/v1/secrets", json=new_secret)
        assert created.status_code == 200, created.text
        secret_id = created.json()["id"]

        # (search term, expected number of matching secrets)
        expectations = (("deploy", 1), ("pass123", 0), ("password", 1))
        for term, expected_total in expectations:
            found = await client.get(f"/api/v1/secrets?q={term}")
            assert found.status_code == 200
            assert found.json()["total"] == expected_total

        # The list view must not leak the encrypted value.
        listing = await client.get("/api/v1/secrets?q=deploy")
        listed_fields = listing.json()["items"][0]["fields"]
        listed_values = {entry["name"]: entry["value"] for entry in listed_fields}
        assert listed_values["username"] == "deploy"
        assert listed_values["password"] is None

        # Only an explicit reveal returns the plaintext password.
        revealed = await client.post(f"/api/v1/secrets/{secret_id}/reveal")
        assert revealed.status_code == 200
        revealed_values = {entry["name"]: entry["value"] for entry in revealed.json()["fields"]}
        assert revealed_values["password"] == "pass123"
@pytest.mark.anyio
async def test_export_import_round_trip(app):
    """Export a secret, wipe the account data, re-import and reveal it.

    Steps: create a secret with a plaintext ``card`` field and an encrypted
    ``pin`` field, export everything, delete the account data (the list must
    then be empty), import the exported payload (one secret created) and
    finally reveal the imported secret to confirm the pin survived.
    """
    card_secret = {
        "title": "Card PIN",
        "purpose": "bank card",
        "category": "finance",
        "tags": ["card"],
        "fields": [
            {"name": "card", "value": "1111", "encrypted": False},
            {"name": "pin", "value": "1234", "encrypted": True, "masked": True},
        ],
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        created = await client.post("/api/v1/secrets", json=card_secret)
        assert created.status_code == 200

        exported = await client.post("/api/v1/export")
        assert exported.status_code == 200
        payload = exported.json()
        assert payload["format"] == "gnexus-creds-export"
        # The second exported field is expected to hold the pin in plaintext.
        assert payload["secrets"][0]["fields"][1]["value"] == "1234"

        wiped = await client.delete("/api/v1/account-data")
        assert wiped.status_code == 204
        after_wipe = await client.get("/api/v1/secrets")
        assert after_wipe.json()["total"] == 0

        imported = await client.post("/api/v1/import", json=payload)
        assert imported.status_code == 200
        assert imported.json()["created"] == 1

        # Reveal the re-imported secret and check the pin round-tripped.
        listing = await client.get("/api/v1/secrets")
        reveal_id = listing.json()["items"][0]["id"]
        revealed = await client.post(f"/api/v1/secrets/{reveal_id}/reveal")
        revealed_values = {entry["name"]: entry["value"] for entry in revealed.json()["fields"]}
        assert revealed_values["pin"] == "1234"
@pytest.mark.anyio
async def test_rest_get_version_masks_encrypted_values(app):
    """Fetch a stored version of a secret and check encrypted values are masked.

    A secret is created and then patched with a new password.  The first
    entry of the version list is fetched through the version-detail
    endpoint; its plaintext ``username`` must be visible while the
    encrypted ``password`` must come back as ``None``.
    """
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        create_response = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Versioned",
                "fields": [
                    {"name": "username", "value": "version-user", "encrypted": False},
                    {"name": "password", "value": "first-pass", "encrypted": True, "masked": True},
                ],
            },
        )
        # Fail on the HTTP status (with the body) rather than on a KeyError
        # when the creation itself is what went wrong.
        assert create_response.status_code == 200, create_response.text
        secret_id = create_response.json()["id"]

        update_response = await client.patch(
            f"/api/v1/secrets/{secret_id}",
            json={
                "fields": [
                    {"name": "username", "value": "version-user", "encrypted": False},
                    {"name": "password", "value": "second-pass", "encrypted": True, "masked": True},
                ]
            },
        )
        assert update_response.status_code == 200

        versions_response = await client.get(f"/api/v1/secrets/{secret_id}/versions")
        # Check the listing succeeded before indexing into its payload.
        assert versions_response.status_code == 200, versions_response.text
        version_id = versions_response.json()[0]["id"]

        version_response = await client.get(f"/api/v1/secrets/{secret_id}/versions/{version_id}")
        assert version_response.status_code == 200
        fields = {field["name"]: field["value"] for field in version_response.json()["fields"]}
        assert fields["username"] == "version-user"
        assert fields["password"] is None
@pytest.mark.anyio
async def test_rest_search_total_deduplicates_versions_and_tags(app):
    """Check that a search counts a secret once despite several matches.

    The secret is created with two tags containing ``deploy`` and a
    ``username`` value of ``deploy``, then patched so that the field value
    becomes ``deploy-v2``.  Searching for ``deploy`` must still report
    ``total == 1`` and return exactly one item.
    """
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        create_response = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Deploy target",
                "tags": ["deploy", "deploy-alt"],
                "fields": [{"name": "username", "value": "deploy", "encrypted": False}],
            },
        )
        assert create_response.status_code == 200, create_response.text
        secret_id = create_response.json()["id"]

        update_response = await client.patch(
            f"/api/v1/secrets/{secret_id}",
            json={"fields": [{"name": "username", "value": "deploy-v2", "encrypted": False}]},
        )
        # Without this check a rejected update would leave only the original
        # data in place and the de-duplication assertions below would pass
        # without exercising the updated-secret scenario at all.
        assert update_response.status_code == 200, update_response.text

        response = await client.get("/api/v1/secrets", params={"q": "deploy"})
        assert response.status_code == 200
        body = response.json()
        assert body["total"] == 1
        assert len(body["items"]) == 1
@pytest.mark.anyio
async def test_rest_taxonomy_audit_and_token_management(app):
    """Exercise the category, tag, suggestion, audit and API-token endpoints.

    A secret is created and revealed so that there is taxonomy data and at
    least two audit events.  The test then checks the category and tag
    listings, tag suggestions for ``s``, the global and per-secret audit
    feeds, and the create / list / revoke cycle for an API token.
    """
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        create_response = await client.post(
            "/api/v1/secrets",
            json={
                "title": "Taxonomy",
                "category": "infra",
                "tags": ["ssh", "server"],
                "fields": [{"name": "username", "value": "ops", "encrypted": False}],
            },
        )
        # Everything below depends on this secret existing.
        assert create_response.status_code == 200, create_response.text
        secret_id = create_response.json()["id"]

        reveal_response = await client.post(f"/api/v1/secrets/{secret_id}/reveal")
        # The audit assertions expect a "secret.revealed" event; report a
        # failed reveal here instead of as a missing event later.
        assert reveal_response.status_code == 200, reveal_response.text

        categories_response = await client.get("/api/v1/categories")
        tags_response = await client.get("/api/v1/tags")
        suggestions_response = await client.get("/api/v1/suggestions", params={"q": "s"})
        audit_response = await client.get("/api/v1/audit-events")
        secret_audit_response = await client.get(f"/api/v1/secrets/{secret_id}/audit-events")

        token_create_response = await client.post(
            "/api/v1/api-tokens",
            json={"name": "REST client", "scopes": ["read", "reveal"]},
        )
        # Check the status before reading "id" so a rejected request does
        # not surface as a KeyError.
        assert token_create_response.status_code == 200, token_create_response.text
        token_id = token_create_response.json()["id"]
        token_list_response = await client.get("/api/v1/api-tokens")
        revoke_response = await client.delete(f"/api/v1/api-tokens/{token_id}")

        # Taxonomy: one category, tags returned in alphabetical order.
        assert categories_response.status_code == 200
        assert categories_response.json() == ["infra"]
        assert tags_response.status_code == 200
        assert tags_response.json() == ["server", "ssh"]
        assert suggestions_response.status_code == 200
        assert suggestions_response.json()["tags"] == ["server", "ssh"]

        # Audit: creation and reveal must both have been recorded.
        assert audit_response.status_code == 200
        assert audit_response.json()["total"] >= 2
        assert secret_audit_response.status_code == 200
        assert {event["action"] for event in secret_audit_response.json()["items"]} >= {
            "secret.created",
            "secret.revealed",
        }

        # API tokens: prefixed token value, listed by name, revocable.
        assert token_create_response.json()["token"].startswith("gcr_")
        assert token_list_response.status_code == 200
        assert token_list_response.json()[0]["name"] == "REST client"
        assert revoke_response.status_code == 204
@pytest.mark.anyio
async def test_failed_access_is_aggregated(auth_app, db_session):
    """Two rejected requests must produce a single ``access.failed`` event.

    ``/api/v1/me`` is requested twice without credentials; both calls must
    return 401 and exactly one ``access.failed`` audit row must exist.
    """
    transport = ASGITransport(app=auth_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        for _ in range(2):
            rejected = await client.get("/api/v1/me")
            assert rejected.status_code == 401

        failed_events = (
            db_session.query(AuditEvent)
            .filter(AuditEvent.action == "access.failed")
            .all()
        )
        assert len(failed_events) == 1
@pytest.mark.anyio
async def test_api_token_scopes(auth_app, db_session, user):
    """Check that API token scopes gate listing and revealing secrets.

    Two tokens are stored for ``user``: one with only the ``read`` scope and
    one with ``read`` and ``reveal``.  The read-only token may list secrets
    (and gets ``last_used_at`` set) but receives 403 on reveal; the second
    token can reveal the encrypted ``password`` field.
    """

    def bearer(token):
        # Build a fresh Authorization header for each request.
        return {"Authorization": f"Bearer {token}"}

    owner = Actor(user=user, channel="ui")
    spec = SecretCreate(
        title="Token scoped",
        fields=[SecretFieldIn(name="password", value="secret", encrypted=True)],
    )
    created = create_secret(db_session, owner, spec)

    read_token = "gcr_read_secret"
    reveal_token = "gcr_reveal_secret"
    read_token_row = ApiToken(
        user_id=user.id,
        public_id="read",
        name="read",
        token_hash=crypto.token_hash(read_token),
        scopes=["read"],
    )
    reveal_token_row = ApiToken(
        user_id=user.id,
        public_id="reveal",
        name="reveal",
        token_hash=crypto.token_hash(reveal_token),
        scopes=["read", "reveal"],
    )
    db_session.add_all([read_token_row, reveal_token_row])
    db_session.commit()

    reveal_url = f"/api/v1/secrets/{created.id}/reveal"
    transport = ASGITransport(app=auth_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # The read scope is enough to list secrets.
        listed = await client.get("/api/v1/secrets", headers=bearer(read_token))
        assert listed.status_code == 200
        assert listed.json()["total"] == 1

        # Drop cached ORM state so the row is re-read from the database.
        db_session.expire_all()
        stored_read_token = db_session.get(ApiToken, read_token_row.id)
        assert stored_read_token.last_used_at is not None

        # The read scope alone must not allow revealing.
        denied = await client.post(reveal_url, headers=bearer(read_token))
        assert denied.status_code == 403

        # The reveal scope returns the plaintext value.
        revealed = await client.post(reveal_url, headers=bearer(reveal_token))
        assert revealed.status_code == 200
        revealed_values = {entry["name"]: entry["value"] for entry in revealed.json()["fields"]}
        assert revealed_values["password"] == "secret"
@pytest.mark.anyio
async def test_admin_users_requires_admin_role(app, actor):
    """A caller whose ``system_role`` is ``"user"`` gets 403 on the admin user list."""
    # Demote the actor supplied by the fixture before issuing the request.
    actor.user.system_role = "user"

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        forbidden = await client.get("/api/v1/admin/users")
        assert forbidden.status_code == 403
@pytest.mark.anyio
async def test_admin_users_lists_users_for_admin(app):
    """The admin user list returns the single fixture user.

    Uses the unmodified ``app`` fixture (presumably acting as an admin --
    confirm against the fixture definition) and expects one user with the
    e-mail ``user@example.test``.
    """
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        listing = await client.get("/api/v1/admin/users")
        assert listing.status_code == 200
        assert listing.json()["total"] == 1
        assert listing.json()["items"][0]["email"] == "user@example.test"