# vmk-360-data_collector/src/vmk_data_collector/services/property_pipeline.py
# Last commit: "feat: core pipeline + FastAPI API (Phases 0-6)"
# (Eugene Sukhodolskiy).
from collections.abc import Iterable
from datetime import date
from decimal import Decimal
from enum import Enum
from typing import Any

import structlog
from sqlalchemy import inspect

from vmk_data_collector.db.repositories.ai_enrichment import (
    AiEnrichmentRepository,
)
from vmk_data_collector.db.repositories.custom_field import (
    CustomFieldRepository,
)
from vmk_data_collector.db.repositories.data_source import (
    DataSourceRepository,
)
from vmk_data_collector.db.repositories.image import ImageRepository
from vmk_data_collector.db.repositories.property import PropertyRepository
from vmk_data_collector.db.repositories.property_type import (
    PropertyTypeRepository,
)
from vmk_data_collector.db.repositories.raw_data import RawDataRepository
from vmk_data_collector.db.repositories.snapshot import SnapshotRepository
from vmk_data_collector.domain.entities import NormalizedProperty
from vmk_data_collector.domain.enums import RawDataStatus
from vmk_data_collector.schemas.raw_data import IngestResponse
from vmk_data_collector.services.ai_enricher import AiEnricher
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.image_downloader import ImageDownloader
from vmk_data_collector.services.ollama_client import OllamaClient

# Module-level structlog logger used for the pipeline's error/info events.
logger = structlog.get_logger()


