# vmk-360-domria_parser / src / normalizer.py
"""Transform DOM.RIA raw data into a data_collector payload.

Aligns with data_collector PayloadSchema (see data_collector/src/vmk_data_collector/schemas/raw_data.py):
    - images   : list[str]       (photo URLs)
    - price    : str|float|int   (normalized price)
    - contact_phone: str         (first phone number)
    - address  : str             (combined address line)
    - area     : str|float|int  (total area)
    - rooms    : str|int         (rooms count)
    - floor    : str|int         (floor number)
    - title, description, url    (direct mapping)

Extra fields not in the strict schema are kept because model_config = {"extra": "allow"}.
"""
import math
from typing import Any, Dict, List, Optional


def _first(val):
    """Helper: return first element if list, else the value itself."""
    if isinstance(val, list) and val:
        return val[0]
    return val


def _safe_int(val) -> Optional[int]:
    try:
        return int(float(str(val).replace(" ", "").replace(",", ".")))
    except (ValueError, TypeError):
        return None


def _safe_float(val) -> Optional[float]:
    try:
        return float(str(val).replace(" ", "").replace(",", "."))
    except (ValueError, TypeError):
        return None


def _extract_photos(raw_photos) -> List[Dict]:
    """Normalise DOM.RIA photo field to a list of dicts.

    Sometimes photos come as a list, sometimes as a dict keyed by ordering.
    """
    if isinstance(raw_photos, list):
        return raw_photos
    if isinstance(raw_photos, dict):
        return list(raw_photos.values())
    return []


def _build_image_urls(photos: List[Dict]) -> List[str]:
    """Turn photo metadata into full HTTPS URLs.

    DOM.RIA photos are served from cdn.riastatic.com.
    We can inject 'xl' before the extension for a larger variant.
    """
    urls: List[str] = []
    for p in photos:
        if not isinstance(p, dict):
            continue
        base = (
            p.get("file")
            or p.get("beautifulUrl")
            or p.get("photo_base_url")
            or p.get("url")
            or p.get("src")
        )
        if not base:
            continue
        if base.startswith("//"):
            base = "https:" + base
        elif base.startswith("/"):
            base = "https://cdn.riastatic.com" + base
        elif not base.startswith("http"):
            base = "https://cdn.riastatic.com/" + base
        # xl variant if plain .jpg
        if base.endswith(".jpg") and "xl" not in base.split("/")[-1]:
            base = base.replace(".jpg", "xl.jpg")
        urls.append(base)
    return urls


def _extract_phones(detail: Dict[str, Any]) -> List[str]:
    """Pull phone numbers from the detail object if available."""
    phones: List[str] = []
    user = detail.get("user")
    user_contacts = user.get("contacts") if isinstance(user, dict) else None
    contacts = detail.get("contacts") or user_contacts
    if not isinstance(contacts, dict):
        return phones

    for key in ("phones", "phone", "mobile", "tel"):
        vals = contacts.get(key)
        if isinstance(vals, list):
            for v in vals:
                if isinstance(v, dict):
                    phones.append(v.get("phone") or v.get("number") or str(v))
                elif isinstance(v, str):
                    phones.append(v)
        elif isinstance(vals, str):
            phones.append(vals)

    # dedupe preserving order
    seen = set()
    uniq = []
    for p in phones:
        p = str(p).strip()
        if p and p not in seen:
            seen.add(p)
            uniq.append(p)
    return uniq


def _build_address(city: str, district: str, street: str, building: str) -> str:
    """Combine address parts into a single human-readable string."""
    parts = [p for p in (city, district, street, building) if p]
    return ", ".join(parts)


def _as_text(val: Any) -> str:
    """Return *val* as a stripped string, or ``""`` when it is not text-like."""
    if isinstance(val, str):
        return val.strip()
    # Numbers are accepted so that a numeric value (e.g. building number 12)
    # is kept instead of crashing on ``.strip()``; bool is excluded because it
    # is an int subclass.
    if isinstance(val, (int, float)) and not isinstance(val, bool):
        return str(val)
    return ""


def _pick_price_usd(c: Dict[str, Any], d: Dict[str, Any]) -> Optional[int]:
    """Return the first price that parses, trying the known fields in order."""
    price = _safe_int(c.get("price_USD")) or _safe_int(c.get("price_usd"))
    if price is None:
        # "priceObj" may be present but null, so check the type instead of
        # relying on a .get() default.
        price_obj = d.get("priceObj")
        if isinstance(price_obj, dict):
            price = _safe_int(price_obj.get("priceUSD"))
    if price is None:
        price_arr = c.get("priceArr") or d.get("priceArr")
        if isinstance(price_arr, dict):
            # presumably "1" is the USD currency id in priceArr -- TODO confirm
            price = _safe_int(price_arr.get("1"))
        elif isinstance(price_arr, list):
            for entry in price_arr:
                if isinstance(entry, dict) and entry.get("currency") == "USD":
                    price = _safe_int(entry.get("price"))
                    break
    if price is None:
        # Last resort: the bare "price" field.
        # NOTE(review): its currency is not visible here -- confirm it is USD.
        price = _safe_int(c.get("price"))
    return price


