diff --git a/src/vmk_data_collector/services/property_pipeline.py b/src/vmk_data_collector/services/property_pipeline.py index 9ffc7e3..d683128 100644 --- a/src/vmk_data_collector/services/property_pipeline.py +++ b/src/vmk_data_collector/services/property_pipeline.py @@ -212,17 +212,7 @@ raw_id=raw_data_id, error=str(exc), ) - await self._raw_repo.update_status( - raw_data_id, - RawDataStatus.failed, - error_message=f"Circuit breaker open: {exc}", - ) - return IngestResponse( - job_id=raw_data_id, - status="failed", - reason="circuit_breaker_open", - message=f"Ollama circuit breaker is open: {exc}", - ) + raise except Exception as exc: pipeline_results_total.labels(status="failed").inc() logger.error( @@ -231,17 +221,7 @@ error=str(exc), exc_info=True, ) - await self._raw_repo.update_status( - raw_data_id, - RawDataStatus.failed, - error_message=str(exc), - ) - return IngestResponse( - job_id=raw_data_id, - status="failed", - reason="pipeline_error", - message=f"Pipeline error: {exc}", - ) + raise finally: pipeline_duration_seconds.observe(time.perf_counter() - start) if self._active_jobs is not None: @@ -502,15 +482,199 @@ # Helpers # ------------------------------------------------------------------ + _BOOLEAN_FIELDS = frozenset( + { + "has_balcony", + "has_loggia", + "has_freight_elevator", + "internet", + "security", + "is_agent", + } + ) + + _DATETIME_FIELDS = frozenset({"publish_date", "archived_at"}) + + @staticmethod + def _to_bool(value: Any) -> bool | None: + if value is None: + return None + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in ("true", "1", "yes", "да", "+"): + return True + if lowered in ("false", "0", "no", "нет", "-", ""): + return False + # Non-empty arbitrary string → True (e.g. "подземные паркинги, охрана") + return bool(lowered) + return bool(value) + + @staticmethod + def _to_datetime(value: Any) -> Any: + from datetime import datetime + + if value is None or isinstance(value, datetime): + return value + if isinstance(value, str): + value = value.strip() + if not value: + return None + for fmt in ( + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%d", + "%d.%m.%Y %H:%M", + "%d.%m.%Y", + ): + try: + return datetime.strptime(value, fmt) + except ValueError: + continue + return value + + _ENUM_MAPPINGS: dict[str, dict[str, str]] = { + "building_type": { + "кирпич": "brick", + "кирпичный": "brick", + "панель": "panel", + "панельный": "panel", + "монолит": "monolith", + "монолитный": "monolith", + "монолитно-каркасный": "monolith", + "монолитно-каркас": "monolith", + "газобетон": "gas_block", + "газоблок": "gas_block", + "дерево": "wood", + "деревянный": "wood", + "новый дом": "monolith", + "сталинка": "brick", + "хрущевка": "panel", + "брежневка": "panel", + }, + "renovation_status": { + "косметический": "cosmetic", + "косметика": "cosmetic", + "евро": "euro", + "евроремонт": "euro", + "дизайнерский": "designer", + "дизайн": "designer", + "без ремонта": "none", + "нет": "none", + "строительная отделка": "construction", + "нуждается в обновлении": "construction", + "предчистовая": "construction", + }, + "deal_type": { + "продажа": "sale", + "аренда": "rent_long", + "долгосрочная аренда": "rent_long", + "посуточно": "rent_short", + "краткосрочная аренда": "rent_short", + }, + "layout": { + "студия": "studio", + "раздельная": "separate", + "смежная": "adjacent", + "смежно-раздельная": "adjacent", + }, + "bathroom_type": { + "совмещенный": "combined", + "совмещенная": "combined", + "раздельный": "separate", + "раздельная": "separate", + "несколько": "multiple", + }, + "parking_type": { + "наземная": "ground", + "наземный": "ground", + "подземная": "underground", + "подземный": "underground", + "подземные паркинги": "underground", + "нет": "none", + "отсутствует": "none", + "гараж": "garage", + }, + "heating_type": { + "центральное": "central", + "централизованное": "central", + "автономное": "autonomous", + "автономный": "autonomous", + "электрическое": "floor", + "тёплые полы": "floor", + "нет": "none", + "отсутствует": "none", + "индивидуальное газовое": "autonomous", + "газовое": "autonomous", + }, + "window_view": { + "во двор": "yard", + "двор": "yard", + "на улицу": "street", + "улица": "street", + "на парк": "park", + "парк": "park", + "на воду": "water", + "вода": "water", + "на лес": "forest", + "лес": "forest", + }, + "metro_distance_type": { + "пешком": "walk", + "пешком": "walk", + "транспортом": "transport", + }, + "listing_status": { + "активное": "active", + "active": "active", + "продано": "sold", + "sold": "sold", + "сдано": "rented", + "rented": "rented", + "удалено": "removed", + "removed": "removed", + "в архиве": "archived", + "archived": "archived", + }, + } + + @staticmethod + def _to_enum(field: str, value: Any) -> Any: + if value is None or not isinstance(value, str): + return value + mapping = PropertyPipeline._ENUM_MAPPINGS.get(field, {}) + if not mapping: + return value + lowered = value.strip().lower() + # Exact match + if lowered in mapping: + return mapping[lowered] + # Substring match (e.g. "подземные паркинги, охрана" → underground) + for ru, en in mapping.items(): + if ru in lowered: + return en + # Unknown value → null (safer than crashing) + return None + @staticmethod def _normalized_to_kwargs( normalized: NormalizedProperty, ) -> dict[str, Any]: - return { - k: v - for k, v in normalized.__dict__.items() - if k not in ("images", "custom_fields") - } + result: dict[str, Any] = {} + for k, v in normalized.__dict__.items(): + if k in ("images", "custom_fields"): + continue + if k in PropertyPipeline._BOOLEAN_FIELDS: + v = PropertyPipeline._to_bool(v) + elif k in PropertyPipeline._DATETIME_FIELDS: + v = PropertyPipeline._to_datetime(v) + elif k in PropertyPipeline._ENUM_MAPPINGS: + v = PropertyPipeline._to_enum(k, v) + result[k] = v + return result @staticmethod def _listing_to_dict(listing: Any) -> dict[str, Any]: