"""Transform DOM.RIA raw data into a data_collector payload.
Aligns with data_collector PayloadSchema (see data_collector/src/vmk_data_collector/schemas/raw_data.py):
- images : list[str] (photo URLs)
- price : str|float|int (normalized price)
- contact_phone: str (first phone number)
- address : str (combined address line)
- area : str|float|int (total area)
- rooms : str|int (rooms count)
- floor : str|int (floor number)
- title, description, url (direct mapping)
Extra fields not in the strict schema are kept because model_config = {"extra": "allow"}.
"""
from typing import Any, Dict, List, Optional
def _first(val):
"""Helper: return first element if list, else the value itself."""
if isinstance(val, list) and val:
return val[0]
return val
def _safe_int(val) -> Optional[int]:
try:
return int(float(str(val).replace(" ", "").replace(",", ".")))
except (ValueError, TypeError):
return None
def _safe_float(val) -> Optional[float]:
try:
return float(str(val).replace(" ", "").replace(",", "."))
except (ValueError, TypeError):
return None
def _extract_photos(raw_photos) -> List[Dict]:
"""Normalise DOM.RIA photo field to a list of dicts.
Sometimes photos come as a list, sometimes as a dict keyed by ordering.
"""
if isinstance(raw_photos, list):
return raw_photos
if isinstance(raw_photos, dict):
return list(raw_photos.values())
return []
def _build_image_urls(photos: List[Dict]) -> List[str]:
"""Turn photo metadata into full HTTPS URLs.
DOM.RIA photos are served from cdn.riastatic.com.
We can inject 'xl' before the extension for a larger variant.
"""
urls: List[str] = []
for p in photos:
if not isinstance(p, dict):
continue
base = (
p.get("file")
or p.get("beautifulUrl")
or p.get("photo_base_url")
or p.get("url")
or p.get("src")
)
if not base:
continue
if base.startswith("//"):
base = "https:" + base
elif base.startswith("/"):
base = "https://cdn.riastatic.com" + base
elif not base.startswith("http"):
base = "https://cdn.riastatic.com/" + base
# xl variant if plain .jpg
if base.endswith(".jpg") and "xl" not in base.split("/")[-1]:
base = base.replace(".jpg", "xl.jpg")
urls.append(base)
return urls
def _extract_phones(detail: Dict[str, Any]) -> List[str]:
"""Pull phone numbers from the detail object if available."""
phones: List[str] = []
user = detail.get("user")
user_contacts = user.get("contacts") if isinstance(user, dict) else None
contacts = detail.get("contacts") or user_contacts
if not isinstance(contacts, dict):
return phones
for key in ("phones", "phone", "mobile", "tel"):
vals = contacts.get(key)
if isinstance(vals, list):
for v in vals:
if isinstance(v, dict):
phones.append(v.get("phone") or v.get("number") or str(v))
elif isinstance(v, str):
phones.append(v)
elif isinstance(vals, str):
phones.append(vals)
# dedupe preserving order
seen = set()
uniq = []
for p in phones:
p = str(p).strip()
if p and p not in seen:
seen.add(p)
uniq.append(p)
return uniq
def _build_address(city: str, district: str, street: str, building: str) -> str:
"""Combine address parts into a single human-readable string."""
parts = [p for p in (city, district, street, building) if p]
return ", ".join(parts)
def _first_text(*candidates: Any) -> str:
    """Return the first candidate that is a non-blank string or number, stripped; else ""."""
    for candidate in candidates:
        # Falsy values (None, "", 0) and booleans never carry usable text.
        if not candidate or isinstance(candidate, bool):
            continue
        if isinstance(candidate, (str, int, float)):
            text = str(candidate).strip()
            if text:
                return text
    return ""


