"""HTTP client for the local data_collector service."""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from curl_cffi import CurlMime, requests
from src.config import (
DATA_COLLECTOR_BASE_URL,
DATA_COLLECTOR_HEALTH_ENDPOINT,
DATA_COLLECTOR_INGEST_ENDPOINT,
DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT,
DATA_COLLECTOR_SOURCE_SLUG,
)
class DataCollectorClient:
    """Small HTTP client for the data_collector service.

    All request methods are best-effort: transport errors and unexpected
    HTTP statuses are printed with a ``[collector]`` prefix and reported to
    the caller through the return value (``False`` / ``None``) instead of
    being raised.

    The client owns one ``requests.Session``; call :meth:`close` (or use the
    client as a context manager) to release it when done.
    """

    # Default timeout (seconds) for the with-images endpoint. It is much
    # larger than the regular timeout because, per the original author's
    # note, the collector runs its pipeline inline for that endpoint.
    WITH_IMAGES_TIMEOUT = 300

    def __init__(self, base_url: str = DATA_COLLECTOR_BASE_URL, timeout: int = 30):
        """Create a client.

        Args:
            base_url: Root URL of the collector; trailing slashes are removed
                so endpoint paths can be appended directly.
            timeout: Timeout in seconds used by ``health_check`` and ``ingest``.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()

    def close(self) -> None:
        """Release the underlying HTTP session."""
        self.session.close()

    def __enter__(self) -> "DataCollectorClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def health_check(self) -> bool:
        """Return True if the collector answers the health endpoint with HTTP 200.

        Any exception raised while performing the request is printed and
        reported as ``False``.
        """
        try:
            resp = self.session.get(
                f"{self.base_url}{DATA_COLLECTOR_HEALTH_ENDPOINT}",
                timeout=self.timeout,
            )
            return resp.status_code == 200
        except Exception as exc:
            print(f"[collector] Health check failed: {exc}")
            return False

    def ingest(
        self,
        external_id: str,
        payload: Dict[str, Any],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
    ) -> Optional[Dict[str, Any]]:
        """Send one listing to the ingest endpoint as JSON.

        Args:
            external_id: Identifier of the listing in the source system.
            payload: Listing data, sent under the ``payload`` key.
            source_slug: Value sent under the ``source_slug`` key.

        Returns:
            The decoded JSON response when the collector answers HTTP 202,
            otherwise ``None`` (any other status, or any exception).
        """
        body = {
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        }
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_ENDPOINT}"
        try:
            resp = self.session.post(
                url,
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )
            if resp.status_code == 202:
                data = resp.json()
                print(
                    f"[collector] Ingested {external_id}: status={data.get('status')} "
                    f"job_id={data.get('job_id')} snapshot_id={data.get('snapshot_id')}"
                )
                return data
            print(
                f"[collector] Ingest failed for {external_id}: "
                f"HTTP {resp.status_code} body={resp.text[:200]}"
            )
            return None
        except Exception as exc:
            # Best-effort boundary: also covers a 202 whose body is not JSON.
            print(f"[collector] Ingest exception for {external_id}: {exc}")
            return None

    def ingest_with_images(
        self,
        external_id: str,
        payload: Dict[str, Any],
        image_paths: List[str],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
        timeout: Optional[int] = None,
    ) -> Optional[Dict[str, Any]]:
        """Send a listing plus its image files as one multipart request.

        The listing is serialised to a JSON string and sent as the
        ``metadata`` form field; every readable image is attached as an
        ``images`` file part.

        Args:
            external_id: Identifier of the listing in the source system.
            payload: Listing data, embedded in the ``metadata`` JSON.
            image_paths: Local paths of the images to upload. Paths that are
                not regular files are skipped with a printed notice.
            source_slug: Value embedded in the ``metadata`` JSON.
            timeout: Request timeout in seconds. Defaults to
                ``WITH_IMAGES_TIMEOUT`` (300), not the client-wide timeout.

        Returns:
            The decoded JSON response when the collector answers HTTP 202,
            otherwise ``None`` (any other status, or any exception).
        """
        request_timeout = self.WITH_IMAGES_TIMEOUT if timeout is None else timeout
        metadata = json.dumps(
            {
                "source_slug": source_slug,
                "external_id": external_id,
                "payload": payload,
            },
            ensure_ascii=False,
        )
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT}"
        mime: Optional[CurlMime] = None
        try:
            mime = CurlMime()
            for path in image_paths:
                p = Path(path)
                # is_file() rather than exists(): a directory cannot be
                # attached as a file part.
                if not p.is_file():
                    print(f"[collector] Skipping missing image for {external_id}: {p}")
                    continue
                mime.addpart(
                    name="images",
                    local_path=str(p),
                    filename=p.name,
                    content_type="application/octet-stream",
                )
            # NOTE(review): relies on curl_cffi merging `data` fields into the
            # multipart form — confirm for the pinned curl_cffi version.
            resp = self.session.post(
                url,
                data={"metadata": metadata},
                multipart=mime,
                timeout=request_timeout,
            )
            if resp.status_code == 202:
                data = resp.json()
                print(
                    f"[collector] Ingested with-images {external_id}: "
                    f"status={data.get('status')} job_id={data.get('job_id')} "
                    f"property_id={data.get('property_id')}"
                )
                return data
            print(
                f"[collector] Ingest with-images failed for {external_id}: "
                f"HTTP {resp.status_code} body={resp.text[:200]}"
            )
            return None
        except Exception as exc:
            print(f"[collector] Ingest with-images exception for {external_id}: {exc}")
            return None
        finally:
            # Always release the multipart form. A failure here must not mask
            # the request result, so it is reported but not raised.
            if mime is not None:
                try:
                    mime.close()
                except Exception as exc:
                    print(
                        f"[collector] Failed to release multipart form "
                        f"for {external_id}: {exc}"
                    )