def normalize_listing(
    catalog_item: Dict[str, Any],
    detail_realty: Optional[Dict[str, Any]] = None,
    city_name_meta: str = "",
) -> Dict[str, Any]:
    """Create the payload dict that will be sent to data_collector.

    Maps DOM.RIA fields to data_collector PayloadSchema where possible;
    the untouched inputs are kept under ``raw_domria``.

    Args:
        catalog_item: Listing as returned by the catalog/search step.
        detail_realty: Optional detail payload for the same listing; anything
            that is not a dict is treated as absent.
        city_name_meta: City name used when neither payload provides one.

    Returns:
        The payload dict with every ``None``-valued key removed.
    """
    c = catalog_item  # shorthand
    d = detail_realty if isinstance(detail_realty, dict) else {}

    # --- price -------------------------------------------------------------------
    price = _pick_price_usd(c, d)

    # --- images ------------------------------------------------------------------
    # Detail photos win over catalog photos whenever the detail has any.
    photo_source = _extract_photos(d.get("photos")) or _extract_photos(c.get("photos"))
    image_urls = _build_image_urls(photo_source)
    if not image_urls:
        main = d.get("main_photo") or c.get("main_photo")
        if isinstance(main, str):
            # A bare path string is wrapped so the URL builder can handle it.
            main = {"file": main}
        if isinstance(main, dict):
            image_urls = _build_image_urls([main])

    # --- phones ------------------------------------------------------------------
    phones = _extract_phones(d)
    contact_phone = phones[0] if phones else None

    # --- address -----------------------------------------------------------------
    city = _as_text(d.get("city_name") or c.get("city_name") or city_name_meta)
    district = _as_text(d.get("district_name") or c.get("district_name"))
    street = _as_text(d.get("street_name") or c.get("street_name"))
    building = _as_text(d.get("building_number") or c.get("building_number"))
    address = _build_address(city, district, street, building)

    # --- identifiers ---------------------------------------------------------------
    realty_id = c.get("realty_id")
    if realty_id is None:
        realty_id = d.get("realty_id")
    # A missing id must stay None (and be dropped below), not become "None".
    external_id = str(realty_id) if realty_id is not None else None

    url_path = _as_text(c.get("beautiful_url") or d.get("beautiful_url")).lstrip("/")

    # --- tags --------------------------------------------------------------------
    raw_tags = d.get("tags") or c.get("tags")
    tags = []
    if isinstance(raw_tags, (list, tuple)):
        for tag in raw_tags:
            if not isinstance(tag, dict):
                continue
            tag_name = tag.get("name_uk") or tag.get("name")
            if tag_name:  # skip tags that carry no usable name
                tags.append(tag_name)

    area_total = _safe_float(c.get("total_square_meters"))

    # --- schema-aligned payload --------------------------------------------------
    normalized: Dict[str, Any] = {
        # Strict schema fields (PayloadSchema)
        "title": _as_text(d.get("title") or c.get("title")),
        "description": _as_text(d.get("description") or c.get("description")),
        "price": price,
        "url": f"https://dom.ria.com/uk/{url_path}",
        "images": image_urls,
        "contact_phone": contact_phone,
        "address": address,
        "area": area_total,
        "rooms": _safe_int(c.get("rooms_count")),
        "floor": _safe_int(c.get("floor")),

        # Extra fields (extra="allow" in PayloadSchema)
        "external_id": external_id,
        "price_usd": price,
        "price_raw": c.get("priceArr") or c.get("price"),
        "area_total_m2": area_total,
        "area_living_m2": _safe_float(
            d.get("living_square_meters") or c.get("living_square_meters")
        ),
        "area_kitchen_m2": _safe_float(
            d.get("kitchen_square_meters") or c.get("kitchen_square_meters")
        ),
        "floors_total": _safe_int(d.get("floors_count") or c.get("floors_count")),
        "city_name": city,
        "district_name": district,
        "street_name": street,
        "building_number": building,
        "latitude": _safe_float(c.get("lat")),
        "longitude": _safe_float(c.get("lng")),
        "contact_phones": phones,
        "tags": tags,
        "raw_domria": {
            "catalog_item": c,
            "detail_realty": d,
        },
    }

    # Remove None values to keep payload compact
    return {key: value for key, value in normalized.items() if value is not None}