import pytest
from httpx import ASGITransport, AsyncClient
from gnexus_creds import crypto
from gnexus_creds.models import ApiToken, AuditEvent
from gnexus_creds.schemas import SecretCreate, SecretFieldIn
from gnexus_creds.services import Actor, create_secret
@pytest.mark.anyio
async def test_rest_create_list_reveal(app):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/api/v1/secrets",
json={
"title": "Server",
"purpose": "db01",
"category": "infra",
"tags": ["ssh"],
"fields": [
{"name": "username", "value": "deploy", "encrypted": False, "position": 1},
{
"name": "password",
"value": "pass123",
"encrypted": True,
"masked": True,
"position": 2,
},
],
},
)
assert response.status_code == 200, response.text
secret_id = response.json()["id"]
response = await client.get("/api/v1/secrets?q=deploy")
assert response.status_code == 200
assert response.json()["total"] == 1
response = await client.get("/api/v1/secrets?q=pass123")
assert response.status_code == 200
assert response.json()["total"] == 0
response = await client.get("/api/v1/secrets?q=password")
assert response.status_code == 200
assert response.json()["total"] == 1
response = await client.get("/api/v1/secrets?q=deploy")
fields = response.json()["items"][0]["fields"]
assert {field["name"]: field["value"] for field in fields}["username"] == "deploy"
assert {field["name"]: field["value"] for field in fields}["password"] is None
response = await client.post(f"/api/v1/secrets/{secret_id}/reveal")
assert response.status_code == 200
assert {field["name"]: field["value"] for field in response.json()["fields"]}[
"password"
] == "pass123"
@pytest.mark.anyio
async def test_export_import_round_trip(app):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
create_response = await client.post(
"/api/v1/secrets",
json={
"title": "Card PIN",
"purpose": "bank card",
"category": "finance",
"tags": ["card"],
"fields": [
{"name": "card", "value": "1111", "encrypted": False},
{"name": "pin", "value": "1234", "encrypted": True, "masked": True},
],
},
)
assert create_response.status_code == 200
export_response = await client.post("/api/v1/export")
assert export_response.status_code == 200
payload = export_response.json()
assert payload["format"] == "gnexus-creds-export"
assert payload["secrets"][0]["fields"][1]["value"] == "1234"
delete_response = await client.delete("/api/v1/account-data")
assert delete_response.status_code == 204
list_response = await client.get("/api/v1/secrets")
assert list_response.json()["total"] == 0
import_response = await client.post("/api/v1/import", json=payload)
assert import_response.status_code == 200
assert import_response.json()["created"] == 1
reveal_id = (await client.get("/api/v1/secrets")).json()["items"][0]["id"]
reveal_response = await client.post(f"/api/v1/secrets/{reveal_id}/reveal")
assert {field["name"]: field["value"] for field in reveal_response.json()["fields"]}[
"pin"
] == "1234"
@pytest.mark.anyio
async def test_rest_get_version_masks_encrypted_values(app):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
create_response = await client.post(
"/api/v1/secrets",
json={
"title": "Versioned",
"fields": [
{"name": "username", "value": "version-user", "encrypted": False},
{"name": "password", "value": "first-pass", "encrypted": True, "masked": True},
],
},
)
secret_id = create_response.json()["id"]
update_response = await client.patch(
f"/api/v1/secrets/{secret_id}",
json={
"fields": [
{"name": "username", "value": "version-user", "encrypted": False},
{"name": "password", "value": "second-pass", "encrypted": True, "masked": True},
]
},
)
assert update_response.status_code == 200
versions_response = await client.get(f"/api/v1/secrets/{secret_id}/versions")
version_id = versions_response.json()[0]["id"]
version_response = await client.get(f"/api/v1/secrets/{secret_id}/versions/{version_id}")
assert version_response.status_code == 200
fields = {field["name"]: field["value"] for field in version_response.json()["fields"]}
assert fields["username"] == "version-user"
assert fields["password"] is None
@pytest.mark.anyio
async def test_failed_access_is_aggregated(auth_app, db_session):
async with AsyncClient(transport=ASGITransport(app=auth_app), base_url="http://test") as client:
assert (await client.get("/api/v1/me")).status_code == 401
assert (await client.get("/api/v1/me")).status_code == 401
events = db_session.query(AuditEvent).filter(AuditEvent.action == "access.failed").all()
assert len(events) == 1
@pytest.mark.anyio
async def test_api_token_scopes(auth_app, db_session, user):
created = create_secret(
db_session,
Actor(user=user, channel="ui"),
SecretCreate(
title="Token scoped",
fields=[SecretFieldIn(name="password", value="secret", encrypted=True)],
),
)
read_token = "gcr_read_secret"
reveal_token = "gcr_reveal_secret"
db_session.add_all(
[
ApiToken(
user_id=user.id,
public_id="read",
name="read",
token_hash=crypto.token_hash(read_token),
scopes=["read"],
),
ApiToken(
user_id=user.id,
public_id="reveal",
name="reveal",
token_hash=crypto.token_hash(reveal_token),
scopes=["read", "reveal"],
),
]
)
db_session.commit()
async with AsyncClient(transport=ASGITransport(app=auth_app), base_url="http://test") as client:
list_response = await client.get(
"/api/v1/secrets", headers={"Authorization": f"Bearer {read_token}"}
)
assert list_response.status_code == 200
assert list_response.json()["total"] == 1
denied_response = await client.post(
f"/api/v1/secrets/{created.id}/reveal",
headers={"Authorization": f"Bearer {read_token}"},
)
assert denied_response.status_code == 403
reveal_response = await client.post(
f"/api/v1/secrets/{created.id}/reveal",
headers={"Authorization": f"Bearer {reveal_token}"},
)
assert reveal_response.status_code == 200
assert {field["name"]: field["value"] for field in reveal_response.json()["fields"]}[
"password"
] == "secret"
@pytest.mark.anyio
async def test_admin_users_requires_admin_role(app, actor):
actor.user.system_role = "user"
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get("/api/v1/admin/users")
assert response.status_code == 403
@pytest.mark.anyio
async def test_admin_users_lists_users_for_admin(app):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get("/api/v1/admin/users")
assert response.status_code == 200
assert response.json()["total"] == 1
assert response.json()["items"][0]["email"] == "user@example.test"