# gnexus-auth-client-py / tests / unit / test_webhook.py
"""Tests for webhook components."""

import hashlib
import hmac
import json
from datetime import datetime, timezone

import pytest

from gnexus_gauth.config import GAuthConfig
from gnexus_gauth.exceptions import WebhookPayloadException, WebhookVerificationException
from gnexus_gauth.support import SystemClock
from gnexus_gauth.webhook import HmacWebhookVerifier, JsonWebhookParser


class TestHmacWebhookVerifier:
    """Exercise ``HmacWebhookVerifier.verify`` with accepted and rejected requests.

    Signatures are built the same way in every test: an HMAC-SHA256 hex digest
    over ``"<timestamp>.<raw body>"``, sent as ``t=<timestamp>,v1=<digest>`` in
    the ``X-Gnexus-Signature`` header.
    """

    @staticmethod
    def _make_verifier(**config_overrides: int) -> "HmacWebhookVerifier":
        """Return a verifier configured for the fixed billing test client.

        Args:
            **config_overrides: Extra ``GAuthConfig`` keyword arguments, e.g.
                ``webhook_tolerance_seconds``.
        """
        settings = GAuthConfig(
            base_url="https://auth.example.test",
            client_id="billing",
            client_secret="secret",
            redirect_uri="https://billing.example.test/callback",
            **config_overrides,
        )
        return HmacWebhookVerifier(settings)

    @staticmethod
    def _current_epoch() -> int:
        """Return the current UTC time as whole seconds since the epoch."""
        return int(datetime.now(timezone.utc).timestamp())

    @staticmethod
    def _sign(secret: bytes, epoch: int, body: str) -> str:
        """Return the HMAC-SHA256 hex digest of ``"<epoch>.<body>"``."""
        signed_payload = f"{epoch}.{body}".encode()
        return hmac.new(secret, signed_payload, hashlib.sha256).hexdigest()

    @staticmethod
    def _event_headers(signature_header: str) -> dict:
        """Return the full webhook header set carrying *signature_header*."""
        return {
            "X-Gnexus-Event-Id": "evt_1",
            "X-Gnexus-Event-Type": "user.updated",
            "X-Gnexus-Event-Timestamp": "2024-01-01T00:00:00Z",
            "X-Gnexus-Signature": signature_header,
        }

    def test_verify_success(self) -> None:
        """A fresh request signed with the right secret is accepted."""
        verifier = self._make_verifier(webhook_tolerance_seconds=300)

        shared_secret = "webhook_secret"
        body = '{"type": "user.updated", "id": "evt_1"}'
        epoch = self._current_epoch()
        digest = self._sign(shared_secret.encode(), epoch, body)
        request_headers = self._event_headers(f"t={epoch},v1={digest}")

        outcome = verifier.verify(body, request_headers, shared_secret)

        assert outcome.raw_body == body
        assert outcome.signature_id == "evt_1"
        assert outcome.verified_at is not None

    def test_missing_header(self) -> None:
        """An empty header mapping is rejected as a missing webhook header."""
        verifier = self._make_verifier()

        with pytest.raises(WebhookVerificationException, match="Missing webhook header"):
            verifier.verify("body", {}, "secret")

    def test_invalid_signature(self) -> None:
        """A fresh timestamp paired with a bogus digest is rejected."""
        verifier = self._make_verifier()

        body = '{"type": "user.updated"}'
        epoch = self._current_epoch()
        request_headers = self._event_headers(f"t={epoch},v1=bad_signature")

        with pytest.raises(WebhookVerificationException, match="Invalid webhook signature"):
            verifier.verify(body, request_headers, "secret")

    def test_timestamp_outside_tolerance(self) -> None:
        """A correctly signed request older than the tolerance is rejected."""
        verifier = self._make_verifier(webhook_tolerance_seconds=60)

        body = '{"type": "user.updated"}'
        # Two minutes old against a 60-second tolerance; the digest itself is
        # computed with the same secret passed to verify().
        stale_epoch = self._current_epoch() - 120
        digest = self._sign(b"secret", stale_epoch, body)
        request_headers = self._event_headers(f"t={stale_epoch},v1={digest}")

        with pytest.raises(WebhookVerificationException, match="tolerance window"):
            verifier.verify(body, request_headers, "secret")

    def test_malformed_signature_header(self) -> None:
        """A signature header without ``t=``/``v1=`` parts is rejected."""
        verifier = self._make_verifier()
        request_headers = self._event_headers("bad_header")

        with pytest.raises(WebhookVerificationException, match="Malformed"):
            verifier.verify("body", request_headers, "secret")

    def test_non_numeric_timestamp(self) -> None:
        """A signature header whose ``t=`` value is not a number is rejected."""
        verifier = self._make_verifier()
        request_headers = self._event_headers("t=abc,v1=hash")

        with pytest.raises(WebhookVerificationException, match="numeric"):
            verifier.verify("body", request_headers, "secret")


class TestJsonWebhookParser:
    """Exercise ``JsonWebhookParser.parse`` with valid and invalid payloads."""

    @staticmethod
    def _expect_payload_error(raw_payload: str, message_pattern: str) -> None:
        """Assert that parsing *raw_payload* raises a matching payload error.

        Args:
            raw_payload: The raw request body handed to the parser.
            message_pattern: Regex that the exception message must match.
        """
        json_parser = JsonWebhookParser()
        with pytest.raises(WebhookPayloadException, match=message_pattern):
            json_parser.parse(raw_payload)

    def test_parse_success(self) -> None:
        """A complete payload is mapped onto the parsed event's attributes."""
        payload = {
            "id": "evt_1",
            "type": "user.updated",
            "occurred_at": "2024-01-15T10:30:00+00:00",
            "target": {"user_id": "user_123"},
            "actor": {"user_id": "admin_1"},
            "data": {"field": "email"},
        }

        parsed = JsonWebhookParser().parse(json.dumps(payload))

        assert parsed.event_id == "evt_1"
        assert parsed.event_type == "user.updated"
        assert parsed.occurred_at is not None
        assert parsed.occurred_at.year == 2024
        assert parsed.target_identifiers == {"user_id": "user_123"}
        assert parsed.actor_identifiers == {"user_id": "admin_1"}
        assert parsed.metadata == {"field": "email"}

    def test_parse_invalid_json(self) -> None:
        """A body that is not JSON is rejected."""
        self._expect_payload_error("not json", "not valid JSON")

    def test_parse_missing_type(self) -> None:
        """A JSON object without a ``type`` field is rejected."""
        self._expect_payload_error('{"id": "evt_1"}', "missing event type")

    def test_parse_invalid_occurred_at(self) -> None:
        """An ``occurred_at`` value that is not a date is rejected."""
        self._expect_payload_error(
            '{"type": "test", "occurred_at": "not-a-date"}',
            "invalid occurred_at",
        )