diff --git a/src/collector.py b/src/collector.py index e715aad..31645e0 100644 --- a/src/collector.py +++ b/src/collector.py @@ -1,14 +1,15 @@ """HTTP client for the local data_collector service.""" import json -import time -from typing import Any, Dict, Optional +from pathlib import Path +from typing import Any, Dict, List, Optional -from curl_cffi import requests +from curl_cffi import CurlMime, requests from src.config import ( DATA_COLLECTOR_BASE_URL, DATA_COLLECTOR_HEALTH_ENDPOINT, DATA_COLLECTOR_INGEST_ENDPOINT, + DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT, DATA_COLLECTOR_SOURCE_SLUG, ) @@ -67,3 +68,67 @@ except Exception as exc: print(f"[collector] Ingest exception for {external_id}: {exc}") return None + + def ingest_with_images( + self, + external_id: str, + payload: Dict[str, Any], + image_paths: List[str], + source_slug: str = DATA_COLLECTOR_SOURCE_SLUG, + ) -> Optional[Dict[str, Any]]: + """Send listing + binary images via multipart to /api/v1/ingest/with-images. + + Pipeline runs inline, so we increase timeout to 120s. + """ + metadata = json.dumps( + { + "source_slug": source_slug, + "external_id": external_id, + "payload": payload, + }, + ensure_ascii=False, + ) + url = f"{self.base_url}{DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT}" + mime: Optional[CurlMime] = None + try: + mime = CurlMime() + for path in image_paths: + p = Path(path) + if not p.exists(): + continue + mime.addpart( + name="images", + local_path=str(p), + filename=p.name, + content_type="application/octet-stream", + ) + + resp = self.session.post( + url, + data={"metadata": metadata}, + multipart=mime, + timeout=120, + ) + if resp.status_code == 202: + data = resp.json() + print( + f"[collector] Ingested with-images {external_id}: " + f"status={data.get('status')} job_id={data.get('job_id')} " + f"property_id={data.get('property_id')}" + ) + return data + else: + print( + f"[collector] Ingest with-images failed for {external_id}: " + f"HTTP {resp.status_code} body={resp.text[:200]}" + ) + return None + except Exception as exc: + print(f"[collector] Ingest with-images exception for {external_id}: {exc}") + return None + finally: + if mime is not None: + try: + mime.close() + except Exception: + pass diff --git a/src/config.py b/src/config.py index 8ad12bf..0be9509 100644 --- a/src/config.py +++ b/src/config.py @@ -5,6 +5,7 @@ # --- data_collector ----------------------------------------------------------- DATA_COLLECTOR_BASE_URL = "http://localhost:8020" DATA_COLLECTOR_INGEST_ENDPOINT = "/api/v1/ingest" +DATA_COLLECTOR_INGEST_WITH_IMAGES_ENDPOINT = "/api/v1/ingest/with-images" DATA_COLLECTOR_HEALTH_ENDPOINT = "/api/v1/health" DATA_COLLECTOR_SOURCE_SLUG = "domria" diff --git a/src/crawler.py b/src/crawler.py index 74d9bee..fbc4cc4 100644 --- a/src/crawler.py +++ b/src/crawler.py @@ -1,7 +1,12 @@ """Core crawling loop: catalog → detail → ingest.""" +import os +import shutil +import tempfile import time -from typing import Any, Dict, List, Optional +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, List, Optional, Tuple +import curl_cffi.requests as _curl_requests from curl_cffi.requests import Response from src.config import ( @@ -116,17 +121,30 @@ catalog_item=item, detail_realty=detail, city_name_meta=target.city_name, + category_id=target.category_id, ) - image_count = len(payload.get("images", [])) + image_urls = payload.get("images", []) + image_count = len(image_urls) if image_count == 0: print( f"[crawler] WARNING {realty_id}: zero images in payload " f"(catalog={len(item.get('photos') or [])}, detail={len(detail.get('photos') or []) if detail else 'N/A'})" ) + image_paths: List[str] = [] + temp_dir = "" + if self.collector is not None and image_urls: + image_paths, temp_dir = self._download_images(image_urls) + if self.collector is not None: - result = self.collector.ingest(realty_id, payload) + if image_paths: + result = self.collector.ingest_with_images( + realty_id, payload, image_paths + ) + else: + result = self.collector.ingest(realty_id, payload) + if result: job_id = result.get("job_id") self.storage.mark_processed(realty_id, job_id=job_id) @@ -137,10 +155,53 @@ else: print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})") + # --- cleanup temp images -------------------------------------------- + if temp_dir: + shutil.rmtree(temp_dir, ignore_errors=True) + # --- rate limiting between detail pages ----------------------------- self._detail_counter += 1 time.sleep(DELAY_BETWEEN_DETAIL_PAGES) + def _download_images(self, urls: List[str]) -> Tuple[List[str], str]: + """Download image URLs to a temp directory. + + Returns (list of file paths, temp directory path). + """ + if not urls: + return [], "" + temp_dir = tempfile.mkdtemp(prefix="domria_img_") + paths: List[str] = [] + + def _one(url: str, idx: int) -> Optional[str]: + try: + resp = _curl_requests.get(url, timeout=30, impersonate="chrome124") + if resp.status_code != 200: + return None + # infer extension from url, default .jpg + suffix = ".jpg" + if "." in url.split("/")[-1]: + ext = url.split("/")[-1].split(".")[-1].split("?")[0] + if ext and len(ext) <= 5: + suffix = f".{ext}" + dest = os.path.join(temp_dir, f"{idx}{suffix}") + with open(dest, "wb") as f: + f.write(resp.content) + return dest + except Exception as exc: + print(f"[crawler] Image download failed {url}: {exc}") + return None + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(_one, url, i) for i, url in enumerate(urls)] + for future in futures: + path = future.result() + if path: + paths.append(path) + + print(f"[crawler] Downloaded {len(paths)}/{len(urls)} images to {temp_dir}") + return paths, temp_dir + def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]: resp = self.session.get_detail(beautiful_url) if resp.status_code != 200: diff --git a/src/normalizer.py b/src/normalizer.py index 7f2d836..aa6d1dd 100644 --- a/src/normalizer.py +++ b/src/normalizer.py @@ -51,30 +51,31 @@ def _build_image_urls(photos: List[Dict]) -> List[str]: """Turn photo metadata into full HTTPS URLs. - DOM.RIA photos are served from cdn.riastatic.com. - We can inject 'xl' before the extension for a larger variant. + DOM.RIA now serves photos from cdn.riastatic.com/photosnew/ + using *beautifulUrl* (available on detail page). The legacy + *file* path returns 415/404 and is ignored. """ urls: List[str] = [] for p in photos: if not isinstance(p, dict): continue + # beautifulUrl is the only reliable source (detail page) base = ( - p.get("file") - or p.get("beautifulUrl") - or p.get("photo_base_url") + p.get("beautifulUrl") or p.get("url") or p.get("src") + or p.get("photo_base_url") ) if not base: continue if base.startswith("//"): base = "https:" + base elif base.startswith("/"): - base = "https://cdn.riastatic.com" + base + base = "https://cdn.riastatic.com/photosnew" + base elif not base.startswith("http"): - base = "https://cdn.riastatic.com/" + base - # xl variant if plain .jpg - if base.endswith(".jpg") and "xl" not in base.split("/")[-1]: + base = "https://cdn.riastatic.com/photosnew/" + base + # prefer xl variant if plain .jpg + if base.endswith(".jpg") and not base.endswith("xl.jpg") and not base.endswith("fl.jpg"): base = base.replace(".jpg", "xl.jpg") urls.append(base) return urls @@ -121,6 +122,7 @@ catalog_item: Dict[str, Any], detail_realty: Optional[Dict[str, Any]] = None, city_name_meta: str = "", + category_id: Optional[int] = None, ) -> Dict[str, Any]: """Create the payload dict that will be sent to data_collector. @@ -167,11 +169,31 @@ building = (d.get("building_number") or c.get("building_number") or "").strip() address = _build_address(city, district, street, building) + # --- title / description (PayloadSchema requires at least one) --------------- + title = (d.get("title") or c.get("title") or "").strip() + description = (d.get("description") or c.get("description") or "").strip() + if not title and not description: + parts = [] + rooms_val = _safe_int(c.get("rooms_count")) + if rooms_val: + parts.append(f"{rooms_val}-кімн.") + cat_name = { + 1: "Квартира", + 2: "Будинок", + 3: "Земельна ділянка", + 4: "Комерційна нерухомість", + 5: "Гараж", + }.get(category_id, "Нерухомість") + parts.append(cat_name) + if city: + parts.append(f"м. {city}") + title = ", ".join(parts) + # --- schema-aligned payload -------------------------------------------------- normalized: Dict[str, Any] = { # Strict schema fields (PayloadSchema) - "title": (d.get("title") or c.get("title") or "").strip(), - "description": (d.get("description") or c.get("description") or "").strip(), + "title": title, + "description": description, "price": price, "url": f"https://dom.ria.com/uk/{(c.get('beautiful_url') or '').lstrip('/')}", "images": image_urls,