Newer
Older
vmk-360-domria_parser / src / collector.py
"""HTTP client for the local data_collector service."""
import json
import time
from typing import Any, Dict, Optional

from curl_cffi import requests

from src.config import (
    DATA_COLLECTOR_BASE_URL,
    DATA_COLLECTOR_HEALTH_ENDPOINT,
    DATA_COLLECTOR_INGEST_ENDPOINT,
    DATA_COLLECTOR_SOURCE_SLUG,
)


class DataCollectorClient:
    """HTTP client for the local data_collector service.

    A single ``curl_cffi`` session is created per client and reused for every
    request. Release it with :meth:`close`, or use the client as a context
    manager::

        with DataCollectorClient() as client:
            client.ingest("123", {...})
    """

    def __init__(self, base_url: str = DATA_COLLECTOR_BASE_URL, timeout: int = 30):
        """Create a client bound to ``base_url``.

        Args:
            base_url: Root URL of the collector. A trailing ``/`` is stripped so
                endpoint paths can be appended directly.
            timeout: Timeout passed to every request made by this client.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session and release its resources."""
        self.session.close()

    def __enter__(self) -> "DataCollectorClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Always release the session; never suppress the caller's exception.
        self.close()

    def health_check(self) -> bool:
        """Return True if the collector's health endpoint answers HTTP 200.

        Any failure (connection error, timeout, non-200 status) yields False.
        Errors are printed rather than raised so callers can poll safely.
        """
        try:
            resp = self.session.get(
                f"{self.base_url}{DATA_COLLECTOR_HEALTH_ENDPOINT}",
                timeout=self.timeout,
            )
            return resp.status_code == 200
        except Exception as exc:  # best-effort probe: report and answer "unhealthy"
            print(f"[collector] Health check failed: {exc}")
            return False

    def ingest(
        self,
        external_id: str,
        payload: Dict[str, Any],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
    ) -> Optional[Dict[str, Any]]:
        """Send one listing to the collector's ingest endpoint.

        Args:
            external_id: Identifier of the listing in the source system.
            payload: Listing data, sent under the ``"payload"`` key.
            source_slug: Source identifier sent alongside the listing.

        Returns:
            The decoded JSON object from an HTTP 202 response. Returns None on
            any other status, on a response body that is not a JSON object, or
            on a transport/decoding error. Failures are printed, not raised.
        """
        body = {
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        }
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_ENDPOINT}"
        try:
            resp = self.session.post(
                url,
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )
            # Only 202 Accepted counts as success; anything else is reported.
            if resp.status_code != 202:
                print(
                    f"[collector] Ingest failed for {external_id}: "
                    f"HTTP {resp.status_code} body={resp.text[:200]}"
                )
                return None
            data = resp.json()
            # Guard before calling .get(): a non-object JSON body (list, string,
            # number) would otherwise raise AttributeError here.
            if not isinstance(data, dict):
                print(
                    f"[collector] Ingest for {external_id} returned unexpected body: "
                    f"{resp.text[:200]}"
                )
                return None
            print(
                f"[collector] Ingested {external_id}: status={data.get('status')} "
                f"job_id={data.get('job_id')} snapshot_id={data.get('snapshot_id')}"
            )
            return data
        except Exception as exc:  # network/timeout/JSON-decode errors: report, return None
            print(f"[collector] Ingest exception for {external_id}: {exc}")
            return None