"""Core crawling loop: catalog → detail → ingest."""
import time
from typing import Any, Dict, List, Optional
from curl_cffi.requests import Response
from src.config import (
CrawlTarget,
DELAY_BETWEEN_CATALOG_PAGES,
DELAY_BETWEEN_DETAIL_PAGES,
PAUSE_DURATION_SECONDS,
PAUSE_EVERY_N_CATALOG_PAGES,
)
from src.collector import DataCollectorClient
from src.normalizer import normalize_listing
from src.parser import parse_catalog_page, parse_detail_page
from src.session import DomRiaSession
from src.storage import ResumeStorage
class Crawler:
    """Drive the catalog → detail → ingest pipeline for crawl targets.

    For every catalog page of a target, each listing is optionally enriched
    with its detail page, normalized via ``normalize_listing`` and sent to the
    data collector. Progress is recorded in ``storage`` so listings already
    processed are skipped. When ``collector`` is ``None`` the crawler runs in
    dry-run mode: it still fetches pages but only logs what it would ingest
    (and does not mark listings as processed).
    """

    def __init__(
        self,
        session: DomRiaSession,
        collector: Optional[DataCollectorClient],
        storage: ResumeStorage,
    ):
        self.session = session
        self.collector = collector
        self.storage = storage
        # Count of items that reached the ingest step in _process_item
        # (missing-id and duplicate items are not counted). Never reset.
        # NOTE(review): not read inside this class — presumably used for
        # stats elsewhere; confirm before removing.
        self._detail_counter = 0
        # Catalog pages fetched across ALL targets handled by this instance;
        # never reset per target, so the periodic long pause spans targets.
        self._catalog_counter = 0

    def crawl_target(self, target: CrawlTarget, max_pages: Optional[int] = None) -> None:
        """Scrape one city + category + operation combination.

        Catalog pages are requested starting at page 1 and the loop stops when
        ``max_pages`` pages have been handled, a catalog request fails with a
        status other than 200/404, the catalog returns 404, or a page yields
        no items.

        Args:
            target: What to crawl; its ``catalog_url_template`` is formatted
                with ``page=<1-based page number>``.
            max_pages: Optional upper bound on catalog pages; ``None`` means
                crawl until the catalog is exhausted.
        """
        label = f"{target.city_name} | {target.operation} | cat={target.category_id}"
        print(f"\n[crawler] Starting target: {label}")

        page = 1
        while True:
            if max_pages is not None and page > max_pages:
                print(f"[crawler] Reached max_pages={max_pages}")
                break

            url = target.catalog_url_template.format(page=page)
            print(f"[crawler] Catalog page {page}: {url}")
            catalog = self._handle_catalog_response(self.session.get_catalog(url))

            if catalog is None:
                print(f"[crawler] Stopping target — catalog page {page} failed")
                break
            # .get(): tolerate parser output that omits a key instead of
            # crashing the whole crawl with KeyError.
            if catalog.get("page_404"):
                print(f"[crawler] Page 404 on catalog page {page} — target exhausted")
                break
            items: List[Dict[str, Any]] = catalog.get("items") or []
            if not items:
                print(f"[crawler] Empty items on page {page} — target exhausted")
                break

            print(
                f"[crawler] Page {page}: {len(items)} items, "
                f"total estimated {catalog.get('total_count')}"
            )
            for item in items:
                self._process_item(item, target)

            self._rate_limit_catalog()
            page += 1

        print(f"[crawler] Finished target: {label}")

    def _rate_limit_catalog(self) -> None:
        """Sleep after a catalog page; every Nth page gets the long pause.

        A non-positive ``PAUSE_EVERY_N_CATALOG_PAGES`` disables the long pause
        (instead of raising ZeroDivisionError on ``% 0``).
        """
        self._catalog_counter += 1
        every_n = PAUSE_EVERY_N_CATALOG_PAGES
        if every_n > 0 and self._catalog_counter % every_n == 0:
            print(
                f"[crawler] Pausing {PAUSE_DURATION_SECONDS}s after "
                f"{PAUSE_EVERY_N_CATALOG_PAGES} catalog pages …"
            )
            time.sleep(PAUSE_DURATION_SECONDS)
        else:
            time.sleep(DELAY_BETWEEN_CATALOG_PAGES)

    def _handle_catalog_response(self, resp: Response) -> Optional[Dict[str, Any]]:
        """Turn a catalog HTTP response into a parsed catalog dict.

        Returns:
            - ``{"items": [], "total_count": 0, "page_404": True}`` for HTTP
              404, which the caller treats as "target exhausted";
            - ``None`` for any other non-200 status (unrecoverable);
            - otherwise whatever ``parse_catalog_page`` returns for the body
              (assumed to carry ``items`` / ``total_count`` / ``page_404`` keys,
              judging by how ``crawl_target`` reads it — TODO confirm).
        """
        status = resp.status_code
        if status == 404:
            print("[crawler] Catalog 404")
            return {"items": [], "total_count": 0, "page_404": True}
        if status != 200:
            print(f"[crawler] Catalog HTTP {status}")
            return None
        return parse_catalog_page(resp.text)

    def _process_item(self, item: Dict[str, Any], target: CrawlTarget) -> None:
        """Fetch detail, normalize and ingest a single catalog item.

        Items without a ``realty_id`` and items already recorded in storage
        are skipped immediately. Other items are normalized (with detail data
        when ``beautiful_url`` is present) and delivered via ``_deliver``.
        """
        raw_id = item.get("realty_id")
        if raw_id is None or str(raw_id) == "":
            print("[crawler] Skipping item without realty_id")
            return
        realty_id = str(raw_id)
        if self.storage.is_processed(realty_id):
            return  # silently skip duplicates

        beautiful_url = item.get("beautiful_url")
        detail: Optional[Dict[str, Any]] = None
        if beautiful_url:
            print(f"[crawler] Detail {realty_id}: {beautiful_url}")
            detail = self._fetch_detail(beautiful_url)
        else:
            print(f"[crawler] Item {realty_id} missing beautiful_url, skipping detail")

        payload = normalize_listing(
            catalog_item=item,
            detail_realty=detail,
            city_name_meta=target.city_name,
        )

        # `or []` so an explicit "images": None does not crash len().
        if not payload.get("images") or []:
            catalog_photos = len(item.get("photos") or [])
            detail_photos = len(detail.get("photos") or []) if detail else "N/A"
            print(
                f"[crawler] WARNING {realty_id}: zero images in payload "
                f"(catalog={catalog_photos}, detail={detail_photos})"
            )

        self._deliver(realty_id, payload)

        # --- rate limiting between detail pages -----------------------------
        self._detail_counter += 1
        # Only throttle when a detail request was actually sent to the site.
        if beautiful_url:
            time.sleep(DELAY_BETWEEN_DETAIL_PAGES)

    def _deliver(self, realty_id: str, payload: Dict[str, Any]) -> None:
        """Ingest *payload* via the collector and record it, or log a dry run."""
        if self.collector is None:
            print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})")
            return
        result = self.collector.ingest(realty_id, payload)
        # Even if ingest fails we mark processed to avoid infinite retry loops
        # on truly broken listings; operator can clear the DB row manually.
        # NOTE(review): a truthy result is assumed to be dict-like with an
        # optional "job_id" — confirm against DataCollectorClient.ingest.
        job_id = result.get("job_id") if result else None
        self.storage.mark_processed(realty_id, job_id=job_id)

    def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]:
        """Download and parse a listing's detail page.

        Returns the result of ``parse_detail_page``, or ``None`` when the HTTP
        status is not 200 or parsing yields ``None`` (both cases are logged).
        """
        resp = self.session.get_detail(beautiful_url)
        if resp.status_code != 200:
            print(f"[crawler] Detail HTTP {resp.status_code} for {beautiful_url}")
            return None
        detail = parse_detail_page(resp.text)
        if detail is None:
            print(f"[crawler] Detail parse failed for {beautiful_url}")
        return detail