"""Tests for webhook components."""
import hashlib
import hmac
import json
from datetime import datetime, timezone
import pytest
from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.exceptions import WebhookPayloadException, WebhookVerificationException
from gnexus_gauth.support import SystemClock
from gnexus_gauth.webhook import HmacWebhookVerifier, JsonWebhookParser
class TestHmacWebhookVerifier:
def test_verify_success(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
webhook_tolerance_seconds=300,
)
verifier = HmacWebhookVerifier(config)
raw_body = '{"type": "user.updated", "id": "evt_1"}'
timestamp = int(datetime.now(timezone.utc).timestamp())
signature = hmac.new(
b"webhook_secret",
f"{timestamp}.{raw_body}".encode(),
hashlib.sha256,
).hexdigest()
headers = {
"X-Gnexus-Event-Id": "evt_1",
"X-Gnexus-Event-Type": "user.updated",
"X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
"X-Gnexus-Signature": f"t={timestamp},v1={signature}",
}
result = verifier.verify(raw_body, headers, "webhook_secret")
assert result.raw_body == raw_body
assert result.signature_id == "evt_1"
assert result.verified_at is not None
def test_missing_header(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
)
verifier = HmacWebhookVerifier(config)
with pytest.raises(WebhookVerificationException, match="Missing webhook header"):
verifier.verify("body", {}, "secret")
def test_invalid_signature(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
)
verifier = HmacWebhookVerifier(config)
raw_body = '{"type": "user.updated"}'
timestamp = int(datetime.now(timezone.utc).timestamp())
headers = {
"X-Gnexus-Event-Id": "evt_1",
"X-Gnexus-Event-Type": "user.updated",
"X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
"X-Gnexus-Signature": f"t={timestamp},v1=bad_signature",
}
with pytest.raises(WebhookVerificationException, match="Invalid webhook signature"):
verifier.verify(raw_body, headers, "secret")
def test_timestamp_outside_tolerance(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
webhook_tolerance_seconds=60,
)
verifier = HmacWebhookVerifier(config)
raw_body = '{"type": "user.updated"}'
old_timestamp = int(datetime.now(timezone.utc).timestamp()) - 120
signature = hmac.new(
b"secret",
f"{old_timestamp}.{raw_body}".encode(),
hashlib.sha256,
).hexdigest()
headers = {
"X-Gnexus-Event-Id": "evt_1",
"X-Gnexus-Event-Type": "user.updated",
"X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
"X-Gnexus-Signature": f"t={old_timestamp},v1={signature}",
}
with pytest.raises(WebhookVerificationException, match="tolerance window"):
verifier.verify(raw_body, headers, "secret")
def test_malformed_signature_header(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
)
verifier = HmacWebhookVerifier(config)
headers = {
"X-Gnexus-Event-Id": "evt_1",
"X-Gnexus-Event-Type": "user.updated",
"X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
"X-Gnexus-Signature": "bad_header",
}
with pytest.raises(WebhookVerificationException, match="Malformed"):
verifier.verify("body", headers, "secret")
def test_non_numeric_timestamp(self) -> None:
config = GAuthConfig(
base_url="https://auth.example.test",
client_id="billing",
client_secret="secret",
redirect_uri="https://billing.example.test/callback",
)
verifier = HmacWebhookVerifier(config)
headers = {
"X-Gnexus-Event-Id": "evt_1",
"X-Gnexus-Event-Type": "user.updated",
"X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
"X-Gnexus-Signature": "t=abc,v1=hash",
}
with pytest.raises(WebhookVerificationException, match="numeric"):
verifier.verify("body", headers, "secret")
class TestJsonWebhookParser:
def test_parse_success(self) -> None:
parser = JsonWebhookParser()
raw = json.dumps({
"id": "evt_1",
"type": "user.updated",
"occurred_at": "2024-01-15T10:30:00+00:00",
"target": {"user_id": "user_123"},
"actor": {"user_id": "admin_1"},
"data": {"field": "email"},
})
event = parser.parse(raw)
assert event.event_id == "evt_1"
assert event.event_type == "user.updated"
assert event.occurred_at is not None
assert event.occurred_at.year == 2024
assert event.target_identifiers == {"user_id": "user_123"}
assert event.actor_identifiers == {"user_id": "admin_1"}
assert event.metadata == {"field": "email"}
def test_parse_invalid_json(self) -> None:
parser = JsonWebhookParser()
with pytest.raises(WebhookPayloadException, match="not valid JSON"):
parser.parse("not json")
def test_parse_missing_type(self) -> None:
parser = JsonWebhookParser()
with pytest.raises(WebhookPayloadException, match="missing event type"):
parser.parse('{"id": "evt_1"}')
def test_parse_invalid_occurred_at(self) -> None:
parser = JsonWebhookParser()
with pytest.raises(WebhookPayloadException, match="invalid occurred_at"):
parser.parse('{"type": "test", "occurred_at": "not-a-date"}')