import pytest
from httpx import ASGITransport, AsyncClient
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from gnexus_creds.models import ApiToken, AuditEvent
from gnexus_creds.schemas import ApiTokenCreate, Scope, SecretCreate, SecretFieldIn
from gnexus_creds.services import Actor, create_api_token, create_secret
@pytest.mark.anyio
async def test_mcp_requires_mcp_scope(app, actor):
    """The MCP tool adapter answers 403 when the token has no ``mcp`` scope.

    The ``actor`` fixture is put on the ``rest`` channel and handed a token
    whose only scope is ``read``; a POST to the ``search_secrets`` tool is
    then expected to be refused.
    """
    # NOTE(review): presumably the ``app`` fixture authenticates every request
    # as this ``actor`` object — confirm against conftest.
    actor.channel = "rest"
    read_only_token = ApiToken(
        user_id=actor.user.id,
        public_id="pub",
        name="test",
        token_hash="hash",
        scopes=["read"],
    )
    actor.api_token = read_only_token

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        reply = await client.post("/mcp/tools/search_secrets", json={"arguments": {}})

    assert reply.status_code == 403


@pytest.mark.anyio
async def test_mcp_update_requires_secret_allow_mcp(app, db_session, actor):
    """``update_secret`` over MCP answers 404 for a secret with ``allow_mcp=False``.

    The secret is created through the ``ui`` channel with MCP access turned
    off, then an MCP-channel token carrying ``mcp``/``read``/``write`` scopes
    tries to rename it.
    """
    actor.channel = "mcp"
    actor.api_token = ApiToken(
        user_id=actor.user.id,
        public_id="mcp",
        name="mcp",
        token_hash="hash",
        scopes=["mcp", "read", "write"],
    )

    # Create the secret as a UI actor so the MCP restriction comes only from
    # the secret's own ``allow_mcp`` flag.
    ui_actor = Actor(user=actor.user, channel="ui")
    ui_only_payload = SecretCreate(
        title="UI only",
        allow_mcp=False,
        fields=[SecretFieldIn(name="username", value="demo", encrypted=False)],
    )
    secret = create_secret(db_session, ui_actor, ui_only_payload)
    db_session.commit()

    request_body = {"arguments": {"secret_id": str(secret.id), "title": "Changed"}}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        reply = await client.post("/mcp/tools/update_secret", json=request_body)

    assert reply.status_code == 404


@pytest.mark.anyio
async def test_mcp_search_excludes_archived_and_reveal_audits(app, db_session, actor):
    """MCP search omits archived secrets and a reveal is audited on the ``mcp`` channel.

    Two MCP-enabled secrets are created, one of them archived. Searching for
    ``"MCP"`` must return only the non-archived one, and revealing it must
    leave exactly one ``secret.revealed`` audit event whose channel is ``mcp``.
    """
    actor.channel = "mcp"
    actor.api_token = ApiToken(
        user_id=actor.user.id,
        public_id="mcp-full",
        name="mcp-full",
        token_hash="hash",
        scopes=["mcp", "read", "reveal"],
    )

    visible_payload = SecretCreate(
        title="Visible MCP",
        allow_mcp=True,
        fields=[SecretFieldIn(name="password", value="visible", encrypted=True)],
    )
    visible = create_secret(
        db_session, Actor(user=actor.user, channel="ui"), visible_payload
    )

    archived_payload = SecretCreate(
        title="Archived MCP",
        archived=True,
        allow_mcp=True,
        fields=[SecretFieldIn(name="password", value="archived", encrypted=True)],
    )
    create_secret(db_session, Actor(user=actor.user, channel="ui"), archived_payload)
    db_session.commit()

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:

        async def call_tool(path, arguments):
            # Every adapter tool takes its parameters under the "arguments" key.
            return await client.post(path, json={"arguments": arguments})

        search_response = await call_tool("/mcp/tools/search_secrets", {"q": "MCP"})
        reveal_response = await call_tool(
            "/mcp/tools/reveal_secret", {"secret_id": str(visible.id)}
        )

    # Search: only the non-archived secret is listed.
    assert search_response.status_code == 200
    found = search_response.json()
    assert found["total"] == 1
    assert found["items"][0]["title"] == "Visible MCP"

    # Reveal: succeeds and is recorded once (``.one()`` raises otherwise).
    assert reveal_response.status_code == 200
    reveal_events = db_session.query(AuditEvent).filter(
        AuditEvent.action == "secret.revealed",
        AuditEvent.secret_id == visible.id,
    )
    event = reveal_events.one()
    assert event.channel == "mcp"


