# vmk-360-domria_parser / src / crawler.py
"""Core crawling loop: catalog → detail → ingest."""
import time
from typing import Any, Dict, List, Optional

from curl_cffi.requests import Response

from src.config import (
    CrawlTarget,
    DELAY_BETWEEN_CATALOG_PAGES,
    DELAY_BETWEEN_DETAIL_PAGES,
    PAUSE_DURATION_SECONDS,
    PAUSE_EVERY_N_CATALOG_PAGES,
)
from src.collector import DataCollectorClient
from src.normalizer import normalize_listing
from src.parser import parse_catalog_page, parse_detail_page
from src.session import DomRiaSession
from src.storage import ResumeStorage


class Crawler:
    """Walk catalog pages, fetch listing details and hand payloads to the collector.

    The crawler is driven one target at a time via :meth:`crawl_target`.
    Request counters live on the instance, so the long-pause cadence for
    catalog pages carries over from one target to the next.
    """

    def __init__(
        self,
        session: DomRiaSession,
        collector: Optional[DataCollectorClient],
        storage: ResumeStorage,
    ):
        """Store the collaborators and reset the request counters.

        Args:
            session: Used for ``get_catalog(url)`` and ``get_detail(url)``.
            collector: Receives normalized payloads through ``ingest``;
                ``None`` switches the crawler to dry-run mode (nothing is
                ingested and nothing is marked as processed).
            storage: Remembers which ``realty_id`` values were already handled.
        """
        self.session = session
        self.collector = collector
        self.storage = storage
        self._detail_counter = 0
        self._catalog_counter = 0

    def crawl_target(self, target: CrawlTarget, max_pages: Optional[int] = None) -> None:
        """Scrape one city + category + operation combination.

        Catalog pages are requested starting at page 1 until one of the stop
        conditions is hit: ``max_pages`` is exceeded, the catalog response is
        unusable (see :meth:`_handle_catalog_response`), the site answers 404,
        or a page contains no items.

        Args:
            target: Supplies ``catalog_url_template`` (formatted with
                ``page=``) plus ``city_name``, ``operation`` and
                ``category_id``, which are used for log output.
            max_pages: Upper bound on catalog pages to visit; ``None`` means
                no limit.
        """
        label = f"{target.city_name} | {target.operation} | cat={target.category_id}"
        print(f"\n[crawler] Starting target: {label}")

        page = 1
        while True:
            if max_pages is not None and page > max_pages:
                print(f"[crawler] Reached max_pages={max_pages}")
                break

            url = target.catalog_url_template.format(page=page)
            print(f"[crawler] Catalog page {page}: {url}")

            resp = self.session.get_catalog(url)
            catalog = self._handle_catalog_response(resp)

            if catalog is None:
                print(f"[crawler] Stopping target — catalog page {page} failed")
                break

            # Keys are read with .get() so a parser result that omits one of
            # them is treated as "not a 404" / "no items" instead of raising.
            if catalog.get("page_404"):
                print(f"[crawler] Page 404 on catalog page {page} — target exhausted")
                break

            items: List[Dict[str, Any]] = catalog.get("items") or []
            if not items:
                print(f"[crawler] Empty items on page {page} — target exhausted")
                break

            print(
                f"[crawler] Page {page}: {len(items)} items, "
                f"total estimated {catalog.get('total_count')}"
            )

            for item in items:
                self._process_item(item, target)

            # --- rate limiting between catalog pages --------------------------
            self._catalog_counter += 1
            # A falsy interval (0 / None) disables the long pause rather than
            # triggering a modulo-by-zero error.
            long_pause_due = bool(PAUSE_EVERY_N_CATALOG_PAGES) and (
                self._catalog_counter % PAUSE_EVERY_N_CATALOG_PAGES == 0
            )
            if long_pause_due:
                print(
                    f"[crawler] Pausing {PAUSE_DURATION_SECONDS}s after "
                    f"{PAUSE_EVERY_N_CATALOG_PAGES} catalog pages …"
                )
                time.sleep(PAUSE_DURATION_SECONDS)
            else:
                time.sleep(DELAY_BETWEEN_CATALOG_PAGES)

            page += 1

        print(f"[crawler] Finished target: {label}")

    def _handle_catalog_response(self, resp: Response) -> Optional[Dict[str, Any]]:
        """Return parsed catalog dict or None on unrecoverable error.

        A 404 is reported as an empty catalog with ``page_404`` set so the
        caller can tell "no more pages" apart from a failed request. Any other
        non-200 status yields ``None``. For a 200 response the result of
        ``parse_catalog_page`` is returned unchanged.
        """
        if resp.status_code == 404:
            print("[crawler] Catalog 404")
            return {"items": [], "total_count": 0, "page_404": True}
        if resp.status_code != 200:
            print(f"[crawler] Catalog HTTP {resp.status_code}")
            return None
        return parse_catalog_page(resp.text)

    def _process_item(self, item: Dict[str, Any], target: CrawlTarget) -> None:
        """Fetch, normalize and ingest a single catalog item.

        Items without a ``realty_id`` and items already recorded in storage
        are skipped without any delay. For everything else the detail page is
        fetched when ``beautiful_url`` is present, the listing is normalized,
        and the payload is sent to the collector (or only logged in dry-run
        mode). The per-detail delay is applied at the end.

        Args:
            item: One entry from the catalog page's ``items`` list.
            target: The target being crawled; its ``city_name`` is forwarded
                to the normalizer.
        """
        # Check for a missing id before stringifying, instead of comparing the
        # stringified value against the literal "None".
        raw_id = item.get("realty_id")
        realty_id = "" if raw_id is None else str(raw_id).strip()
        if not realty_id:
            print("[crawler] Skipping item without realty_id")
            return

        if self.storage.is_processed(realty_id):
            return  # silently skip duplicates

        beautiful_url = item.get("beautiful_url")
        if beautiful_url:
            print(f"[crawler] Detail {realty_id}: {beautiful_url}")
            detail = self._fetch_detail(beautiful_url)
        else:
            print(f"[crawler] Item {realty_id} missing beautiful_url, skipping detail")
            detail = None

        # Normalization proceeds even when no detail data is available.
        payload = normalize_listing(
            catalog_item=item,
            detail_realty=detail,
            city_name_meta=target.city_name,
        )

        if payload.get("photos_count", 0) == 0:
            catalog_photos = len(item.get("photos") or [])
            detail_photos = len(detail.get("photos") or []) if detail else "N/A"
            print(
                f"[crawler] WARNING {realty_id}: zero photos in payload "
                f"(catalog={catalog_photos}, detail={detail_photos})"
            )

        if self.collector is not None:
            result = self.collector.ingest(realty_id, payload)
            # Even if ingest fails we mark processed to avoid infinite retry loops
            # on truly broken listings; operator can clear the DB row manually.
            job_id = result.get("job_id") if result else None
            self.storage.mark_processed(realty_id, job_id=job_id)
        else:
            print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})")

        # --- rate limiting between detail pages -----------------------------
        self._detail_counter += 1
        time.sleep(DELAY_BETWEEN_DETAIL_PAGES)

    def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]:
        """Download and parse one listing detail page.

        Returns:
            The value produced by ``parse_detail_page``, or ``None`` when the
            response status is not 200 or the parser itself returned ``None``.
        """
        resp = self.session.get_detail(beautiful_url)
        if resp.status_code != 200:
            print(f"[crawler] Detail HTTP {resp.status_code} for {beautiful_url}")
            return None
        detail = parse_detail_page(resp.text)
        if detail is None:
            print(f"[crawler] Detail parse failed for {beautiful_url}")
        return detail