"""Core crawling loop: catalog → detail → ingest."""
import os
import shutil
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple
import curl_cffi.requests as _curl_requests
from curl_cffi.requests import Response
from src.config import (
CrawlTarget,
DELAY_BETWEEN_CATALOG_PAGES,
DELAY_BETWEEN_DETAIL_PAGES,
PAUSE_DURATION_SECONDS,
PAUSE_EVERY_N_CATALOG_PAGES,
)
from src.collector import DataCollectorClient
from src.normalizer import normalize_listing
from src.parser import parse_catalog_page, parse_detail_page
from src.session import DomRiaSession
from src.storage import ResumeStorage
class Crawler:
    """Drive the catalog → detail → ingest loop for crawl targets.

    Catalog and detail pages are fetched through ``session``; every listing
    not yet recorded in ``storage`` is normalized and handed to ``collector``.
    When ``collector`` is ``None`` the crawler runs in dry-run mode: pages are
    still fetched and parsed, but no images are downloaded, nothing is
    ingested and nothing is marked as processed.
    """

    # Cap images to avoid inline pipeline timeouts (AI analysis per photo).
    MAX_IMAGES_PER_LISTING = 15
    # Number of parallel image downloads per listing.
    IMAGE_DOWNLOAD_WORKERS = 5
    # Per-image HTTP timeout, passed to curl_cffi's ``timeout`` argument.
    IMAGE_DOWNLOAD_TIMEOUT = 30
    # Extension used when none can be inferred from the image URL.
    DEFAULT_IMAGE_SUFFIX = ".jpg"

    def __init__(
        self,
        session: DomRiaSession,
        collector: Optional[DataCollectorClient],
        storage: ResumeStorage,
    ):
        """Store the collaborators and reset the page counters.

        Args:
            session: HTTP session used for catalog and detail requests.
            collector: Ingest client, or ``None`` for a dry run.
            storage: Resume storage tracking already-processed listing ids.
        """
        self.session = session
        self.collector = collector
        self.storage = storage
        self._detail_counter = 0
        self._catalog_counter = 0

    def crawl_target(self, target: CrawlTarget, max_pages: Optional[int] = None) -> None:
        """Scrape one city + category + operation combination.

        Catalog pages are requested starting from page 1 until ``max_pages``
        is exceeded, a page fails, a 404 is reported, or a page has no items.

        Args:
            target: Crawl target providing the catalog URL template and the
                metadata attached to each listing.
            max_pages: Optional upper bound on catalog pages to visit;
                ``None`` means no limit.
        """
        label = f"{target.city_name} | {target.operation} | cat={target.category_id}"
        print(f"\n[crawler] Starting target: {label}")

        page = 1
        while True:
            if max_pages is not None and page > max_pages:
                print(f"[crawler] Reached max_pages={max_pages}")
                break

            url = target.catalog_url_template.format(page=page)
            print(f"[crawler] Catalog page {page}: {url}")
            resp = self.session.get_catalog(url)
            catalog = self._handle_catalog_response(resp)

            if catalog is None:
                print(f"[crawler] Stopping target — catalog page {page} failed")
                break
            # ``.get`` keeps the loop alive if the parser omits a key.
            if catalog.get("page_404"):
                print(f"[crawler] Page 404 on catalog page {page} — target exhausted")
                break

            items: List[Dict[str, Any]] = catalog.get("items") or []
            if not items:
                print(f"[crawler] Empty items on page {page} — target exhausted")
                break

            print(
                f"[crawler] Page {page}: {len(items)} items, "
                f"total estimated {catalog.get('total_count')}"
            )
            for item in items:
                self._process_item(item, target)

            # --- rate limiting between catalog pages ----------------------
            self._catalog_counter += 1
            # A falsy interval disables the long pause instead of raising
            # ZeroDivisionError on the modulo.
            if (
                PAUSE_EVERY_N_CATALOG_PAGES
                and self._catalog_counter % PAUSE_EVERY_N_CATALOG_PAGES == 0
            ):
                print(
                    f"[crawler] Pausing {PAUSE_DURATION_SECONDS}s after "
                    f"{PAUSE_EVERY_N_CATALOG_PAGES} catalog pages …"
                )
                time.sleep(PAUSE_DURATION_SECONDS)
            else:
                time.sleep(DELAY_BETWEEN_CATALOG_PAGES)
            page += 1

        print(f"[crawler] Finished target: {label}")

    def _handle_catalog_response(self, resp: Response) -> Optional[Dict[str, Any]]:
        """Return parsed catalog dict or None on unrecoverable error.

        A 404 yields an empty catalog flagged with ``page_404=True``; any
        other non-200 status yields ``None``; a 200 body is passed to
        ``parse_catalog_page``.
        """
        if resp.status_code == 404:
            print("[crawler] Catalog 404")
            return {"items": [], "total_count": 0, "page_404": True}
        if resp.status_code != 200:
            print(f"[crawler] Catalog HTTP {resp.status_code}")
            return None
        return parse_catalog_page(resp.text)

    def _process_item(self, item: Dict[str, Any], target: CrawlTarget) -> None:
        """Fetch, normalize and ingest a single catalog item.

        Items without a ``realty_id`` and items already recorded in storage
        are skipped without sleeping. Otherwise the detail page is fetched
        (when the item has a ``beautiful_url``), the listing is normalized,
        and the payload is ingested or, in dry-run mode, only logged.

        Args:
            item: Raw catalog entry for one listing.
            target: Crawl target the item was found under.
        """
        raw_id = item.get("realty_id")
        # Check for a missing id before stringifying, rather than comparing
        # against the "None" produced by str(None).
        realty_id = "" if raw_id is None else str(raw_id)
        if not realty_id:
            print("[crawler] Skipping item without realty_id")
            return
        if self.storage.is_processed(realty_id):
            return  # silently skip duplicates

        beautiful_url = item.get("beautiful_url")
        if beautiful_url:
            print(f"[crawler] Detail {realty_id}: {beautiful_url}")
            detail = self._fetch_detail(beautiful_url)
        else:
            print(f"[crawler] Item {realty_id} missing beautiful_url, skipping detail")
            detail = None

        payload = normalize_listing(
            catalog_item=item,
            detail_realty=detail,
            city_name_meta=target.city_name,
            category_id=target.category_id,
        )

        # ``or []`` also covers a payload whose "images" key is present but None.
        image_urls = payload.get("images") or []
        if not image_urls:
            catalog_photos = len(item.get("photos") or [])
            detail_photos = len(detail.get("photos") or []) if detail else "N/A"
            print(
                f"[crawler] WARNING {realty_id}: zero images in payload "
                f"(catalog={catalog_photos}, detail={detail_photos})"
            )

        capped_urls = image_urls[: self.MAX_IMAGES_PER_LISTING]
        if len(image_urls) > len(capped_urls):
            print(
                f"[crawler] {realty_id}: capping images "
                f"{len(image_urls)} -> {self.MAX_IMAGES_PER_LISTING}"
            )

        if self.collector is None:
            print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})")
        else:
            self._ingest(realty_id, payload, capped_urls)

        # --- rate limiting between detail pages ---------------------------
        self._detail_counter += 1
        time.sleep(DELAY_BETWEEN_DETAIL_PAGES)

    def _ingest(
        self,
        realty_id: str,
        payload: Dict[str, Any],
        image_urls: List[str],
    ) -> None:
        """Download images, send the listing to the collector, mark it processed.

        The temporary image directory is removed even when the collector
        raises, so a failed ingest does not leak files on disk.
        """
        image_paths: List[str] = []
        temp_dir = ""
        try:
            if image_urls:
                image_paths, temp_dir = self._download_images(image_urls)
            if image_paths:
                result = self.collector.ingest_with_images(
                    realty_id, payload, image_paths
                )
            else:
                result = self.collector.ingest(realty_id, payload)
        finally:
            # --- cleanup temp images --------------------------------------
            if temp_dir:
                shutil.rmtree(temp_dir, ignore_errors=True)

        # Even if ingest fails we mark processed to avoid infinite retry loops
        # on truly broken listings; operator can clear the DB row manually.
        job_id = result.get("job_id") if result else None
        self.storage.mark_processed(realty_id, job_id=job_id)

    @staticmethod
    def _image_suffix(url: str) -> str:
        """Infer a file extension (with leading dot) from an image URL.

        The query string and fragment are stripped first, so dots inside
        them (``photo.jpg?v=1.2``) cannot be mistaken for the extension.
        Falls back to ``DEFAULT_IMAGE_SUFFIX`` when the last path segment has
        no short alphanumeric extension.
        """
        path = url.split("?", 1)[0].split("#", 1)[0]
        ext = os.path.splitext(path.rsplit("/", 1)[-1])[1]
        # At most five characters after the dot.
        if 1 < len(ext) <= 6 and ext[1:].isalnum():
            return ext
        return Crawler.DEFAULT_IMAGE_SUFFIX

    def _download_images(self, urls: List[str]) -> Tuple[List[str], str]:
        """Download image URLs to a temp directory.

        Downloads run in a thread pool; individual failures are logged and
        skipped. The caller owns the returned directory and must remove it.

        Returns:
            ``(paths, temp_dir)``: the files written, in the same order as
            ``urls``, and the directory holding them. ``([], "")`` when
            ``urls`` is empty.
        """
        if not urls:
            return [], ""

        temp_dir = tempfile.mkdtemp(prefix="domria_img_")

        def _one(url: str, idx: int) -> Optional[str]:
            # Best-effort: any failure for one image is logged and skipped.
            try:
                resp = _curl_requests.get(
                    url,
                    timeout=self.IMAGE_DOWNLOAD_TIMEOUT,
                    impersonate="chrome124",
                )
                if resp.status_code != 200:
                    print(f"[crawler] Image HTTP {resp.status_code} for {url}")
                    return None
                dest = os.path.join(temp_dir, f"{idx}{self._image_suffix(url)}")
                with open(dest, "wb") as f:
                    f.write(resp.content)
                return dest
            except Exception as exc:
                print(f"[crawler] Image download failed {url}: {exc}")
                return None

        try:
            with ThreadPoolExecutor(max_workers=self.IMAGE_DOWNLOAD_WORKERS) as executor:
                futures = [executor.submit(_one, url, i) for i, url in enumerate(urls)]
                downloaded = [future.result() for future in futures]
        except BaseException:
            # The caller never receives temp_dir in this case, so clean up here.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        paths = [path for path in downloaded if path]
        print(f"[crawler] Downloaded {len(paths)}/{len(urls)} images to {temp_dir}")
        return paths, temp_dir

    def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]:
        """Fetch and parse one detail page.

        Returns:
            The result of ``parse_detail_page``, or ``None`` when the
            response status is not 200 or parsing yields ``None``.
        """
        resp = self.session.get_detail(beautiful_url)
        if resp.status_code != 200:
            print(f"[crawler] Detail HTTP {resp.status_code} for {beautiful_url}")
            return None
        detail = parse_detail_page(resp.text)
        if detail is None:
            print(f"[crawler] Detail parse failed for {beautiful_url}")
        return detail