@pytest.mark.anyio
async def test_mcp_http_adapter_tool_variants(app, db_session, actor):
    """Exercise the remaining HTTP adapter endpoints with a fully scoped MCP token.

    Covers ``GET /mcp/sse`` plus the ``get_secret``, ``create_secret``,
    ``set_secret_status`` and ``archive_secret`` tools, and checks that an
    unknown tool name answers 404.
    """
    actor.channel = "mcp"
    actor.api_token = ApiToken(
        user_id=actor.user.id,
        public_id="mcp-write",
        name="mcp-write",
        token_hash="hash",
        scopes=["mcp", "read", "reveal", "write"],
    )

    seed_payload = SecretCreate(
        title="MCP variants",
        allow_mcp=True,
        fields=[SecretFieldIn(name="username", value="variant", encrypted=False)],
    )
    secret = create_secret(
        db_session, Actor(user=actor.user, channel="ui"), seed_payload
    )
    db_session.commit()
    secret_id = str(secret.id)

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:

        async def call_tool(path, arguments):
            # Every adapter tool takes its parameters under the "arguments" key.
            return await client.post(path, json={"arguments": arguments})

        sse_response = await client.get("/mcp/sse")
        get_response = await call_tool(
            "/mcp/tools/get_secret", {"secret_id": secret_id}
        )
        create_response = await call_tool(
            "/mcp/tools/create_secret",
            {
                "title": "MCP adapter created",
                "allow_mcp": True,
                "fields": [{"name": "token", "value": "adapter", "encrypted": True}],
            },
        )
        status_response = await call_tool(
            "/mcp/tools/set_secret_status",
            {"secret_id": secret_id, "status": "outdated"},
        )
        archive_response = await call_tool(
            "/mcp/tools/archive_secret", {"secret_id": secret_id}
        )
        missing_response = await call_tool("/mcp/tools/nope", {})

    # The SSE endpoint's body mentions the search tool.
    assert sse_response.status_code == 200
    assert "search_secrets" in sse_response.text

    assert get_response.status_code == 200
    assert get_response.json()["title"] == "MCP variants"

    assert create_response.status_code == 200
    assert create_response.json()["title"] == "MCP adapter created"

    assert status_response.status_code == 200
    assert status_response.json()["status"] == "outdated"

    assert archive_response.status_code == 200
    assert archive_response.json()["archived"] is True

    # Unknown tool names are reported as not found.
    assert missing_response.status_code == 404


@pytest.mark.anyio
async def test_mcp_protocol_streamable_http_tools(session_factory, db_session, user, monkeypatch):
    """Drive ``/mcp-protocol/`` end to end with the MCP SDK's streamable HTTP client.

    A real API token (read/reveal/write/mcp scopes) and an MCP-enabled secret
    are created, a fresh app is built, and an SDK ``ClientSession`` then lists
    tools, searches, reveals the secret and creates a new one over the
    protocol endpoint.
    """
    # NOTE(review): presumably this makes the protocol handlers open sessions
    # on the test database — confirm in gnexus_creds.mcp_protocol.
    monkeypatch.setattr("gnexus_creds.mcp_protocol.SessionLocal", session_factory)

    token_row, bearer = create_api_token(
        db_session,
        Actor(user=user, channel="ui"),
        ApiTokenCreate(scopes=[Scope.read, Scope.reveal, Scope.write, Scope.mcp], name="MCP SDK"),
    )
    seed_payload = SecretCreate(
        title="Protocol secret",
        allow_mcp=True,
        tags=["protocol"],
        fields=[
            SecretFieldIn(name="username", value="protocol-user", encrypted=False),
            SecretFieldIn(name="password", value="protocol-pass", encrypted=True, masked=True),
        ],
    )
    secret = create_secret(db_session, Actor(user=user, channel="ui"), seed_payload)
    db_session.commit()
    assert token_row.public_id

    # Imported inside the test, after the patch and the seed data are in place.
    from gnexus_creds.main import create_app

    app = create_app()
    http_client = AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://127.0.0.1:8000",
        headers={"Authorization": f"Bearer {bearer}"},
    )

    # Enter the app lifespan first, then the HTTP client, so they are torn
    # down in the reverse order once the MCP session is finished.
    async with app.router.lifespan_context(app), http_client:
        async with streamable_http_client(
            "http://127.0.0.1:8000/mcp-protocol/",
            http_client=http_client,
            terminate_on_close=False,
        ) as (read_stream, write_stream, _):
            async with ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                listing = await session.list_tools()
                tool_names = {tool.name for tool in listing.tools}
                assert "search_secrets" in tool_names

                search = await session.call_tool("search_secrets", {"q": "protocol"})
                assert not search.isError
                assert search.structuredContent["total"] == 1

                reveal = await session.call_tool("reveal_secret", {"secret_id": str(secret.id)})
                assert not reveal.isError
                # Index the revealed fields by name to check the decrypted value.
                revealed = {}
                for entry in reveal.structuredContent["fields"]:
                    revealed[entry["name"]] = entry["value"]
                assert revealed["password"] == "protocol-pass"

                new_secret_args = {
                    "title": "Protocol created",
                    "allow_mcp": True,
                    "fields": [
                        {
                            "name": "token",
                            "value": "created-through-protocol",
                            "encrypted": True,
                            "masked": True,
                        }
                    ],
                }
                created = await session.call_tool("create_secret", new_secret_args)
                assert not created.isError
                assert created.structuredContent["title"] == "Protocol created"