# vmk-360-domria_parser / src / collector.py
"""HTTP client for the local data_collector service."""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from curl_cffi import CurlMime, requests

from src.config import (
    DATA_COLLECTOR_BASE_URL,
    DATA_COLLECTOR_HEALTH_ENDPOINT,
    DATA_COLLECTOR_INGEST_ENDPOINT,
    DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT,
    DATA_COLLECTOR_SOURCE_SLUG,
)


class DataCollectorClient:
    """HTTP client for the local data_collector service.

    Wraps a single ``curl_cffi`` session and exposes the health, ingest and
    ingest-with-images endpoints. All public methods are best-effort: network
    or decoding errors are printed with a ``[collector]`` prefix and reported
    to the caller as ``False`` / ``None`` instead of being raised.
    """

    # HTTP status the ingest endpoints answer with when a request is accepted.
    _ACCEPTED_STATUS = 202

    def __init__(
        self,
        base_url: str = DATA_COLLECTOR_BASE_URL,
        timeout: int = 30,
        images_timeout: int = 300,
    ):
        """Create a client bound to ``base_url``.

        Args:
            base_url: Root URL of the collector; a trailing ``/`` is stripped
                so endpoint paths can be appended directly.
            timeout: Timeout in seconds for the health and plain ingest calls.
            images_timeout: Timeout in seconds for the multipart
                ingest-with-images call, which is expected to take longer.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.images_timeout = images_timeout
        self.session = requests.Session()

    def health_check(self) -> bool:
        """Return True if the collector answers the health endpoint with HTTP 200."""
        try:
            resp = self.session.get(
                f"{self.base_url}{DATA_COLLECTOR_HEALTH_ENDPOINT}",
                timeout=self.timeout,
            )
            return resp.status_code == 200
        except Exception as exc:
            print(f"[collector] Health check failed: {exc}")
            return False

    def _handle_ingest_response(
        self,
        resp: Any,
        external_id: str,
        label: str,
        id_field: str,
    ) -> Optional[Dict[str, Any]]:
        """Log the outcome of an ingest call and return its JSON body on success.

        Args:
            resp: Response object returned by the session.
            external_id: Listing identifier, used only for log messages.
            label: Either ``""`` or ``"with-images "``; inserted into the log
                messages to tell the two endpoints apart.
            id_field: Name of the extra identifier key from the response JSON
                to include in the success message.

        Returns:
            The decoded JSON body when the status is 202, otherwise ``None``.
        """
        if resp.status_code != self._ACCEPTED_STATUS:
            print(
                f"[collector] Ingest {label}failed for {external_id}: "
                f"HTTP {resp.status_code} body={resp.text[:200]}"
            )
            return None

        data = resp.json()
        print(
            f"[collector] Ingested {label}{external_id}: "
            f"status={data.get('status')} job_id={data.get('job_id')} "
            f"{id_field}={data.get(id_field)}"
        )
        return data

    def ingest(
        self,
        external_id: str,
        payload: Dict[str, Any],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
    ) -> Optional[Dict[str, Any]]:
        """Send one listing to the ingest endpoint as JSON.

        Args:
            external_id: Identifier of the listing in the source system.
            payload: Listing data, sent under the ``payload`` key.
            source_slug: Source identifier, sent under ``source_slug``.

        Returns:
            The JSON response on HTTP 202, or ``None`` on any other status or
            on an exception (both are printed).
        """
        body = {
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        }
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_ENDPOINT}"
        try:
            resp = self.session.post(
                url,
                json=body,
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,
            )
            return self._handle_ingest_response(
                resp, external_id, label="", id_field="snapshot_id"
            )
        except Exception as exc:
            print(f"[collector] Ingest exception for {external_id}: {exc}")
            return None

    def ingest_with_images(
        self,
        external_id: str,
        payload: Dict[str, Any],
        image_paths: List[str],
        source_slug: str = DATA_COLLECTOR_SOURCE_SLUG,
    ) -> Optional[Dict[str, Any]]:
        """Send a listing plus binary images via multipart to the with-images endpoint.

        The original note on this method states that the collector runs its
        pipeline inline for this endpoint, so the request uses the longer
        ``images_timeout`` (300 seconds by default) instead of ``timeout``.

        Args:
            external_id: Identifier of the listing in the source system.
            payload: Listing data, serialized into the ``metadata`` form field.
            image_paths: Local file paths to attach as ``images`` parts. Paths
                that are not existing regular files are skipped and reported.
            source_slug: Source identifier, serialized into ``metadata``.

        Returns:
            The JSON response on HTTP 202, or ``None`` on any other status or
            on an exception (both are printed).
        """
        metadata = json.dumps(
            {
                "source_slug": source_slug,
                "external_id": external_id,
                "payload": payload,
            },
            ensure_ascii=False,
        )
        url = f"{self.base_url}{DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT}"
        mime: Optional[CurlMime] = None
        try:
            mime = CurlMime()
            skipped: List[str] = []
            for path in image_paths:
                p = Path(path)
                # Only regular files can be attached; a missing path or a
                # directory is skipped rather than failing the whole upload.
                if not p.is_file():
                    skipped.append(str(path))
                    continue
                mime.addpart(
                    name="images",
                    local_path=str(p),
                    filename=p.name,
                    content_type="application/octet-stream",
                )
            if skipped:
                # Surface dropped images so a partial upload is visible in logs.
                print(
                    f"[collector] Skipped {len(skipped)} missing image(s) "
                    f"for {external_id}: {skipped}"
                )

            resp = self.session.post(
                url,
                data={"metadata": metadata},
                multipart=mime,
                timeout=self.images_timeout,
            )
            return self._handle_ingest_response(
                resp, external_id, label="with-images ", id_field="property_id"
            )
        except Exception as exc:
            print(f"[collector] Ingest with-images exception for {external_id}: {exc}")
            return None
        finally:
            if mime is not None:
                try:
                    mime.close()
                except Exception as exc:
                    # Cleanup stays best-effort, but a failure is no longer silent.
                    print(
                        f"[collector] Failed to close multipart form "
                        f"for {external_id}: {exc}"
                    )