from typing import Any
import structlog
from sqlalchemy import inspect
from vmk_data_collector.db.repositories.ai_enrichment import (
AiEnrichmentRepository,
)
from vmk_data_collector.db.repositories.custom_field import (
CustomFieldRepository,
)
from vmk_data_collector.db.repositories.data_source import (
DataSourceRepository,
)
from vmk_data_collector.db.repositories.image import ImageRepository
from vmk_data_collector.db.repositories.property import PropertyRepository
from vmk_data_collector.db.repositories.property_type import (
PropertyTypeRepository,
)
from vmk_data_collector.db.repositories.raw_data import RawDataRepository
from vmk_data_collector.db.repositories.snapshot import SnapshotRepository
from vmk_data_collector.domain.entities import NormalizedProperty
from vmk_data_collector.domain.enums import RawDataStatus
from vmk_data_collector.schemas.raw_data import IngestResponse
from vmk_data_collector.services.ai_enricher import AiEnricher
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.image_downloader import ImageDownloader
from vmk_data_collector.services.ollama_client import OllamaClient
# Module-level structlog logger used by PropertyPipeline for its event logs.
logger = structlog.get_logger()
class PropertyPipeline:
    """Turn one stored raw payload into a persisted, enriched property.

    The pipeline normalizes the payload with the AI normalizer, creates or
    updates the property row (snapshotting the previous state on update),
    stores custom fields, downloads and analyzes images, and finally
    records the AI enrichment. Every collaborator is injected through the
    constructor.
    """

    def __init__(
        self,
        raw_repo: RawDataRepository,
        property_repo: PropertyRepository,
        image_repo: ImageRepository,
        custom_field_repo: CustomFieldRepository,
        snapshot_repo: SnapshotRepository,
        enrichment_repo: AiEnrichmentRepository,
        data_source_repo: DataSourceRepository,
        property_type_repo: PropertyTypeRepository,
        normalizer: AiNormalizer,
        image_downloader: ImageDownloader,
        image_analyzer: AiImageAnalyzer,
        enricher: AiEnricher,
    ) -> None:
        """Store the repositories and AI services used by the pipeline."""
        self._raw_repo = raw_repo
        self._property_repo = property_repo
        self._image_repo = image_repo
        self._custom_field_repo = custom_field_repo
        self._snapshot_repo = snapshot_repo
        self._enrichment_repo = enrichment_repo
        self._data_source_repo = data_source_repo
        self._property_type_repo = property_type_repo
        self._normalizer = normalizer
        self._image_downloader = image_downloader
        self._image_analyzer = image_analyzer
        self._enricher = enricher

    async def process(self, raw_data_id: int) -> IngestResponse:
        """Run the full ingestion pipeline for one raw-data record.

        Args:
            raw_data_id: Identifier of the raw-data record to process.

        Returns:
            An ``IngestResponse`` whose ``status`` is ``"failed"`` when the
            normalizer raised, ``"invalid"`` when the payload is not real
            estate, and ``"completed"`` otherwise.

        Raises:
            ValueError: If no raw-data record exists for ``raw_data_id``.
        """
        raw = await self._raw_repo.get_by_id(raw_data_id)
        if raw is None:
            raise ValueError(f"Raw data {raw_data_id} not found")

        await self._raw_repo.update_status(
            raw_data_id, RawDataStatus.processing
        )

        # A normalizer failure is reported to the caller as a "failed"
        # response instead of being raised.
        try:
            norm_response = await self._normalizer.normalize(raw.payload)
        except Exception as exc:
            logger.error(
                "pipeline_normalizer_error",
                raw_id=raw_data_id,
                error=str(exc),
            )
            await self._raw_repo.update_status(
                raw_data_id,
                RawDataStatus.failed,
                error_message=str(exc),
            )
            return IngestResponse(
                job_id=raw_data_id,
                status="failed",
                reason="normalizer_error",
                message=f"AI normalizer failed: {exc}",
            )

        if not norm_response.is_real_estate:
            await self._raw_repo.update_status(
                raw_data_id,
                RawDataStatus.invalid,
                error_message=norm_response.reason,
            )
            return IngestResponse(
                job_id=raw_data_id,
                status="invalid",
                reason=norm_response.reason,
                message="Payload is not real estate",
            )

        normalized = NormalizedProperty(
            **(norm_response.normalized or {})
        )

        property_id, snapshot_id = await self._upsert_property(
            raw, raw_data_id, normalized
        )

        if normalized.custom_fields:
            fields = [
                {
                    "field_name": name,
                    "field_value": str(value),
                    "field_type": self._infer_field_type(value),
                }
                for name, value in normalized.custom_fields.items()
            ]
            await self._custom_field_repo.bulk_create(property_id, fields)

        aggregated_analysis = await self._ingest_images(
            property_id, normalized.images
        )

        # Enrichment is best-effort: on failure the property is still
        # marked as processed, just without an enrichment record.
        try:
            enrichment = await self._enricher.enrich(
                normalized, aggregated_analysis
            )
        except Exception as exc:
            logger.error(
                "pipeline_enricher_error",
                property_id=property_id,
                error=str(exc),
            )
            enrichment = None

        if enrichment:
            await self._store_enrichment(
                property_id, enrichment, aggregated_analysis
            )

        await self._raw_repo.set_processed(raw_data_id)
        return IngestResponse(
            job_id=raw_data_id,
            property_id=property_id,
            status="completed",
            message="Property ingested successfully",
            snapshot_id=snapshot_id,
        )

    async def _upsert_property(
        self,
        raw: Any,
        raw_data_id: int,
        normalized: NormalizedProperty,
    ) -> tuple[Any, Any]:
        """Create or update the property row for a normalized payload.

        When a property already exists for the same data source and
        external id, its current state is snapshotted before the update
        and its custom fields are deleted so the caller can write fresh
        ones.

        Returns:
            ``(property_id, snapshot_id)``; ``snapshot_id`` is ``None``
            when a new property was created.
        """
        source_slug = raw.payload.get("source_slug", "unknown")
        data_source = await self._data_source_repo.get_or_create_by_slug(
            source_slug, name=source_slug
        )
        existing = await self._property_repo.get_by_source_and_external(
            data_source.id, raw.external_id
        )

        listing_kwargs = self._normalized_to_kwargs(normalized)
        if normalized.property_type:
            property_type = (
                await self._property_type_repo.get_or_create_by_slug(
                    normalized.property_type,
                    name=normalized.property_type,
                )
            )
            listing_kwargs["property_type_id"] = property_type.id
        # NOTE(review): the slug is dropped whether or not a type was
        # resolved; presumably the row only stores property_type_id --
        # confirm against the model.
        listing_kwargs.pop("property_type", None)
        listing_kwargs["source_id"] = data_source.id
        listing_kwargs["external_id"] = raw.external_id
        listing_kwargs["raw_data_id"] = raw_data_id

        if not existing:
            created = await self._property_repo.create(**listing_kwargs)
            return created.id, None

        snapshot_data = self._listing_to_dict(existing)
        changed_fields = self._compute_changed_fields(
            snapshot_data, listing_kwargs
        )
        snapshot = await self._snapshot_repo.create(
            existing.id, snapshot_data, changed_fields
        )
        await self._property_repo.update(existing.id, **listing_kwargs)
        await self._custom_field_repo.delete_by_property(existing.id)
        return existing.id, snapshot.id

    async def _ingest_images(
        self,
        property_id: Any,
        image_urls: Any,
    ) -> dict[str, Any]:
        """Download, de-duplicate, store and analyze a property's images.

        A failure to download or analyze one image is logged and does not
        stop the remaining images from being handled.

        Returns:
            Mapping of image URL to the dumped analysis result, containing
            only the images whose analysis succeeded.
        """
        aggregated: dict[str, Any] = {}
        # ``or ()`` tolerates a missing image list.
        for order, url in enumerate(image_urls or ()):
            try:
                result = await self._image_downloader.download(
                    property_id, url, order
                )
            except Exception as exc:
                logger.error(
                    "image_download_failed",
                    property_id=property_id,
                    url=url,
                    error=str(exc),
                )
                continue

            duplicate = await self._image_repo.get_by_hash(
                property_id, result.image_hash
            )
            if duplicate:
                logger.info(
                    "image_duplicate_skipped",
                    property_id=property_id,
                    hash=result.image_hash,
                )
                continue

            image = await self._image_repo.create(property_id, url, order)
            await self._image_repo.update_downloaded(
                image.id,
                result.local_path,
                result.file_size,
                result.width,
                result.height,
                result.image_hash,
            )

            if not result.local_path:
                continue

            # The whole analysis step, including recording its result, is
            # guarded so one bad image cannot abort the pipeline.
            try:
                b64 = OllamaClient.image_to_base64(result.local_path)
                analysis = await self._image_analyzer.analyze(b64)
                await self._image_repo.update_analysis(
                    image.id,
                    analysis.overall_condition or "",
                )
                aggregated[url] = analysis.model_dump()
            except Exception as exc:
                logger.error(
                    "image_analysis_failed",
                    property_id=property_id,
                    image_id=image.id,
                    error=str(exc),
                )
        return aggregated

    async def _store_enrichment(
        self,
        property_id: Any,
        enrichment: Any,
        aggregated_analysis: dict[str, Any],
    ) -> None:
        """Replace the enrichment record and copy its scores to the property."""
        await self._enrichment_repo.delete_by_property(property_id)
        await self._enrichment_repo.create(
            property_id,
            extracted_features=enrichment.extracted_features,
            price_assessment=enrichment.price_assessment,
            listing_quality_score=enrichment.listing_quality_score,
            reliability_rating=enrichment.reliability_rating,
            sentiment_score=enrichment.sentiment_score,
            classification=enrichment.classification,
            image_analysis_results=aggregated_analysis,
            generated_description=enrichment.generated_description,
            summary=enrichment.summary,
            model_version=enrichment.model_version,
            processing_time_ms=enrichment.processing_time_ms,
        )
        await self._property_repo.update(
            property_id,
            listing_quality_score=enrichment.listing_quality_score,
            reliability_rating=enrichment.reliability_rating,
            sentiment_score=enrichment.sentiment_score,
            generated_description=enrichment.generated_description,
        )

    @staticmethod
    def _normalized_to_kwargs(
        normalized: NormalizedProperty,
    ) -> dict[str, Any]:
        """Return the instance attributes minus ``images``/``custom_fields``.

        Those two attributes are handled separately by ``process``.
        """
        excluded = ("images", "custom_fields")
        return {
            key: value
            for key, value in vars(normalized).items()
            if key not in excluded
        }

    @staticmethod
    def _listing_to_dict(listing: Any) -> dict[str, Any]:
        """Dump the mapped column attributes of an ORM object to a dict.

        Values are passed through ``_serialize_value``.
        """
        mapper = inspect(listing).mapper
        return {
            col.key: PropertyPipeline._serialize_value(
                getattr(listing, col.key)
            )
            for col in mapper.column_attrs
        }

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert a value to a plain form for snapshot storage.

        ``Decimal`` becomes ``float``; ``datetime``, ``date`` and ``time``
        become ISO-8601 strings; ``Enum`` members become their ``value``.
        Anything else, including ``None``, is returned unchanged.
        """
        from datetime import date, time
        from decimal import Decimal
        from enum import Enum

        if value is None:
            return None
        if isinstance(value, Decimal):
            return float(value)
        # ``datetime`` is a subclass of ``date``, so this also covers it.
        if isinstance(value, (date, time)):
            return value.isoformat()
        if isinstance(value, Enum):
            return value.value
        return value

    @staticmethod
    def _compute_changed_fields(
        old: dict[str, Any],
        new: dict[str, Any],
    ) -> dict[str, Any]:
        """Return ``{key: {"old": ..., "new": ...}}`` for keys that differ.

        Only keys present in ``new`` are examined; a key missing from
        ``old`` counts as ``None``. Both sides go through
        ``_serialize_value`` first, so an already-serialized old value is
        not reported as changed merely because the new value is still a
        ``Decimal``/``Enum``/date object, and the diff holds plain values.
        """
        serialize = PropertyPipeline._serialize_value
        changed: dict[str, Any] = {}
        for key, raw_new in new.items():
            old_val = serialize(old.get(key))
            new_val = serialize(raw_new)
            if old_val != new_val:
                changed[key] = {"old": old_val, "new": new_val}
        return changed

    @staticmethod
    def _infer_field_type(value: Any) -> str:
        """Return ``"bool"``, ``"int"``, ``"float"`` or ``"str"`` for a value."""
        # bool is tested first because it is a subclass of int.
        if isinstance(value, bool):
            return "bool"
        if isinstance(value, int):
            return "int"
        if isinstance(value, float):
            return "float"
        return "str"