# vmk-360-data_collector / src / vmk_data_collector / services / property_pipeline.py
import asyncio
import dataclasses
import time
from dataclasses import dataclass, field
from typing import Any

import structlog
from sqlalchemy import inspect

from vmk_data_collector.core.circuit_breaker import CircuitBreakerOpenError
from vmk_data_collector.core.exceptions import OllamaRetryableError
from vmk_data_collector.core.metrics import (
    pipeline_duration_seconds,
    pipeline_results_total,
)
from vmk_data_collector.db.repositories.ai_enrichment import (
    AiEnrichmentRepository,
)
from vmk_data_collector.db.repositories.custom_field import (
    CustomFieldRepository,
)
from vmk_data_collector.db.repositories.data_source import (
    DataSourceRepository,
)
from vmk_data_collector.db.repositories.image import ImageRepository
from vmk_data_collector.db.repositories.property import PropertyRepository
from vmk_data_collector.db.repositories.property_type import (
    PropertyTypeRepository,
)
from vmk_data_collector.db.repositories.raw_data import RawDataRepository
from vmk_data_collector.db.repositories.snapshot import SnapshotRepository
from vmk_data_collector.domain.entities import NormalizedProperty
from vmk_data_collector.domain.enums import RawDataStatus
from vmk_data_collector.schemas.raw_data import IngestResponse
from vmk_data_collector.services.ai_enricher import AiEnricher
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.image_downloader import ImageDownloader
from vmk_data_collector.services.ollama_client import OllamaClient

logger = structlog.get_logger()


@dataclass
class PipelineContext:
    """Mutable scratchpad holding the intermediate results of one pipeline run.

    ``PropertyPipeline.process`` creates one instance per call and fills the
    fields in as the individual stages complete.
    """

    # Record returned by the raw-data repository in the load stage.
    raw: Any | None = None
    # Response object returned by the AI normalizer.
    norm_response: Any | None = None
    # NormalizedProperty built from the normalizer response.
    normalized: NormalizedProperty | None = None
    # Data source resolved (or created) for the raw record.
    data_source: Any | None = None
    # Previously stored listing with the same source/external id, if any.
    existing_listing: Any | None = None
    # Id of the listing that was created or updated.
    property_id: int | None = None
    # Id of the snapshot taken before an update; stays None for new listings.
    snapshot_id: int | None = None
    # Per-image analysis results collected by the image stage.
    aggregated_analysis: dict[str, Any] = field(default_factory=dict)
    # Result of the AI enrichment stage.
    enrichment: Any | None = None
    # Local paths of images handed to process() by the caller.
    uploaded_image_paths: list[str] = field(default_factory=list)


