import asyncio
import dataclasses
import time
from dataclasses import dataclass, field
from typing import Any
import structlog
from sqlalchemy import inspect
from vmk_data_collector.core.circuit_breaker import CircuitBreakerOpenError
from vmk_data_collector.core.exceptions import OllamaRetryableError
from vmk_data_collector.core.metrics import (
pipeline_duration_seconds,
pipeline_results_total,
)
from vmk_data_collector.db.repositories.ai_enrichment import (
AiEnrichmentRepository,
)
from vmk_data_collector.db.repositories.custom_field import (
CustomFieldRepository,
)
from vmk_data_collector.db.repositories.data_source import (
DataSourceRepository,
)
from vmk_data_collector.db.repositories.image import ImageRepository
from vmk_data_collector.db.repositories.property import PropertyRepository
from vmk_data_collector.db.repositories.property_type import (
PropertyTypeRepository,
)
from vmk_data_collector.db.repositories.raw_data import RawDataRepository
from vmk_data_collector.db.repositories.snapshot import SnapshotRepository
from vmk_data_collector.domain.entities import NormalizedProperty
from vmk_data_collector.domain.enums import RawDataStatus
from vmk_data_collector.schemas.raw_data import IngestResponse
from vmk_data_collector.services.ai_enricher import AiEnricher
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.image_downloader import ImageDownloader
from vmk_data_collector.services.ollama_client import OllamaClient
logger = structlog.get_logger()
@dataclasses.dataclass
class PipelineContext:
    """Mutable scratchpad carrying intermediate results between pipeline stages.

    Every attribute starts empty and is filled in by ``PropertyPipeline.process``
    as the corresponding stage completes.
    """

    # Raw-data row returned by the load stage.
    raw: Any | None = None
    # Response object returned by the AI normalizer.
    norm_response: Any | None = None
    # Normalized listing built from the normalizer response.
    normalized: NormalizedProperty | None = None
    # Data-source row resolved for the raw payload.
    data_source: Any | None = None
    # Previously stored listing for the same source/external id, if any.
    existing_listing: Any | None = None
    # Id of the created or updated property.
    property_id: int | None = None
    # Id of the snapshot taken before an update; stays None for new listings.
    snapshot_id: int | None = None
    # Per-image analysis results keyed by image URL or uploaded path.
    aggregated_analysis: dict[str, Any] = dataclasses.field(default_factory=dict)
    # Result of the AI enrichment stage.
    enrichment: Any | None = None
    # Local paths of images uploaded together with the payload.
    uploaded_image_paths: list[str] = dataclasses.field(default_factory=list)
