"""HTTP client for the local data_collector service."""
import json
import time
from typing import Any, Dict, Optional
from curl_cffi import requests
from src.config import (
DATA_COLLECTOR_BASE_URL,
DATA_COLLECTOR_HEALTH_ENDPOINT,
DATA_COLLECTOR_INGEST_ENDPOINT,
DATA_COLLECTOR_SOURCE_SLUG,
)
class DataCollectorClient:
    """HTTP client for the local data_collector service.

    A single ``curl_cffi`` session is created per client and reused for every
    request. Call :meth:`close` when finished, or use the client as a context
    manager (``with DataCollectorClient() as client: ...``), so the session's
    resources are released.

    Errors are not raised to the caller. Failures are printed with a
    ``[collector]`` prefix and reported through the return value
    (``False`` from :meth:`health_check`, ``None`` from :meth:`ingest`).
    """

    def __init__(self, base_url: str = DATA_COLLECTOR_BASE_URL, timeout: float = 30):
        """Create a client bound to ``base_url``.

        Args:
            base_url: Root URL of the collector. Trailing slashes are stripped
                so endpoint paths can be appended directly.
            timeout: Timeout passed to every request made by this client.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "DataCollectorClient":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Always release the session; exceptions from the with-body propagate.
        self.close()

    def health_check(self) -> bool:
        """Return True if the collector is reachable.

        Sends a GET to the health endpoint. Returns True only for HTTP 200.
        Any exception is printed and results in False.
        """
        try:
            resp = self.session.get(
                f"{self.base_url}{DATA_COLLECTOR_HEALTH_ENDPOINT}",
                timeout=self.timeout,
            )
            return resp.status_code == 200
        except Exception as exc:
            # Best-effort probe: report unreachable instead of raising.
            print(f"[collector] Health check failed: {exc}")
            return False

    def ingest(
        self,
        external_id: str,
        payload: Dict[str, Any],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
    ) -> Optional[Dict[str, Any]]:
        """Send one listing to /api/v1/ingest. Returns the JSON response or None on error.

        The request body is ``{"source_slug", "external_id", "payload"}``,
        POSTed as JSON.

        Args:
            external_id: Identifier of the listing; also used in log lines.
            payload: Listing data, sent verbatim under the ``payload`` key.
            source_slug: Source identifier sent alongside the listing.

        Returns:
            The decoded JSON object when the service answers HTTP 202 with a
            JSON object body. Otherwise None: any other status, a transport
            error, an undecodable body, or a JSON body that is not an object.
        """
        body = {
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        }
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_ENDPOINT}"
        try:
            resp = self.session.post(
                url,
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )
            if resp.status_code != 202:
                print(
                    f"[collector] Ingest failed for {external_id}: "
                    f"HTTP {resp.status_code} body={resp.text[:200]}"
                )
                return None
            data = resp.json()
        except Exception as exc:
            # Best-effort: transport errors, timeouts and undecodable bodies
            # are logged and reported as None rather than raised.
            print(f"[collector] Ingest exception for {external_id}: {exc}")
            return None

        # A JSON list or scalar does not support .get() and does not match the
        # declared Dict return type, so reject it explicitly.
        if not isinstance(data, dict):
            print(
                f"[collector] Ingest returned unexpected JSON for {external_id}: "
                f"{type(data).__name__}"
            )
            return None

        print(
            f"[collector] Ingested {external_id}: status={data.get('status')} "
            f"job_id={data.get('job_id')} snapshot_id={data.get('snapshot_id')}"
        )
        return data