def normalize_listing(
    catalog_item: Dict[str, Any],
    detail_realty: Optional[Dict[str, Any]] = None,
    city_name_meta: str = "",
) -> Dict[str, Any]:
    """Create the payload dict that will be sent to data_collector.

    Maps DOM.RIA fields to data_collector PayloadSchema where possible;
    everything else is kept in `raw_domria`.

    Args:
        catalog_item: Listing record from the catalog/search response.
        detail_realty: Optional detail record for the same listing. Where both
            records are consulted for a field, the detail value is preferred.
        city_name_meta: City name used when neither record carries one.

    Returns:
        The payload dict with every ``None``-valued key removed. Empty strings
        and empty lists are kept.
    """
    c = catalog_item  # shorthand
    d = detail_realty or {}

    # --- price ---------------------------------------------------------------
    # Cascade: explicit USD fields -> detail priceObj -> priceArr -> raw price.
    price = _safe_int(c.get("price_USD")) or _safe_int(c.get("price_usd"))
    if price is None:
        # "priceObj" may be present but null, so check the type before .get().
        price_obj = d.get("priceObj")
        if isinstance(price_obj, dict):
            price = _safe_int(price_obj.get("priceUSD"))
    if price is None:
        pa = c.get("priceArr") or d.get("priceArr")
        if isinstance(pa, dict):
            # NOTE(review): key "1" is presumably the USD currency id - confirm.
            price = _safe_int(pa.get("1"))
        elif isinstance(pa, list):
            for entry in pa:
                if isinstance(entry, dict) and entry.get("currency") == "USD":
                    price = _safe_int(entry.get("price"))
                    break
    if price is None:
        # NOTE(review): the currency of this last-resort value is not checked
        # here, yet it is also emitted as "price_usd" - confirm upstream.
        price = _safe_int(c.get("price"))

    # --- images --------------------------------------------------------------
    # Detail photos win over catalog photos; "main_photo" is the last resort.
    photo_source = _extract_photos(d.get("photos")) or _extract_photos(c.get("photos"))
    image_urls = _build_image_urls(photo_source)
    if not image_urls:
        main = d.get("main_photo")
        if isinstance(main, dict):
            image_urls = _build_image_urls([main])

    # --- phones --------------------------------------------------------------
    phones = _extract_phones(d)
    contact_phone = phones[0] if phones else None

    # --- address -------------------------------------------------------------
    # _first_text tolerates numeric values (e.g. an integer building number),
    # which a bare .strip() would crash on.
    city = _first_text(d.get("city_name"), c.get("city_name"), city_name_meta)
    district = _first_text(d.get("district_name"), c.get("district_name"))
    street = _first_text(d.get("street_name"), c.get("street_name"))
    building = _first_text(d.get("building_number"), c.get("building_number"))
    address = _build_address(city, district, street, building)

    # --- misc ----------------------------------------------------------------
    area_total = _safe_float(c.get("total_square_meters"))
    realty_id = c.get("realty_id")
    url_path = _first_text(c.get("beautiful_url")).lstrip("/")
    raw_tags = d.get("tags") or c.get("tags") or []
    tag_names = [
        t.get("name_uk") or t.get("name") for t in raw_tags if isinstance(t, dict)
    ]

    # --- schema-aligned payload ----------------------------------------------
    normalized: Dict[str, Any] = {
        # Strict schema fields (PayloadSchema)
        "title": _first_text(d.get("title"), c.get("title")),
        "description": _first_text(d.get("description"), c.get("description")),
        "price": price,
        "url": f"https://dom.ria.com/uk/{url_path}",
        "images": image_urls,
        "contact_phone": contact_phone,
        "address": address,
        "area": area_total,
        "rooms": _safe_int(c.get("rooms_count")),
        "floor": _safe_int(c.get("floor")),
        # Extra fields (extra="allow" in PayloadSchema)
        # A missing id stays None (and is dropped below) rather than "None".
        "external_id": str(realty_id) if realty_id is not None else None,
        "price_usd": price,
        "price_raw": c.get("priceArr") or c.get("price"),
        "area_total_m2": area_total,
        "area_living_m2": _safe_float(
            d.get("living_square_meters") or c.get("living_square_meters")
        ),
        "area_kitchen_m2": _safe_float(
            d.get("kitchen_square_meters") or c.get("kitchen_square_meters")
        ),
        "floors_total": _safe_int(d.get("floors_count") or c.get("floors_count")),
        "city_name": city,
        "district_name": district,
        "street_name": street,
        "building_number": building,
        "latitude": _safe_float(c.get("lat")),
        "longitude": _safe_float(c.get("lng")),
        "contact_phones": phones,
        # Tags without a usable name are left out instead of appearing as None.
        "tags": [name for name in tag_names if name],
        "raw_domria": {
            "catalog_item": c,
            "detail_realty": d,
        },
    }
    # Remove None values to keep payload compact
    return {k: v for k, v in normalized.items() if v is not None}