class PropertyPipeline:
    def __init__(
        self,
        raw_repo: RawDataRepository,
        property_repo: PropertyRepository,
        image_repo: ImageRepository,
        custom_field_repo: CustomFieldRepository,
        snapshot_repo: SnapshotRepository,
        enrichment_repo: AiEnrichmentRepository,
        data_source_repo: DataSourceRepository,
        property_type_repo: PropertyTypeRepository,
        normalizer: AiNormalizer,
        image_downloader: ImageDownloader,
        image_analyzer: AiImageAnalyzer,
        enricher: AiEnricher,
        ollama_client: OllamaClient,
        active_jobs: set[int] | None = None,
    ) -> None:
        self._raw_repo = raw_repo
        self._property_repo = property_repo
        self._image_repo = image_repo
        self._custom_field_repo = custom_field_repo
        self._snapshot_repo = snapshot_repo
        self._enrichment_repo = enrichment_repo
        self._data_source_repo = data_source_repo
        self._property_type_repo = property_type_repo
        self._active_jobs = active_jobs
        self._normalizer = normalizer
        self._image_downloader = image_downloader
        self._image_analyzer = image_analyzer
        self._enricher = enricher
        self._ollama_client = ollama_client

    async def process(
        self, raw_data_id: int, uploaded_image_paths: list[str] | None = None
    ) -> IngestResponse:
        """Run every pipeline stage for one raw-data record.

        The stages run strictly in order: load, normalize, resolve source,
        persist listing, custom fields, images, enrichment, embedding,
        finalize.

        Args:
            raw_data_id: Id of the raw-data record to process.
            uploaded_image_paths: Optional local image paths; when given they
                are processed instead of the image URLs from the payload.

        Returns:
            An ``IngestResponse`` with status ``"invalid"`` when the
            normalizer reports the payload is not real estate, otherwise the
            response produced by the finalize stage.

        Raises:
            Exception: Any error raised by a stage is re-raised after the
                "failed" counter is incremented and the error is logged
                (an open circuit breaker is logged at warning level).
        """
        # Register the job; the ``finally`` block below always removes it again.
        if self._active_jobs is not None:
            self._active_jobs.add(raw_data_id)
        start = time.perf_counter()
        context = PipelineContext()
        logger.info("pipeline_start", raw_id=raw_data_id)

        try:
            # Stage 1: Load raw data
            context.raw = await self._stage_load_raw(raw_data_id)
            logger.info("pipeline_loaded", raw_id=raw_data_id)

            # Use uploaded image paths passed directly from endpoint
            if uploaded_image_paths:
                context.uploaded_image_paths = uploaded_image_paths
                logger.info(
                    "pipeline_uploaded_images_found",
                    raw_id=raw_data_id,
                    image_count=len(uploaded_image_paths),
                )

            # Stage 2: Normalize via AI
            context.norm_response = await self._stage_normalize(
                raw_data_id, context.raw
            )
            # Not real estate: mark the raw record invalid and stop early.
            if not context.norm_response.is_real_estate:
                pipeline_results_total.labels(status="invalid").inc()
                logger.info(
                    "pipeline_not_real_estate",
                    raw_id=raw_data_id,
                    reason=context.norm_response.reason,
                )
                await self._raw_repo.update_status(
                    raw_data_id,
                    RawDataStatus.invalid,
                    error_message=context.norm_response.reason,
                )
                return IngestResponse(
                    job_id=raw_data_id,
                    status="invalid",
                    reason=context.norm_response.reason,
                    message="Payload is not real estate",
                )

            # Drop keys the NormalizedProperty dataclass does not declare so
            # that unexpected keys in the response cannot break construction.
            _allowed_fields = {f.name for f in dataclasses.fields(NormalizedProperty)}
            _norm_payload = {
                k: v
                for k, v in (context.norm_response.normalized or {}).items()
                if k in _allowed_fields
            }
            context.normalized = NormalizedProperty(**_norm_payload)

            # Fallback: if AI didn't return images, use raw payload photos/images
            if not context.normalized.images:
                raw_payload = context.raw.payload if hasattr(context.raw, "payload") else {}
                fallback_images = raw_payload.get("photos") or raw_payload.get("images") or []
                if isinstance(fallback_images, list):
                    # Keep only string entries from the fallback list.
                    context.normalized.images = [
                        url for url in fallback_images if isinstance(url, str)
                    ]
                    if context.normalized.images:
                        logger.info(
                            "pipeline_images_fallback",
                            raw_id=raw_data_id,
                            image_count=len(context.normalized.images),
                        )

            # Stage 3: Resolve source and existing listing
            (
                context.data_source,
                context.existing_listing,
            ) = await self._stage_resolve_source_and_listing(
                raw_data_id, context.raw, context.normalized
            )
            logger.info(
                "pipeline_source_resolved",
                raw_id=raw_data_id,
                source_id=context.data_source.id,
                existing_id=getattr(context.existing_listing, "id", None),
            )

            # Stage 4: Persist listing (create or update)
            (
                context.property_id,
                context.snapshot_id,
            ) = await self._stage_persist_listing(
                raw_data_id,
                context.raw,
                context.normalized,
                context.data_source,
                context.existing_listing,
            )
            logger.info(
                "pipeline_persisted",
                raw_id=raw_data_id,
                property_id=context.property_id,
                snapshot_id=context.snapshot_id,
            )

            # Stage 5: Custom fields
            await self._stage_persist_custom_fields(
                context.property_id,
                context.normalized.custom_fields,
            )

            # Stage 6: Download and analyze images
            # Uploaded files take precedence over the URLs found in the payload.
            if context.uploaded_image_paths:
                context.aggregated_analysis = await self._stage_process_uploaded_images(
                    context.property_id,
                    context.uploaded_image_paths,
                )
            else:
                context.aggregated_analysis = await self._stage_process_remote_images(
                    context.property_id,
                    context.normalized.images,
                )
            logger.info(
                "pipeline_images_processed",
                raw_id=raw_data_id,
                property_id=context.property_id,
                image_count=len(context.uploaded_image_paths or context.normalized.images or []),
            )

            # Stage 7: AI enrichment
            context.enrichment = await self._stage_enrich(
                context.property_id,
                context.normalized,
                context.aggregated_analysis,
            )
            logger.info(
                "pipeline_enriched",
                raw_id=raw_data_id,
                property_id=context.property_id,
            )

            # Stage 8: Generate embedding
            await self._stage_embed(
                context.property_id,
                context.normalized,
            )
            logger.info(
                "pipeline_embedding_generated",
                raw_id=raw_data_id,
                property_id=context.property_id,
            )

            # Stage 9: Finalize
            result = await self._stage_finalize(
                raw_data_id,
                context.property_id,
                context.snapshot_id,
            )
            pipeline_results_total.labels(status="completed").inc()
            logger.info(
                "pipeline_completed",
                raw_id=raw_data_id,
                property_id=context.property_id,
                duration=time.perf_counter() - start,
            )
            return result

        except CircuitBreakerOpenError as exc:
            # Logged at warning level (no traceback), then propagated.
            pipeline_results_total.labels(status="failed").inc()
            logger.warning(
                "pipeline_circuit_breaker_open",
                raw_id=raw_data_id,
                error=str(exc),
            )
            raise
        except Exception as exc:
            pipeline_results_total.labels(status="failed").inc()
            logger.error(
                "pipeline_unhandled_error",
                raw_id=raw_data_id,
                error=str(exc),
                exc_info=True,
            )
            raise
        finally:
            # Runs on every exit path, including the early "invalid" return.
            pipeline_duration_seconds.observe(time.perf_counter() - start)
            if self._active_jobs is not None:
                self._active_jobs.discard(raw_data_id)

    # ------------------------------------------------------------------
    # Stages
    # ------------------------------------------------------------------

    async def _stage_load_raw(self, raw_data_id: int) -> Any:
        """Fetch the raw record and flag it as being processed.

        Raises:
            ValueError: If no record with ``raw_data_id`` exists.
        """
        record = await self._raw_repo.get_by_id(raw_data_id)
        if record is not None:
            await self._raw_repo.update_status(
                raw_data_id, RawDataStatus.processing
            )
            return record
        raise ValueError(f"Raw data {raw_data_id} not found")

    async def _stage_normalize(
        self, raw_data_id: int, raw: Any
    ) -> Any:
        """Send the raw payload to the normalizer and return its response.

        Every failure is re-raised. Errors other than an open circuit
        breaker are logged here first.
        """
        try:
            response = await self._normalizer.normalize(raw.payload)
        except Exception as exc:
            # An open circuit breaker passes through without a log entry here.
            if not isinstance(exc, CircuitBreakerOpenError):
                logger.error(
                    "pipeline_normalizer_error",
                    raw_id=raw_data_id,
                    error=str(exc),
                )
            raise
        return response

    async def _stage_resolve_source_and_listing(
        self,
        raw_data_id: int,
        raw: Any,
        normalized: NormalizedProperty,
    ) -> tuple[Any, Any]:
        """Return the data source for the payload and any existing listing.

        The source slug is read from ``raw.payload["source_slug"]`` and
        defaults to ``"unknown"``. The listing is looked up by source id and
        the record's external id. ``raw_data_id`` and ``normalized`` are
        accepted but not used here.
        """
        slug = raw.payload.get("source_slug", "unknown")
        source = await self._data_source_repo.get_or_create_by_slug(
            slug, name=slug
        )
        known_listing = await self._property_repo.get_by_source_and_external(
            source.id, raw.external_id
        )
        return source, known_listing

    async def _stage_persist_listing(
        self,
        raw_data_id: int,
        raw: Any,
        normalized: NormalizedProperty,
        data_source: Any,
        existing: Any,
    ) -> tuple[int, int | None]:
        """Create the listing, or snapshot and update the existing one.

        Returns:
            ``(property_id, snapshot_id)``; ``snapshot_id`` is ``None`` when a
            new listing was created.
        """
        payload = self._normalized_to_kwargs(normalized)

        # Replace the textual property type with the id of its lookup row.
        type_slug = normalized.property_type
        if type_slug:
            resolved_type = await self._property_type_repo.get_or_create_by_slug(
                type_slug,
                name=type_slug,
            )
            payload["property_type_id"] = resolved_type.id
        payload.pop("property_type", None)

        payload.update(
            source_id=data_source.id,
            external_id=raw.external_id,
            raw_data_id=raw_data_id,
        )

        if not existing:
            created = await self._property_repo.create(**payload)
            return created.id, None

        # Capture the current state before it is overwritten.
        before = self._listing_to_dict(existing)
        diff = self._compute_changed_fields(before, payload)
        snapshot = await self._snapshot_repo.create(existing.id, before, diff)
        snapshot_id = snapshot.id

        await self._property_repo.update(existing.id, **payload)
        property_id = existing.id
        # Old custom fields are removed here; the next stage writes new ones.
        await self._custom_field_repo.delete_by_property(property_id)
        return property_id, snapshot_id

    async def _stage_persist_custom_fields(
        self,
        property_id: int,
        custom_fields: dict[str, Any],
    ) -> None:
        """Store the custom fields of a listing; does nothing when there are none.

        Each value is saved as its ``str()`` form together with a type label
        derived from the original Python value.
        """
        if custom_fields:
            rows = [
                {
                    "field_name": name,
                    "field_value": str(raw_value),
                    "field_type": self._infer_field_type(raw_value),
                }
                for name, raw_value in custom_fields.items()
            ]
            await self._custom_field_repo.bulk_create(property_id, rows)

    async def _stage_process_uploaded_images(
        self,
        property_id: int,
        image_paths: list[str],
    ) -> dict[str, Any]:
        """Import locally uploaded images one by one and analyze them.

        Files that fail to process and files whose hash is already stored for
        the property are skipped. Analysis failures are logged and do not
        abort the stage. Afterwards the listing's ``images_count`` is
        refreshed and the upload directory is removed on a best-effort basis.

        Returns:
            Mapping of upload path to the dumped analysis result.
        """
        analyses: dict[str, Any] = {}

        for position, source_path in enumerate(image_paths):
            try:
                stored = await self._image_downloader.process_local_file(
                    property_id, source_path, position
                )
            except Exception as exc:
                logger.error(
                    "image_uploaded_process_failed",
                    property_id=property_id,
                    temp_path=source_path,
                    error=str(exc),
                )
                continue

            already_known = await self._image_repo.get_by_hash(
                property_id, stored.image_hash
            )
            if already_known:
                logger.info(
                    "image_duplicate_skipped",
                    property_id=property_id,
                    hash=stored.image_hash,
                )
                continue

            record = await self._image_repo.create(
                property_id, source_path, position
            )
            await self._image_repo.update_downloaded(
                record.id,
                stored.local_path,
                stored.file_size,
                stored.width,
                stored.height,
                stored.image_hash,
            )

            # Nothing to analyze without a stored file.
            if not stored.local_path:
                continue

            try:
                encoded = await asyncio.to_thread(
                    OllamaClient.image_to_base64,
                    stored.local_path,
                )
                verdict = await self._image_analyzer.analyze(encoded)
                await self._image_repo.update_analysis(
                    record.id,
                    verdict.overall_condition or "",
                )
                analyses[source_path] = verdict.model_dump()
            except Exception as exc:
                logger.error(
                    "image_analysis_failed",
                    property_id=property_id,
                    image_id=record.id,
                    error=str(exc),
                )

        stored_images = await self._image_repo.get_by_property(property_id)
        await self._property_repo.update(
            property_id, images_count=len(stored_images)
        )

        # Remove the uploaded originals together with their directory.
        if image_paths:
            try:
                from pathlib import Path

                staging_dir = Path(image_paths[0]).parent
                # A directory named after the property id is treated as the
                # permanent image directory and is left untouched.
                if staging_dir.name != str(property_id):
                    for leftover in image_paths:
                        Path(leftover).unlink(missing_ok=True)
                    staging_dir.rmdir()
                    logger.info(
                        "pipeline_temp_images_cleaned",
                        property_id=property_id,
                        temp_dir=str(staging_dir),
                    )
            except Exception as exc:
                logger.warning(
                    "pipeline_temp_cleanup_failed",
                    property_id=property_id,
                    error=str(exc),
                )

        return analyses

    async def _stage_process_remote_images(
        self,
        property_id: int,
        images: list[str],
    ) -> dict[str, Any]:
        aggregated_analysis: dict[str, Any] = {}
        semaphore = asyncio.Semaphore(3)

        async def _process_one(url: str, order: int) -> dict[str, Any] | None:
            async with semaphore:
                try:
                    result = await self._image_downloader.download(
                        property_id, url, order
                    )
                except Exception as exc:
                    logger.error(
                        "image_download_failed",
                        property_id=property_id,
                        url=url,
                        error=str(exc),
                    )
                    return None

                duplicate = await self._image_repo.get_by_hash(
                    property_id, result.image_hash
                )
                if duplicate:
                    logger.info(
                        "image_duplicate_skipped",
                        property_id=property_id,
                        hash=result.image_hash,
                    )
                    return None

                image = await self._image_repo.create(
                    property_id, url, order
                )
                await self._image_repo.update_downloaded(
                    image.id,
                    result.local_path,
                    result.file_size,
                    result.width,
                    result.height,
                    result.image_hash,
                )

                if result.local_path:
                    try:
                        b64 = await asyncio.to_thread(
                            OllamaClient.image_to_base64,
                            result.local_path,
                        )
                        analysis = await self._image_analyzer.analyze(
                            b64
                        )
                        await self._image_repo.update_analysis(
                            image.id,
                            analysis.overall_condition or "",
                        )
                        return {url: analysis.model_dump()}
                    except Exception as exc:
                        logger.error(
                            "image_analysis_failed",
                            property_id=property_id,
                            image_id=image.id,
                            error=str(exc),
                        )
                return None

        tasks = [
            _process_one(url, order)
            for order, url in enumerate(images)
        ]
        results = await asyncio.gather(*tasks)
        for r in results:
            if r:
                aggregated_analysis.update(r)

        all_images = await self._image_repo.get_by_property(property_id)
        await self._property_repo.update(
            property_id, images_count=len(all_images)
        )

        return aggregated_analysis

    async def _stage_enrich(
        self,
        property_id: int,
        normalized: NormalizedProperty,
        aggregated_analysis: dict[str, Any],
    ) -> Any:
        """Run AI enrichment and persist its result for the listing.

        An open circuit breaker and retryable Ollama errors propagate to the
        caller. Any other enricher failure is logged and yields ``None``.
        When the enricher returns a truthy result, the previous enrichment
        row is replaced and the score fields are copied onto the listing.

        Returns:
            Whatever the enricher returned, or ``None`` after a logged error.
        """
        try:
            outcome = await self._enricher.enrich(
                normalized, aggregated_analysis
            )
        except (CircuitBreakerOpenError, OllamaRetryableError):
            raise
        except Exception as exc:
            logger.error(
                "pipeline_enricher_error",
                property_id=property_id,
                error=str(exc),
            )
            return None

        if not outcome:
            return outcome

        # Replace rather than append: one enrichment row per listing.
        await self._enrichment_repo.delete_by_property(property_id)
        await self._enrichment_repo.create(
            property_id,
            extracted_features=outcome.extracted_features,
            price_assessment=outcome.price_assessment,
            listing_quality_score=outcome.listing_quality_score,
            reliability_rating=outcome.reliability_rating,
            sentiment_score=outcome.sentiment_score,
            classification=outcome.classification,
            image_analysis_results=aggregated_analysis,
            generated_description=outcome.generated_description,
            summary=outcome.summary,
            model_version=outcome.model_version,
            processing_time_ms=outcome.processing_time_ms,
        )
        await self._property_repo.update(
            property_id,
            listing_quality_score=outcome.listing_quality_score,
            reliability_rating=outcome.reliability_rating,
            sentiment_score=outcome.sentiment_score,
            generated_description=outcome.generated_description,
        )
        return outcome

    async def _stage_embed(
        self,
        property_id: int,
        normalized: NormalizedProperty,
    ) -> None:
        """Generate and store the embedding vector of a listing.

        This stage never raises for embedding failures: an open circuit
        breaker, a retryable Ollama error or any other exception from the
        embed call is logged and the stage returns without storing anything.
        """
        from vmk_data_collector.core.config import settings

        document = self._build_embedding_text(normalized)
        if not document.strip():
            logger.warning(
                "pipeline_embedding_empty_text",
                property_id=property_id,
            )
            return

        try:
            vectors = await self._ollama_client.embed(
                model=settings.ollama_embedding_model,
                texts=[document],
            )
        except CircuitBreakerOpenError:
            logger.warning(
                "pipeline_embedding_circuit_breaker_open",
                property_id=property_id,
            )
            return
        except OllamaRetryableError:
            logger.warning(
                "pipeline_embedding_retryable_error",
                property_id=property_id,
            )
            return
        except Exception as exc:
            logger.error(
                "pipeline_embedding_error",
                property_id=property_id,
                error=str(exc),
            )
            return

        # Only a non-empty first vector is worth persisting.
        if not (vectors and vectors[0]):
            return

        vector = vectors[0]
        await self._property_repo.update_embedding(property_id, vector)
        logger.info(
            "pipeline_embedding_saved",
            property_id=property_id,
            vector_dim=len(vector),
        )

    async def _stage_finalize(
        self,
        raw_data_id: int,
        property_id: int,
        snapshot_id: int | None,
    ) -> IngestResponse:
        """Mark the raw record as processed and build the success response."""
        await self._raw_repo.set_processed(raw_data_id)
        response = IngestResponse(
            job_id=raw_data_id,
            property_id=property_id,
            snapshot_id=snapshot_id,
            status="completed",
            message="Property ingested successfully",
        )
        return response

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    # Normalized attributes that _normalized_to_kwargs coerces with _to_bool.
    _BOOLEAN_FIELDS = frozenset(
        {
            "has_balcony",
            "has_loggia",
            "has_freight_elevator",
            "internet",
            "security",
            "is_agent",
        }
    )

    # Normalized attributes that _normalized_to_kwargs parses with _to_datetime.
    _DATETIME_FIELDS = frozenset({"publish_date", "archived_at"})

    @staticmethod
    def _to_bool(value: Any) -> bool | None:
        if value is None:
            return None
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return bool(value)
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "1", "yes", "да", "+"):
                return True
            if lowered in ("false", "0", "no", "нет", "-", ""):
                return False
            # Non-empty arbitrary string → True (e.g. "подземные паркинги, охрана")
            return bool(lowered)
        return bool(value)

    @staticmethod
    def _to_datetime(value: Any) -> Any:
        from datetime import datetime

        if value is None or isinstance(value, datetime):
            return value
        if isinstance(value, str):
            value = value.strip()
            if not value:
                return None
            for fmt in (
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%dT%H:%M:%S",
                "%Y-%m-%d",
                "%d.%m.%Y %H:%M",
                "%d.%m.%Y",
            ):
                try:
                    return datetime.strptime(value, fmt)
                except ValueError:
                    continue
        return value

    _ENUM_MAPPINGS: dict[str, dict[str, str]] = {
        "building_type": {
            "кирпич": "brick",
            "кирпичный": "brick",
            "панель": "panel",
            "панельный": "panel",
            "монолит": "monolith",
            "монолитный": "monolith",
            "монолитно-каркасный": "monolith",
            "монолитно-каркас": "monolith",
            "газобетон": "gas_block",
            "газоблок": "gas_block",
            "дерево": "wood",
            "деревянный": "wood",
            "новый дом": "monolith",
            "сталинка": "brick",
            "хрущевка": "panel",
            "брежневка": "panel",
        },
        "renovation_status": {
            "косметический": "cosmetic",
            "косметика": "cosmetic",
            "евро": "euro",
            "евроремонт": "euro",
            "дизайнерский": "designer",
            "дизайн": "designer",
            "без ремонта": "none",
            "нет": "none",
            "строительная отделка": "construction",
            "нуждается в обновлении": "construction",
            "предчистовая": "construction",
        },
        "deal_type": {
            "продажа": "sale",
            "аренда": "rent_long",
            "долгосрочная аренда": "rent_long",
            "посуточно": "rent_short",
            "краткосрочная аренда": "rent_short",
        },
        "layout": {
            "студия": "studio",
            "раздельная": "separate",
            "смежная": "adjacent",
            "смежно-раздельная": "adjacent",
        },
        "bathroom_type": {
            "совмещенный": "combined",
            "совмещенная": "combined",
            "раздельный": "separate",
            "раздельная": "separate",
            "несколько": "multiple",
        },
        "parking_type": {
            "наземная": "ground",
            "наземный": "ground",
            "подземная": "underground",
            "подземный": "underground",
            "подземные паркинги": "underground",
            "нет": "none",
            "отсутствует": "none",
            "гараж": "garage",
        },
        "heating_type": {
            "центральное": "central",
            "централизованное": "central",
            "автономное": "autonomous",
            "автономный": "autonomous",
            "электрическое": "floor",
            "тёплые полы": "floor",
            "нет": "none",
            "отсутствует": "none",
            "индивидуальное газовое": "autonomous",
            "газовое": "autonomous",
        },
        "window_view": {
            "во двор": "yard",
            "двор": "yard",
            "на улицу": "street",
            "улица": "street",
            "на парк": "park",
            "парк": "park",
            "на воду": "water",
            "вода": "water",
            "на лес": "forest",
            "лес": "forest",
        },
        "metro_distance_type": {
            "пешком": "walk",
            "пешком": "walk",
            "транспортом": "transport",
        },
        "listing_status": {
            "активное": "active",
            "active": "active",
            "продано": "sold",
            "sold": "sold",
            "сдано": "rented",
            "rented": "rented",
            "удалено": "removed",
            "removed": "removed",
            "в архиве": "archived",
            "archived": "archived",
        },
    }

    @staticmethod
    def _build_embedding_text(
        normalized: NormalizedProperty,
    ) -> str:
        """Build a dense description for vector embedding generation."""
        parts: list[str] = []

        if normalized.deal_type:
            parts.append(f"{normalized.deal_type}")
        if normalized.title:
            parts.append(f"{normalized.title}")
        if normalized.description:
            parts.append(f"{normalized.description}")
        if normalized.generated_description:
            parts.append(f"{normalized.generated_description}")

        location_parts: list[str] = []
        if normalized.city:
            location_parts.append(f"місто {normalized.city}")
        if normalized.district:
            location_parts.append(f"район {normalized.district}")
        if normalized.micro_district:
            location_parts.append(f"мікрорайон {normalized.micro_district}")
        if normalized.street:
            location_parts.append(f"вулиця {normalized.street}")
        if location_parts:
            parts.append(", ".join(location_parts))

        if normalized.rooms_count is not None:
            parts.append(f"{normalized.rooms_count} кімнат")
        if normalized.total_area:
            parts.append(f"площа {normalized.total_area} м²")
        if normalized.floor and normalized.floors_total:
            parts.append(f"поверх {normalized.floor} з {normalized.floors_total}")
        elif normalized.floor:
            parts.append(f"поверх {normalized.floor}")
        if normalized.building_type:
            parts.append(f"тип будинку {normalized.building_type}")
        if normalized.renovation_status:
            parts.append(f"ремонт {normalized.renovation_status}")
        if normalized.price:
            currency = normalized.currency or ""
            parts.append(f"ціна {normalized.price} {currency}")

        return ". ".join(parts)

    @staticmethod
    def _to_enum(field: str, value: Any) -> Any:
        if value is None or not isinstance(value, str):
            return value
        mapping = PropertyPipeline._ENUM_MAPPINGS.get(field, {})
        if not mapping:
            return value
        lowered = value.strip().lower()
        # Exact match
        if lowered in mapping:
            return mapping[lowered]
        # Substring match (e.g. "подземные паркинги, охрана" → underground)
        for ru, en in mapping.items():
            if ru in lowered:
                return en
        # Unknown value → null (safer than crashing)
        return None

    @staticmethod
    def _normalized_to_kwargs(
        normalized: NormalizedProperty,
    ) -> dict[str, Any]:
        result: dict[str, Any] = {}
        for k, v in normalized.__dict__.items():
            if k in ("images", "custom_fields"):
                continue
            if k in PropertyPipeline._BOOLEAN_FIELDS:
                v = PropertyPipeline._to_bool(v)
            elif k in PropertyPipeline._DATETIME_FIELDS:
                v = PropertyPipeline._to_datetime(v)
            elif k in PropertyPipeline._ENUM_MAPPINGS:
                v = PropertyPipeline._to_enum(k, v)
            result[k] = v
        return result

    @staticmethod
    def _listing_to_dict(listing: Any) -> dict[str, Any]:
        """Dump every mapped column of ``listing`` into a plain dict.

        Values are passed through ``_serialize_value``. Attributes are read
        by column name.
        """
        serialize = PropertyPipeline._serialize_value
        dumped: dict[str, Any] = {}
        for column in inspect(listing).mapper.columns:
            dumped[column.name] = serialize(getattr(listing, column.name))
        return dumped

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        from datetime import datetime
        from decimal import Decimal
        from enum import Enum

        if value is None:
            return None
        if isinstance(value, Decimal):
            return float(value)
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, Enum):
            return value.value
        return value

    @staticmethod
    def _compute_changed_fields(
        old: dict[str, Any],
        new: dict[str, Any],
    ) -> dict[str, Any]:
        changed: dict[str, Any] = {}
        for key, new_val in new.items():
            old_val = old.get(key)
            serialized_new = PropertyPipeline._serialize_value(new_val)
            if old_val != serialized_new:
                changed[key] = {"old": old_val, "new": serialized_new}
        return changed

    @staticmethod
    def _infer_field_type(value: Any) -> str:
        if isinstance(value, bool):
            return "bool"
        if isinstance(value, int):
            return "int"
        if isinstance(value, float):
            return "float"
        return "str"