class PropertyPipeline:
    """Turn one raw ingested payload into a stored, enriched property.

    :meth:`process` runs these steps for a single raw data record:

    1. AI normalization of the raw payload; payloads the normalizer says
       are not real estate are marked ``invalid``.
    2. Upsert of the property keyed by (data source, external id). When the
       property already exists, a snapshot of the previous row plus a diff
       of changed fields is stored before the row is updated.
    3. Replacement of the property's custom fields.
    4. Download, de-duplication by image hash, and AI analysis of images.
    5. AI enrichment, stored via the enrichment repository and mirrored
       onto the property row.

    All persistence goes through the injected repositories; all AI work
    goes through the injected normalizer, image analyzer and enricher.
    """

    def __init__(
        self,
        raw_repo: RawDataRepository,
        property_repo: PropertyRepository,
        image_repo: ImageRepository,
        custom_field_repo: CustomFieldRepository,
        snapshot_repo: SnapshotRepository,
        enrichment_repo: AiEnrichmentRepository,
        data_source_repo: DataSourceRepository,
        property_type_repo: PropertyTypeRepository,
        normalizer: AiNormalizer,
        image_downloader: ImageDownloader,
        image_analyzer: AiImageAnalyzer,
        enricher: AiEnricher,
    ) -> None:
        """Store the injected repositories and AI/IO services."""
        self._raw_repo = raw_repo
        self._property_repo = property_repo
        self._image_repo = image_repo
        self._custom_field_repo = custom_field_repo
        self._snapshot_repo = snapshot_repo
        self._enrichment_repo = enrichment_repo
        self._data_source_repo = data_source_repo
        self._property_type_repo = property_type_repo
        self._normalizer = normalizer
        self._image_downloader = image_downloader
        self._image_analyzer = image_analyzer
        self._enricher = enricher

    async def process(self, raw_data_id: int) -> IngestResponse:
        """Run the full pipeline for one raw data record.

        Args:
            raw_data_id: Id of the raw data record to process.

        Returns:
            ``IngestResponse`` with status ``"failed"`` when the AI
            normalizer raises, ``"invalid"`` when the payload is not real
            estate, otherwise ``"completed"`` with the property id and, when
            an existing property was updated, the snapshot id.

        Raises:
            ValueError: If no raw data record with ``raw_data_id`` exists.
            Exception: Errors raised after normalization (building the
                normalized entity, persistence, bookkeeping) propagate
                unchanged; before re-raising, the raw record is marked
                ``failed`` on a best-effort basis so it is not left in
                ``processing``.
        """
        raw = await self._raw_repo.get_by_id(raw_data_id)
        if raw is None:
            raise ValueError(f"Raw data {raw_data_id} not found")

        await self._raw_repo.update_status(
            raw_data_id, RawDataStatus.processing
        )

        try:
            norm_response = await self._normalizer.normalize(raw.payload)
        except Exception as exc:
            logger.error(
                "pipeline_normalizer_error",
                raw_id=raw_data_id,
                error=str(exc),
            )
            await self._raw_repo.update_status(
                raw_data_id,
                RawDataStatus.failed,
                error_message=str(exc),
            )
            return IngestResponse(
                job_id=raw_data_id,
                status="failed",
                reason="normalizer_error",
                message=f"AI normalizer failed: {exc}",
            )

        if not norm_response.is_real_estate:
            await self._raw_repo.update_status(
                raw_data_id,
                RawDataStatus.invalid,
                error_message=norm_response.reason,
            )
            return IngestResponse(
                job_id=raw_data_id,
                status="invalid",
                reason=norm_response.reason,
                message="Payload is not real estate",
            )

        try:
            return await self._ingest_normalized(
                raw, raw_data_id, norm_response.normalized
            )
        except Exception as exc:
            logger.error(
                "pipeline_ingest_error",
                raw_id=raw_data_id,
                error=str(exc),
            )
            await self._mark_failed_best_effort(raw_data_id, exc)
            raise

    async def _ingest_normalized(
        self,
        raw: Any,
        raw_data_id: int,
        normalized_data: Any,
    ) -> IngestResponse:
        """Persist a normalized payload and return the "completed" response.

        ``normalized_data`` is the normalizer's ``normalized`` mapping; a
        falsy value is treated as an empty mapping.
        """
        normalized = NormalizedProperty(**(normalized_data or {}))

        property_id, snapshot_id = await self._upsert_property(
            raw, raw_data_id, normalized
        )
        await self._store_custom_fields(property_id, normalized)
        aggregated_analysis = await self._process_images(
            property_id, normalized.images
        )
        await self._enrich_property(
            property_id, normalized, aggregated_analysis
        )
        await self._raw_repo.set_processed(raw_data_id)

        return IngestResponse(
            job_id=raw_data_id,
            property_id=property_id,
            status="completed",
            message="Property ingested successfully",
            snapshot_id=snapshot_id,
        )

    async def _mark_failed_best_effort(
        self, raw_data_id: int, exc: Exception
    ) -> None:
        """Mark the raw record ``failed`` without masking ``exc``."""
        try:
            await self._raw_repo.update_status(
                raw_data_id,
                RawDataStatus.failed,
                error_message=str(exc),
            )
        except Exception as status_exc:
            # The repository may itself be unusable at this point (for
            # instance after a database error); log and let the caller
            # re-raise the original exception.
            logger.error(
                "pipeline_status_update_failed",
                raw_id=raw_data_id,
                error=str(status_exc),
            )

    async def _upsert_property(
        self,
        raw: Any,
        raw_data_id: int,
        normalized: NormalizedProperty,
    ) -> tuple[Any, Any]:
        """Create or update the property; return (property_id, snapshot_id).

        ``snapshot_id`` is ``None`` when a new property was created. On
        update, the previous row is snapshotted together with a field diff,
        the row is updated, and the property's existing custom fields are
        deleted so they can be replaced.
        """
        # A missing, null or empty slug all fall back to "unknown".
        source_slug = raw.payload.get("source_slug") or "unknown"
        data_source = await self._data_source_repo.get_or_create_by_slug(
            source_slug, name=source_slug
        )
        existing = await self._property_repo.get_by_source_and_external(
            data_source.id, raw.external_id
        )

        listing_kwargs = self._normalized_to_kwargs(normalized)
        # Replace the raw ``property_type`` value with the id of the
        # matching (possibly newly created) property type record.
        listing_kwargs.pop("property_type", None)
        if normalized.property_type:
            property_type = (
                await self._property_type_repo.get_or_create_by_slug(
                    normalized.property_type,
                    name=normalized.property_type,
                )
            )
            listing_kwargs["property_type_id"] = property_type.id
        listing_kwargs["source_id"] = data_source.id
        listing_kwargs["external_id"] = raw.external_id
        listing_kwargs["raw_data_id"] = raw_data_id

        if existing is None:
            created = await self._property_repo.create(**listing_kwargs)
            return created.id, None

        previous = self._listing_to_dict(existing)
        changed_fields = self._compute_changed_fields(
            previous, listing_kwargs
        )
        snapshot = await self._snapshot_repo.create(
            existing.id, previous, changed_fields
        )
        await self._property_repo.update(existing.id, **listing_kwargs)
        await self._custom_field_repo.delete_by_property(existing.id)
        return existing.id, snapshot.id

    async def _store_custom_fields(
        self, property_id: Any, normalized: NormalizedProperty
    ) -> None:
        """Bulk-insert the normalized custom fields (values stored as str)."""
        if not normalized.custom_fields:
            return
        fields = [
            {
                "field_name": name,
                "field_value": str(value),
                "field_type": self._infer_field_type(value),
            }
            for name, value in normalized.custom_fields.items()
        ]
        await self._custom_field_repo.bulk_create(property_id, fields)

    async def _process_images(
        self, property_id: Any, image_urls: Iterable[str]
    ) -> dict[str, Any]:
        """Download, de-duplicate and analyze images; return analyses by URL.

        Download and analysis failures are logged and the image is skipped
        (or left unanalyzed), so one bad image does not abort ingestion.
        """
        aggregated_analysis: dict[str, Any] = {}
        for order, url in enumerate(image_urls):
            try:
                result = await self._image_downloader.download(
                    property_id, url, order
                )
            except Exception as exc:
                logger.error(
                    "image_download_failed",
                    property_id=property_id,
                    url=url,
                    error=str(exc),
                )
                continue

            duplicate = await self._image_repo.get_by_hash(
                property_id, result.image_hash
            )
            if duplicate:
                # NOTE(review): the image was already downloaded at this
                # point; confirm whether ImageDownloader cleans up the
                # skipped file or whether it is left on disk.
                logger.info(
                    "image_duplicate_skipped",
                    property_id=property_id,
                    hash=result.image_hash,
                )
                continue

            image = await self._image_repo.create(property_id, url, order)
            await self._image_repo.update_downloaded(
                image.id,
                result.local_path,
                result.file_size,
                result.width,
                result.height,
                result.image_hash,
            )

            if not result.local_path:
                continue
            try:
                b64 = OllamaClient.image_to_base64(result.local_path)
                analysis = await self._image_analyzer.analyze(b64)
                await self._image_repo.update_analysis(
                    image.id,
                    analysis.overall_condition or "",
                )
                aggregated_analysis[url] = analysis.model_dump()
            except Exception as exc:
                logger.error(
                    "image_analysis_failed",
                    property_id=property_id,
                    image_id=image.id,
                    error=str(exc),
                )
        return aggregated_analysis

    async def _enrich_property(
        self,
        property_id: Any,
        normalized: NormalizedProperty,
        aggregated_analysis: dict[str, Any],
    ) -> None:
        """Run AI enrichment and store it; enricher errors are logged only.

        A successful enrichment replaces any previous enrichment record for
        the property and copies the headline scores onto the property row.
        """
        try:
            enrichment = await self._enricher.enrich(
                normalized, aggregated_analysis
            )
        except Exception as exc:
            logger.error(
                "pipeline_enricher_error",
                property_id=property_id,
                error=str(exc),
            )
            return
        if not enrichment:
            return

        await self._enrichment_repo.delete_by_property(property_id)
        await self._enrichment_repo.create(
            property_id,
            extracted_features=enrichment.extracted_features,
            price_assessment=enrichment.price_assessment,
            listing_quality_score=enrichment.listing_quality_score,
            reliability_rating=enrichment.reliability_rating,
            sentiment_score=enrichment.sentiment_score,
            classification=enrichment.classification,
            image_analysis_results=aggregated_analysis,
            generated_description=enrichment.generated_description,
            summary=enrichment.summary,
            model_version=enrichment.model_version,
            processing_time_ms=enrichment.processing_time_ms,
        )
        await self._property_repo.update(
            property_id,
            listing_quality_score=enrichment.listing_quality_score,
            reliability_rating=enrichment.reliability_rating,
            sentiment_score=enrichment.sentiment_score,
            generated_description=enrichment.generated_description,
        )

    @staticmethod
    def _normalized_to_kwargs(
        normalized: NormalizedProperty,
    ) -> dict[str, Any]:
        """Return the normalized attributes as a new dict.

        ``images`` and ``custom_fields`` are excluded because they are
        persisted separately, not as property columns.
        """
        return {
            key: value
            for key, value in normalized.__dict__.items()
            if key not in ("images", "custom_fields")
        }

    @staticmethod
    def _listing_to_dict(listing: Any) -> dict[str, Any]:
        """Serialize every mapped column of an ORM instance into a dict.

        Values go through :meth:`_serialize_value` so the result can be
        stored as a snapshot and compared with incoming values.
        """
        mapper = inspect(listing).mapper
        return {
            col.key: PropertyPipeline._serialize_value(
                getattr(listing, col.key)
            )
            for col in mapper.column_attrs
        }

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert a value into a plain, snapshot-friendly form.

        ``Decimal`` becomes ``float``, ``date``/``datetime`` become ISO 8601
        strings and ``Enum`` members become their ``.value``; anything else
        (including ``None``) is returned unchanged.
        """
        if isinstance(value, Decimal):
            return float(value)
        # ``datetime`` is a subclass of ``date``, so this covers both.
        if isinstance(value, date):
            return value.isoformat()
        if isinstance(value, Enum):
            return value.value
        return value

    @staticmethod
    def _compute_changed_fields(
        old: dict[str, Any],
        new: dict[str, Any],
    ) -> dict[str, Any]:
        """Diff ``new`` against ``old`` for every key present in ``new``.

        ``old`` is expected to hold already-serialized values (as produced
        by :meth:`_listing_to_dict`); ``new`` values are serialized the same
        way before comparing, so e.g. a ``datetime`` and its ISO string are
        not reported as a change, and the diff holds only plain values.

        Returns:
            ``{key: {"old": old_value, "new": new_value}}`` for each key
            whose serialized value differs (missing old keys count as
            ``None``).
        """
        changed: dict[str, Any] = {}
        for key, raw_new in new.items():
            new_val = PropertyPipeline._serialize_value(raw_new)
            old_val = old.get(key)
            if old_val != new_val:
                changed[key] = {"old": old_val, "new": new_val}
        return changed

    @staticmethod
    def _infer_field_type(value: Any) -> str:
        """Return "bool", "int", "float" or "str" for a custom field value."""
        # ``bool`` must be checked before ``int``: bool subclasses int.
        if isinstance(value, bool):
            return "bool"
        if isinstance(value, int):
            return "int"
        if isinstance(value, float):
            return "float"
        return "str"