class PropertyPipeline:
    """Run one raw listing through normalization, persistence, images and enrichment.

    :meth:`process` drives the stages in order; every ``_stage_*`` method
    performs a single step against the injected repositories and AI services.
    The static helpers at the bottom coerce normalizer output into the value
    types handed to the property repository.
    """

    def __init__(
        self,
        raw_repo: RawDataRepository,
        property_repo: PropertyRepository,
        image_repo: ImageRepository,
        custom_field_repo: CustomFieldRepository,
        snapshot_repo: SnapshotRepository,
        enrichment_repo: AiEnrichmentRepository,
        data_source_repo: DataSourceRepository,
        property_type_repo: PropertyTypeRepository,
        normalizer: AiNormalizer,
        image_downloader: ImageDownloader,
        image_analyzer: AiImageAnalyzer,
        enricher: AiEnricher,
        active_jobs: set[int] | None = None,
    ) -> None:
        """Store the repositories and AI services used by the stages.

        Args:
            raw_repo: Repository for raw-data rows.
            property_repo: Repository for property listings.
            image_repo: Repository for property images.
            custom_field_repo: Repository for per-property custom fields.
            snapshot_repo: Repository for pre-update listing snapshots.
            enrichment_repo: Repository for AI enrichment rows.
            data_source_repo: Repository for data sources.
            property_type_repo: Repository for property types.
            normalizer: AI service turning a raw payload into normalized data.
            image_downloader: Service fetching/processing image files.
            image_analyzer: AI service analyzing a base64-encoded image.
            enricher: AI service producing the enrichment for a listing.
            active_jobs: Optional set of raw-data ids in flight; ``process``
                adds the id on entry and discards it on exit.
        """
        self._raw_repo = raw_repo
        self._property_repo = property_repo
        self._image_repo = image_repo
        self._custom_field_repo = custom_field_repo
        self._snapshot_repo = snapshot_repo
        self._enrichment_repo = enrichment_repo
        self._data_source_repo = data_source_repo
        self._property_type_repo = property_type_repo
        self._active_jobs = active_jobs
        self._normalizer = normalizer
        self._image_downloader = image_downloader
        self._image_analyzer = image_analyzer
        self._enricher = enricher

    async def process(
        self, raw_data_id: int, uploaded_image_paths: list[str] | None = None
    ) -> IngestResponse:
        """Run the full pipeline for one raw-data row.

        Args:
            raw_data_id: Id of the raw-data row to process.
            uploaded_image_paths: Optional local paths of images uploaded with
                the payload. When non-empty they are processed instead of the
                remote image URLs from the normalized listing.

        Returns:
            An ``IngestResponse`` with status ``"completed"``, or ``"invalid"``
            when the normalizer reports that the payload is not real estate.

        Raises:
            CircuitBreakerOpenError: Re-raised after being counted as a failure.
            Exception: Any other stage error (including ``ValueError`` for a
                missing raw row) is logged, counted as a failure and re-raised.
        """
        if self._active_jobs is not None:
            self._active_jobs.add(raw_data_id)
        start = time.perf_counter()
        context = PipelineContext()
        logger.info("pipeline_start", raw_id=raw_data_id)
        try:
            # Stage 1: Load raw data
            context.raw = await self._stage_load_raw(raw_data_id)
            logger.info("pipeline_loaded", raw_id=raw_data_id)

            # Use uploaded image paths passed directly from endpoint
            if uploaded_image_paths:
                context.uploaded_image_paths = uploaded_image_paths
                logger.info(
                    "pipeline_uploaded_images_found",
                    raw_id=raw_data_id,
                    image_count=len(uploaded_image_paths),
                )

            # Stage 2: Normalize via AI
            context.norm_response = await self._stage_normalize(
                raw_data_id, context.raw
            )
            if not context.norm_response.is_real_estate:
                # Not a listing: mark the raw row invalid and stop early.
                pipeline_results_total.labels(status="invalid").inc()
                logger.info(
                    "pipeline_not_real_estate",
                    raw_id=raw_data_id,
                    reason=context.norm_response.reason,
                )
                await self._raw_repo.update_status(
                    raw_data_id,
                    RawDataStatus.invalid,
                    error_message=context.norm_response.reason,
                )
                return IngestResponse(
                    job_id=raw_data_id,
                    status="invalid",
                    reason=context.norm_response.reason,
                    message="Payload is not real estate",
                )

            # Drop keys NormalizedProperty does not declare so that extra
            # normalizer output cannot break the constructor call.
            _allowed_fields = {f.name for f in dataclasses.fields(NormalizedProperty)}
            _norm_payload = {
                k: v
                for k, v in (context.norm_response.normalized or {}).items()
                if k in _allowed_fields
            }
            context.normalized = NormalizedProperty(**_norm_payload)

            # Fallback: if AI didn't return images, use raw payload photos/images
            if not context.normalized.images:
                raw_payload = context.raw.payload if hasattr(context.raw, "payload") else {}
                fallback_images = raw_payload.get("photos") or raw_payload.get("images") or []
                if isinstance(fallback_images, list):
                    context.normalized.images = [
                        url for url in fallback_images if isinstance(url, str)
                    ]
                    if context.normalized.images:
                        logger.info(
                            "pipeline_images_fallback",
                            raw_id=raw_data_id,
                            image_count=len(context.normalized.images),
                        )

            # Stage 3: Resolve source and existing listing
            (
                context.data_source,
                context.existing_listing,
            ) = await self._stage_resolve_source_and_listing(
                raw_data_id, context.raw, context.normalized
            )
            logger.info(
                "pipeline_source_resolved",
                raw_id=raw_data_id,
                source_id=context.data_source.id,
                existing_id=getattr(context.existing_listing, "id", None),
            )

            # Stage 4: Persist listing (create or update)
            (
                context.property_id,
                context.snapshot_id,
            ) = await self._stage_persist_listing(
                raw_data_id,
                context.raw,
                context.normalized,
                context.data_source,
                context.existing_listing,
            )
            logger.info(
                "pipeline_persisted",
                raw_id=raw_data_id,
                property_id=context.property_id,
                snapshot_id=context.snapshot_id,
            )

            # Stage 5: Custom fields
            await self._stage_persist_custom_fields(
                context.property_id,
                context.normalized.custom_fields,
            )

            # Stage 6: Download and analyze images (uploaded files take priority)
            if context.uploaded_image_paths:
                context.aggregated_analysis = await self._stage_process_uploaded_images(
                    context.property_id,
                    context.uploaded_image_paths,
                )
            else:
                context.aggregated_analysis = await self._stage_process_remote_images(
                    context.property_id,
                    context.normalized.images,
                )
            logger.info(
                "pipeline_images_processed",
                raw_id=raw_data_id,
                property_id=context.property_id,
                image_count=len(context.uploaded_image_paths or context.normalized.images or []),
            )

            # Stage 7: AI enrichment
            context.enrichment = await self._stage_enrich(
                context.property_id,
                context.normalized,
                context.aggregated_analysis,
            )
            logger.info(
                "pipeline_enriched",
                raw_id=raw_data_id,
                property_id=context.property_id,
            )

            # Stage 8: Finalize
            result = await self._stage_finalize(
                raw_data_id,
                context.property_id,
                context.snapshot_id,
            )
            pipeline_results_total.labels(status="completed").inc()
            logger.info(
                "pipeline_completed",
                raw_id=raw_data_id,
                property_id=context.property_id,
                duration=time.perf_counter() - start,
            )
            return result
        except CircuitBreakerOpenError as exc:
            # Logged as a warning (no traceback) and passed on to the caller.
            pipeline_results_total.labels(status="failed").inc()
            logger.warning(
                "pipeline_circuit_breaker_open",
                raw_id=raw_data_id,
                error=str(exc),
            )
            raise
        except Exception as exc:
            pipeline_results_total.labels(status="failed").inc()
            logger.error(
                "pipeline_unhandled_error",
                raw_id=raw_data_id,
                error=str(exc),
                exc_info=True,
            )
            raise
        finally:
            # Runs for success, early "invalid" return and failures alike.
            pipeline_duration_seconds.observe(time.perf_counter() - start)
            if self._active_jobs is not None:
                self._active_jobs.discard(raw_data_id)

    # ------------------------------------------------------------------
    # Stages
    # ------------------------------------------------------------------

    async def _stage_load_raw(self, raw_data_id: int) -> Any:
        """Fetch the raw row and mark it as being processed.

        Raises:
            ValueError: If no raw row exists for ``raw_data_id``.
        """
        raw = await self._raw_repo.get_by_id(raw_data_id)
        if raw is None:
            raise ValueError(f"Raw data {raw_data_id} not found")
        await self._raw_repo.update_status(
            raw_data_id, RawDataStatus.processing
        )
        return raw

    async def _stage_normalize(
        self, raw_data_id: int, raw: Any
    ) -> Any:
        """Normalize ``raw.payload`` through the AI normalizer.

        ``CircuitBreakerOpenError`` propagates untouched (``process`` logs it);
        any other error is logged here and re-raised.
        """
        try:
            return await self._normalizer.normalize(raw.payload)
        except CircuitBreakerOpenError:
            raise
        except Exception as exc:
            logger.error(
                "pipeline_normalizer_error",
                raw_id=raw_data_id,
                error=str(exc),
            )
            raise

    async def _stage_resolve_source_and_listing(
        self,
        raw_data_id: int,
        raw: Any,
        normalized: NormalizedProperty,
    ) -> tuple[Any, Any]:
        """Resolve the data source and any listing already stored for it.

        Args:
            raw_data_id: Unused here; kept for a uniform stage signature.
            raw: Raw row providing ``payload`` and ``external_id``.
            normalized: Unused here; kept for a uniform stage signature.

        Returns:
            ``(data_source, existing_listing)``; the second item is whatever
            the property repository returns for the source/external-id pair.
        """
        # `or` instead of a .get() default: the default only applies to a
        # missing key, so an explicit null/empty slug would otherwise be
        # used as the data-source slug and name.
        source_slug = raw.payload.get("source_slug") or "unknown"
        data_source = await self._data_source_repo.get_or_create_by_slug(
            source_slug, name=source_slug
        )
        existing = await self._property_repo.get_by_source_and_external(
            data_source.id, raw.external_id
        )
        return data_source, existing

    async def _stage_persist_listing(
        self,
        raw_data_id: int,
        raw: Any,
        normalized: NormalizedProperty,
        data_source: Any,
        existing: Any,
    ) -> tuple[int, int | None]:
        """Create the listing, or snapshot and update an existing one.

        On update the previous column values are stored as a snapshot together
        with the changed fields, and the listing's custom fields are deleted so
        the next stage can recreate them.

        Returns:
            ``(property_id, snapshot_id)``; ``snapshot_id`` is ``None`` when a
            new listing was created.
        """
        listing_kwargs = self._normalized_to_kwargs(normalized)
        if normalized.property_type:
            property_type = await self._property_type_repo.get_or_create_by_slug(
                normalized.property_type,
                name=normalized.property_type,
            )
            listing_kwargs["property_type_id"] = property_type.id
        # The repository is given property_type_id, never the raw type name.
        listing_kwargs.pop("property_type", None)
        listing_kwargs["source_id"] = data_source.id
        listing_kwargs["external_id"] = raw.external_id
        listing_kwargs["raw_data_id"] = raw_data_id

        snapshot_id: int | None = None
        if existing:
            snapshot_data = self._listing_to_dict(existing)
            changed_fields = self._compute_changed_fields(
                snapshot_data, listing_kwargs
            )
            snapshot = await self._snapshot_repo.create(
                existing.id, snapshot_data, changed_fields
            )
            snapshot_id = snapshot.id
            await self._property_repo.update(existing.id, **listing_kwargs)
            property_id = existing.id
            await self._custom_field_repo.delete_by_property(property_id)
        else:
            property_obj = await self._property_repo.create(
                **listing_kwargs
            )
            property_id = property_obj.id
        return property_id, snapshot_id

    async def _stage_persist_custom_fields(
        self,
        property_id: int,
        custom_fields: dict[str, Any],
    ) -> None:
        """Bulk-create custom fields; values are stored as ``str(value)``.

        Does nothing when ``custom_fields`` is empty or ``None``.
        """
        if not custom_fields:
            return
        fields = [
            {
                "field_name": k,
                "field_value": str(v),
                "field_type": self._infer_field_type(v),
            }
            for k, v in custom_fields.items()
        ]
        await self._custom_field_repo.bulk_create(property_id, fields)

    async def _store_and_analyze_image(
        self,
        property_id: int,
        source: str,
        order: int,
        result: Any,
    ) -> dict[str, Any] | None:
        """Persist one fetched image and run AI analysis on it.

        Shared by the uploaded-file and remote-URL image stages.

        Args:
            property_id: Owning property.
            source: Original URL or uploaded path recorded on the image row.
            order: Position of the image within the listing.
            result: Downloader result exposing ``image_hash``, ``local_path``,
                ``file_size``, ``width`` and ``height``.

        Returns:
            The analysis dump, or ``None`` when the image is a duplicate, has
            no local file, or analysis failed (failures are logged, not raised).
        """
        duplicate = await self._image_repo.get_by_hash(
            property_id, result.image_hash
        )
        if duplicate:
            logger.info(
                "image_duplicate_skipped",
                property_id=property_id,
                hash=result.image_hash,
            )
            return None
        image = await self._image_repo.create(property_id, source, order)
        await self._image_repo.update_downloaded(
            image.id,
            result.local_path,
            result.file_size,
            result.width,
            result.height,
            result.image_hash,
        )
        if not result.local_path:
            return None
        try:
            # Encoding is run in a worker thread to keep the event loop free.
            b64 = await asyncio.to_thread(
                OllamaClient.image_to_base64,
                result.local_path,
            )
            analysis = await self._image_analyzer.analyze(b64)
            await self._image_repo.update_analysis(
                image.id,
                analysis.overall_condition or "",
            )
            return analysis.model_dump()
        except Exception as exc:
            # Best effort: a failed analysis must not fail the whole pipeline.
            logger.error(
                "image_analysis_failed",
                property_id=property_id,
                image_id=image.id,
                error=str(exc),
            )
            return None

    async def _stage_process_uploaded_images(
        self,
        property_id: int,
        image_paths: list[str],
    ) -> dict[str, Any]:
        """Process uploaded image files one by one, then remove the temp files.

        Args:
            property_id: Owning property.
            image_paths: Local paths of the uploaded files.

        Returns:
            Analysis dumps keyed by the uploaded path; images that failed,
            were duplicates or could not be analyzed are omitted.
        """
        aggregated_analysis: dict[str, Any] = {}
        for order, temp_path in enumerate(image_paths):
            try:
                result = await self._image_downloader.process_local_file(
                    property_id, temp_path, order
                )
            except Exception as exc:
                logger.error(
                    "image_uploaded_process_failed",
                    property_id=property_id,
                    temp_path=temp_path,
                    error=str(exc),
                )
                continue
            analysis_dump = await self._store_and_analyze_image(
                property_id, temp_path, order, result
            )
            if analysis_dump is not None:
                aggregated_analysis[temp_path] = analysis_dump

        all_images = await self._image_repo.get_by_property(property_id)
        await self._property_repo.update(
            property_id, images_count=len(all_images)
        )

        # Clean up the temporary upload directory (best effort).
        if image_paths:
            try:
                from pathlib import Path

                temp_dir = Path(image_paths[0]).parent
                if temp_dir.name != str(property_id):
                    # Only remove if it's a temp dir, not the permanent property dir
                    for p in image_paths:
                        Path(p).unlink(missing_ok=True)
                    temp_dir.rmdir()
                    logger.info(
                        "pipeline_temp_images_cleaned",
                        property_id=property_id,
                        temp_dir=str(temp_dir),
                    )
            except Exception as exc:
                logger.warning(
                    "pipeline_temp_cleanup_failed",
                    property_id=property_id,
                    error=str(exc),
                )
        return aggregated_analysis

    async def _stage_process_remote_images(
        self,
        property_id: int,
        images: list[str],
    ) -> dict[str, Any]:
        """Download and analyze remote images, at most three at a time.

        Args:
            property_id: Owning property.
            images: Image URLs; ``None`` is treated as an empty list.

        Returns:
            Analysis dumps keyed by URL; images that failed to download, were
            duplicates or could not be analyzed are omitted.
        """
        aggregated_analysis: dict[str, Any] = {}
        semaphore = asyncio.Semaphore(3)

        async def _process_one(url: str, order: int) -> dict[str, Any] | None:
            async with semaphore:
                try:
                    result = await self._image_downloader.download(
                        property_id, url, order
                    )
                except Exception as exc:
                    logger.error(
                        "image_download_failed",
                        property_id=property_id,
                        url=url,
                        error=str(exc),
                    )
                    return None
                # NOTE(review): the duplicate check inside the helper is not
                # atomic across concurrently running tasks - confirm whether
                # identical images within one batch need extra protection.
                analysis_dump = await self._store_and_analyze_image(
                    property_id, url, order, result
                )
                if analysis_dump is None:
                    return None
                return {url: analysis_dump}

        # The normalized listing may carry no images at all; `process` applies
        # the same `or []` guard when it counts them.
        tasks = [
            _process_one(url, order)
            for order, url in enumerate(images or [])
        ]
        results = await asyncio.gather(*tasks) if tasks else []
        for r in results:
            if r:
                aggregated_analysis.update(r)

        all_images = await self._image_repo.get_by_property(property_id)
        await self._property_repo.update(
            property_id, images_count=len(all_images)
        )
        return aggregated_analysis

    async def _stage_enrich(
        self,
        property_id: int,
        normalized: NormalizedProperty,
        aggregated_analysis: dict[str, Any],
    ) -> Any:
        """Run AI enrichment and store the result on the property.

        Returns:
            The enrichment object, or ``None`` when enrichment failed with a
            non-retryable error (logged, pipeline continues).

        Raises:
            CircuitBreakerOpenError: Propagated to the caller.
            OllamaRetryableError: Propagated to the caller.
        """
        try:
            enrichment = await self._enricher.enrich(
                normalized, aggregated_analysis
            )
        except CircuitBreakerOpenError:
            raise
        except OllamaRetryableError:
            raise
        except Exception as exc:
            logger.error(
                "pipeline_enricher_error",
                property_id=property_id,
                error=str(exc),
            )
            return None

        if enrichment:
            # Replace any previous enrichment row for this property.
            await self._enrichment_repo.delete_by_property(property_id)
            await self._enrichment_repo.create(
                property_id,
                extracted_features=enrichment.extracted_features,
                price_assessment=enrichment.price_assessment,
                listing_quality_score=enrichment.listing_quality_score,
                reliability_rating=enrichment.reliability_rating,
                sentiment_score=enrichment.sentiment_score,
                classification=enrichment.classification,
                image_analysis_results=aggregated_analysis,
                generated_description=enrichment.generated_description,
                summary=enrichment.summary,
                model_version=enrichment.model_version,
                processing_time_ms=enrichment.processing_time_ms,
            )
            # Copy the headline scores onto the property row as well.
            await self._property_repo.update(
                property_id,
                listing_quality_score=enrichment.listing_quality_score,
                reliability_rating=enrichment.reliability_rating,
                sentiment_score=enrichment.sentiment_score,
                generated_description=enrichment.generated_description,
            )
        return enrichment

    async def _stage_finalize(
        self,
        raw_data_id: int,
        property_id: int,
        snapshot_id: int | None,
    ) -> IngestResponse:
        """Mark the raw row processed and build the success response."""
        await self._raw_repo.set_processed(raw_data_id)
        return IngestResponse(
            job_id=raw_data_id,
            property_id=property_id,
            status="completed",
            message="Property ingested successfully",
            snapshot_id=snapshot_id,
        )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    # Normalized fields coerced with _to_bool before persistence.
    _BOOLEAN_FIELDS = frozenset(
        {
            "has_balcony",
            "has_loggia",
            "has_freight_elevator",
            "internet",
            "security",
            "is_agent",
        }
    )
    # Normalized fields coerced with _to_datetime before persistence.
    _DATETIME_FIELDS = frozenset({"publish_date", "archived_at"})

    @staticmethod
    def _to_bool(value: Any) -> bool | None:
        """Coerce a loosely typed flag to ``bool``; ``None`` stays ``None``.

        Strings are matched case-insensitively against known yes/no tokens
        (English and Russian); any other non-empty string counts as ``True``.
        """
        if value is None:
            return None
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return bool(value)
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "1", "yes", "да", "+"):
                return True
            # "отсутствует" (absent) is a negative answer; the enum mappings
            # of this class already translate it to "none".
            if lowered in ("false", "0", "no", "нет", "-", "", "отсутствует"):
                return False
            # Non-empty arbitrary string → True (e.g. "подземные паркинги, охрана")
            return True
        return bool(value)

    @staticmethod
    def _to_datetime(value: Any) -> Any:
        """Parse a date/time string into ``datetime``.

        ``None`` and ``datetime`` instances are returned unchanged, blank
        strings become ``None``. The explicit formats are tried first, then
        ISO-8601 (fractional seconds, UTC offset or trailing ``Z``). A string
        that matches nothing is returned stripped; non-string values are
        returned as they are.
        """
        from datetime import datetime

        if value is None or isinstance(value, datetime):
            return value
        if isinstance(value, str):
            value = value.strip()
            if not value:
                return None
            for fmt in (
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%dT%H:%M:%S",
                "%Y-%m-%d",
                "%d.%m.%Y %H:%M",
                "%d.%m.%Y",
            ):
                try:
                    return datetime.strptime(value, fmt)
                except ValueError:
                    continue
            # ISO-8601 fallback. A trailing "Z" is rewritten because
            # fromisoformat() only accepts it from Python 3.11 on.
            iso_candidate = value
            if iso_candidate[-1] in "Zz":
                iso_candidate = iso_candidate[:-1] + "+00:00"
            try:
                return datetime.fromisoformat(iso_candidate)
            except ValueError:
                pass
        return value

    # Per-field translation of free-text (mostly Russian) values into the
    # enum values persisted on the listing. Keys are lower-case.
    _ENUM_MAPPINGS: dict[str, dict[str, str]] = {
        "building_type": {
            "кирпич": "brick",
            "кирпичный": "brick",
            "панель": "panel",
            "панельный": "panel",
            "монолит": "monolith",
            "монолитный": "monolith",
            "монолитно-каркасный": "monolith",
            "монолитно-каркас": "monolith",
            "газобетон": "gas_block",
            "газоблок": "gas_block",
            "дерево": "wood",
            "деревянный": "wood",
            "новый дом": "monolith",
            "сталинка": "brick",
            "хрущевка": "panel",
            "брежневка": "panel",
        },
        "renovation_status": {
            "косметический": "cosmetic",
            "косметика": "cosmetic",
            "евро": "euro",
            "евроремонт": "euro",
            "дизайнерский": "designer",
            "дизайн": "designer",
            "без ремонта": "none",
            "нет": "none",
            "строительная отделка": "construction",
            "нуждается в обновлении": "construction",
            "предчистовая": "construction",
        },
        "deal_type": {
            "продажа": "sale",
            "аренда": "rent_long",
            "долгосрочная аренда": "rent_long",
            "посуточно": "rent_short",
            "краткосрочная аренда": "rent_short",
        },
        "layout": {
            "студия": "studio",
            "раздельная": "separate",
            "смежная": "adjacent",
            "смежно-раздельная": "adjacent",
        },
        "bathroom_type": {
            "совмещенный": "combined",
            "совмещенная": "combined",
            "раздельный": "separate",
            "раздельная": "separate",
            "несколько": "multiple",
        },
        "parking_type": {
            "наземная": "ground",
            "наземный": "ground",
            "подземная": "underground",
            "подземный": "underground",
            "подземные паркинги": "underground",
            "нет": "none",
            "отсутствует": "none",
            "гараж": "garage",
        },
        "heating_type": {
            "центральное": "central",
            "централизованное": "central",
            "автономное": "autonomous",
            "автономный": "autonomous",
            "электрическое": "floor",
            "тёплые полы": "floor",
            "нет": "none",
            "отсутствует": "none",
            "индивидуальное газовое": "autonomous",
            "газовое": "autonomous",
        },
        "window_view": {
            "во двор": "yard",
            "двор": "yard",
            "на улицу": "street",
            "улица": "street",
            "на парк": "park",
            "парк": "park",
            "на воду": "water",
            "вода": "water",
            "на лес": "forest",
            "лес": "forest",
        },
        "metro_distance_type": {
            "пешком": "walk",
            "транспортом": "transport",
        },
        "listing_status": {
            "активное": "active",
            "active": "active",
            "продано": "sold",
            "sold": "sold",
            "сдано": "rented",
            "rented": "rented",
            "удалено": "removed",
            "removed": "removed",
            "в архиве": "archived",
            "archived": "archived",
        },
    }

    @staticmethod
    def _to_enum(field: str, value: Any) -> Any:
        """Translate a free-text value into the enum value for ``field``.

        Args:
            field: Name of the normalized field; selects the mapping table.
            value: Raw value from the normalizer.

        Returns:
            ``value`` unchanged when it is not a string or ``field`` has no
            mapping; otherwise the mapped enum value, or ``None`` when nothing
            matches.
        """
        if value is None or not isinstance(value, str):
            return value
        mapping = PropertyPipeline._ENUM_MAPPINGS.get(field, {})
        if not mapping:
            return value

        def _fold(text: str) -> str:
            # "ё" and "е" are used interchangeably in Russian text.
            return text.strip().lower().replace("ё", "е")

        lowered = _fold(value)
        folded = {_fold(ru): en for ru, en in mapping.items()}
        # Exact match
        if lowered in folded:
            return folded[lowered]
        # Substring match (e.g. "подземные паркинги, охрана" → underground).
        # Longest key first, so that "краткосрочная аренда" wins over the
        # shorter "аренда" contained in it; sorted() is stable, so keys of
        # equal length keep their declaration order.
        for ru in sorted(folded, key=len, reverse=True):
            if ru in lowered:
                return folded[ru]
        # Unknown value → null (safer than crashing)
        return None

    @staticmethod
    def _normalized_to_kwargs(
        normalized: NormalizedProperty,
    ) -> dict[str, Any]:
        """Build repository keyword arguments from a normalized listing.

        ``images`` and ``custom_fields`` are skipped (they are handled by
        their own stages); boolean, datetime and enum fields are coerced with
        the matching helper, everything else is copied as is.
        """
        result: dict[str, Any] = {}
        for k, v in normalized.__dict__.items():
            if k in ("images", "custom_fields"):
                continue
            if k in PropertyPipeline._BOOLEAN_FIELDS:
                v = PropertyPipeline._to_bool(v)
            elif k in PropertyPipeline._DATETIME_FIELDS:
                v = PropertyPipeline._to_datetime(v)
            elif k in PropertyPipeline._ENUM_MAPPINGS:
                v = PropertyPipeline._to_enum(k, v)
            result[k] = v
        return result

    @staticmethod
    def _listing_to_dict(listing: Any) -> dict[str, Any]:
        """Dump the mapped columns of a SQLAlchemy-mapped listing to a dict.

        Values are passed through :meth:`_serialize_value`.
        """
        mapper = inspect(listing).mapper
        return {
            col.name: PropertyPipeline._serialize_value(
                getattr(listing, col.name)
            )
            for col in mapper.columns
        }

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert a value to a plain, snapshot-friendly representation.

        ``Decimal`` becomes ``float``; ``datetime``/``date``/``time`` become
        ISO strings; ``Enum`` members become their value; anything else
        (including ``None``) is returned unchanged.
        """
        from datetime import date
        from datetime import time as time_of_day
        from decimal import Decimal
        from enum import Enum

        if value is None:
            return None
        if isinstance(value, Decimal):
            return float(value)
        # datetime is a subclass of date, so it is covered here as well.
        if isinstance(value, (date, time_of_day)):
            return value.isoformat()
        if isinstance(value, Enum):
            return value.value
        return value

    @staticmethod
    def _compute_changed_fields(
        old: dict[str, Any],
        new: dict[str, Any],
    ) -> dict[str, Any]:
        """Diff new values against an already serialized snapshot.

        Args:
            old: Serialized previous values (as from :meth:`_listing_to_dict`).
            new: New values; serialized here before comparison.

        Returns:
            ``{key: {"old": ..., "new": ...}}`` for every key of ``new`` whose
            serialized value differs; keys missing from ``old`` compare
            against ``None``.
        """
        changed: dict[str, Any] = {}
        for key, new_val in new.items():
            old_val = old.get(key)
            serialized_new = PropertyPipeline._serialize_value(new_val)
            if old_val != serialized_new:
                changed[key] = {"old": old_val, "new": serialized_new}
        return changed

    @staticmethod
    def _infer_field_type(value: Any) -> str:
        """Return ``"bool"``, ``"int"``, ``"float"`` or ``"str"`` for a value."""
        # bool is tested first because it is a subclass of int.
        if isinstance(value, bool):
            return "bool"
        if isinstance(value, int):
            return "int"
        if isinstance(value, float):
            return "float"
        return "str"