Newer
Older
vmk-360-domria_parser / src / crawler.py
"""Core crawling loop: catalog → detail → ingest."""
import os
import shutil
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple

import curl_cffi.requests as _curl_requests
from curl_cffi.requests import Response

from src.config import (
    CrawlTarget,
    DELAY_BETWEEN_CATALOG_PAGES,
    DELAY_BETWEEN_DETAIL_PAGES,
    PAUSE_DURATION_SECONDS,
    PAUSE_EVERY_N_CATALOG_PAGES,
)
from src.collector import DataCollectorClient
from src.normalizer import normalize_listing
from src.parser import parse_catalog_page, parse_detail_page
from src.session import DomRiaSession
from src.storage import ResumeStorage


class Crawler:
    def __init__(
        self,
        session: DomRiaSession,
        collector: Optional[DataCollectorClient],
        storage: ResumeStorage,
    ):
        self.session = session
        self.collector = collector
        self.storage = storage
        self._detail_counter = 0
        self._catalog_counter = 0

    def crawl_target(self, target: CrawlTarget, max_pages: Optional[int] = None) -> None:
        """Scrape one city + category + operation combination."""
        print(
            f"\n[crawler] Starting target: {target.city_name} | "
            f"{target.operation} | cat={target.category_id}"
        )
        page = 1
        while True:
            if max_pages is not None and page > max_pages:
                print(f"[crawler] Reached max_pages={max_pages}")
                break

            url = target.catalog_url_template.format(page=page)
            print(f"[crawler] Catalog page {page}: {url}")

            resp = self.session.get_catalog(url)
            catalog = self._handle_catalog_response(resp)

            if catalog is None:
                print(f"[crawler] Stopping target — catalog page {page} failed")
                break

            if catalog["page_404"]:
                print(f"[crawler] Page 404 on catalog page {page} — target exhausted")
                break

            items: List[Dict[str, Any]] = catalog["items"]
            if not items:
                print(f"[crawler] Empty items on page {page} — target exhausted")
                break

            print(
                f"[crawler] Page {page}: {len(items)} items, "
                f"total estimated {catalog['total_count']}"
            )

            for item in items:
                self._process_item(item, target)

            # --- rate limiting between catalog pages --------------------------
            self._catalog_counter += 1
            if self._catalog_counter % PAUSE_EVERY_N_CATALOG_PAGES == 0:
                print(
                    f"[crawler] Pausing {PAUSE_DURATION_SECONDS}s after "
                    f"{PAUSE_EVERY_N_CATALOG_PAGES} catalog pages …"
                )
                time.sleep(PAUSE_DURATION_SECONDS)
            else:
                time.sleep(DELAY_BETWEEN_CATALOG_PAGES)

            page += 1

        print(f"[crawler] Finished target: {target.city_name} | {target.operation} | cat={target.category_id}")

    def _handle_catalog_response(self, resp: Response) -> Optional[Dict[str, Any]]:
        """Return parsed catalog dict or None on unrecoverable error."""
        if resp.status_code == 404:
            print(f"[crawler] Catalog 404")
            return {"items": [], "total_count": 0, "page_404": True}
        if resp.status_code != 200:
            print(f"[crawler] Catalog HTTP {resp.status_code}")
            return None
        return parse_catalog_page(resp.text)

    def _process_item(self, item: Dict[str, Any], target: CrawlTarget) -> None:
        realty_id = str(item.get("realty_id"))
        if not realty_id or realty_id == "None":
            print("[crawler] Skipping item without realty_id")
            return

        if self.storage.is_processed(realty_id):
            return  # silently skip duplicates

        beautiful_url = item.get("beautiful_url")
        if not beautiful_url:
            print(f"[crawler] Item {realty_id} missing beautiful_url, skipping detail")
            detail = None
        else:
            print(f"[crawler] Detail {realty_id}: {beautiful_url}")
            detail = self._fetch_detail(beautiful_url)

        payload = normalize_listing(
            catalog_item=item,
            detail_realty=detail,
            city_name_meta=target.city_name,
            category_id=target.category_id,
        )

        image_urls = payload.get("images", [])
        image_count = len(image_urls)
        if image_count == 0:
            print(
                f"[crawler] WARNING {realty_id}: zero images in payload "
                f"(catalog={len(item.get('photos') or [])}, detail={len(detail.get('photos') or []) if detail else 'N/A'})"
            )

        image_paths: List[str] = []
        temp_dir = ""
        # cap images to avoid inline pipeline timeouts (AI analysis per photo)
        MAX_IMAGES_PER_LISTING = 15
        capped_urls = image_urls[:MAX_IMAGES_PER_LISTING] if len(image_urls) > MAX_IMAGES_PER_LISTING else image_urls
        if len(image_urls) > MAX_IMAGES_PER_LISTING:
            print(f"[crawler] {realty_id}: capping images {len(image_urls)} -> {MAX_IMAGES_PER_LISTING}")

        if self.collector is not None and capped_urls:
            image_paths, temp_dir = self._download_images(capped_urls)

        if self.collector is not None:
            if image_paths:
                result = self.collector.ingest_with_images(
                    realty_id, payload, image_paths
                )
            else:
                result = self.collector.ingest(realty_id, payload)

            if result:
                job_id = result.get("job_id")
                self.storage.mark_processed(realty_id, job_id=job_id)
            else:
                # Even if ingest fails we mark processed to avoid infinite retry loops
                # on truly broken listings; operator can clear the DB row manually.
                self.storage.mark_processed(realty_id, job_id=None)
        else:
            print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})")

        # --- cleanup temp images --------------------------------------------
        if temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)

        # --- rate limiting between detail pages -----------------------------
        self._detail_counter += 1
        time.sleep(DELAY_BETWEEN_DETAIL_PAGES)

    def _download_images(self, urls: List[str]) -> Tuple[List[str], str]:
        """Download image URLs to a temp directory.

        Returns (list of file paths, temp directory path).
        """
        if not urls:
            return [], ""
        temp_dir = tempfile.mkdtemp(prefix="domria_img_")
        paths: List[str] = []

        def _one(url: str, idx: int) -> Optional[str]:
            try:
                resp = _curl_requests.get(url, timeout=30, impersonate="chrome124")
                if resp.status_code != 200:
                    return None
                # infer extension from url, default .jpg
                suffix = ".jpg"
                if "." in url.split("/")[-1]:
                    ext = url.split("/")[-1].split(".")[-1].split("?")[0]
                    if ext and len(ext) <= 5:
                        suffix = f".{ext}"
                dest = os.path.join(temp_dir, f"{idx}{suffix}")
                with open(dest, "wb") as f:
                    f.write(resp.content)
                return dest
            except Exception as exc:
                print(f"[crawler] Image download failed {url}: {exc}")
                return None

        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(_one, url, i) for i, url in enumerate(urls)]
            for future in futures:
                path = future.result()
                if path:
                    paths.append(path)

        print(f"[crawler] Downloaded {len(paths)}/{len(urls)} images to {temp_dir}")
        return paths, temp_dir

    def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]:
        resp = self.session.get_detail(beautiful_url)
        if resp.status_code != 200:
            print(f"[crawler] Detail HTTP {resp.status_code} for {beautiful_url}")
            return None
        detail = parse_detail_page(resp.text)
        if detail is None:
            print(f"[crawler] Detail parse failed for {beautiful_url}")
        return detail