Проблема: PropertyPipeline.process() — один монолитный метод (~160 строк), в котором смешаны AI-нормализация, БД-операции, скачивание изображений, анализ фото, обогащение. При ошибке на поздней стадии (например, enrichment) остаётся "грязное" состояние в БД.
Решение: Разбить process() на явные стадии (stage). Каждая стадия — отдельный приватный метод с чётким контрактом (входные данные → выходные данные). Вводим PipelineContext (dataclass), который передаётся между стадиями и аккумулирует промежуточные результаты.
Стадии:
_stage_load_raw(raw_data_id) → загрузка RawParsingData_stage_normalize(raw, context) → AI-нормализация, проверка is_real_estate_stage_resolve_entities(raw, normalized, context) → data_source, property_type, поиск existing listing'а_stage_persist_listing(listing_kwargs, existing, context) → create или update + snapshot_stage_persist_custom_fields(property_id, custom_fields, context)_stage_process_images(property_id, images, context) → download + analyze + dedup_stage_enrich(property_id, normalized, aggregated_analysis, context)_stage_finalize(raw_data_id, property_id, snapshot_id, context) → set_processedRollback: На уровне process() при исключении на любой стадии > 3 вызываем _rollback(property_id), который удаляет созданный listing, images, custom_fields, enrichment. Но поскольку commit происходит в router, rollback — это просто не делать await db.commit(). Тем не менее, объекты уже flush-нуты в сессию, поэтому нужно либо:
begin_nested()), либоfailed в raw_parsing_data.Выбранный подход: Пока без nested transactions (это отдельный пункт), просто декомпозиция + чёткое обновление status/error_message в raw_data при падении любой стадии. Стадии оборачиваются в try/except внутри process(), где каждая стадия возвращает StageResult (success/failure + data).
Файлы:
src/vmk_data_collector/services/property_pipeline.py — полный рефакторингПроблема: OllamaClient.chat() и ImageDownloader.download() при сетевых сбоях падают сразу. Нет повторных попыток.
Решение: Использовать tenacity (уже в pyproject.toml).
Для OllamaClient:
@retry(...) на chat() и chat_with_images():
stop=stop_after_attempt(3)wait=wait_exponential(min=1, max=10)retry=retry_if_exception_type((httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError)) — но HTTPStatusError только при 5xx или 429. Для 4xx — не retry.before_sleep=log_before_sleep — логировать retryДля ImageDownloader.download():
httpx внутри download).Circuit Breaker:
OllamaClient (не тянем pybreaker):
CircuitBreaker с тремя состояниями: CLOSED, OPEN, HALF_OPEN.CircuitBreakerOpenError без запроса.OllamaClient.__init__().Файлы:
src/vmk_data_collector/services/ollama_client.py — retry + circuit breakersrc/vmk_data_collector/services/image_downloader.py — retrysrc/vmk_data_collector/core/circuit_breaker.py — новый файл (или inline в ollama_client)Проблема: В ai_image_analyzer.py фото отправляется в llava в оригинальном разрешении. Может быть 10+ МБ на фото, модель всё равно ресайзит внутри.
Решение: Перед image_to_base64 уменьшить изображение до max 1024px по длинной стороне (или настраиваемый лимит). Конвертируем в JPEG качества 85 для предсказуемого размера.
Реализация:
OllamaClient.resize_image(image_path: str, max_size: int = 1024, quality: int = 85) -> bytes.PIL.Image.open(), .thumbnail((max_size, max_size)), сохраняет во временный BytesIO.image_to_base64 заменяем на image_to_base64(image_path: str, resize: bool = True) -> str.AiImageAnalyzer.analyze() вызываем с resize=True.Файлы:
src/vmk_data_collector/services/ollama_client.py — resize методsrc/vmk_data_collector/services/ai_image_analyzer.py — вызов с resize=True/health endpointПроблема: Нет способа проверить живость сервиса перед деплоем.
Решение: Новый router health.py.
Endpoint: GET /api/v1/health
Проверки:
await engine.execute(text("SELECT 1")) — используем from vmk_data_collector.db.engine import engineGET {base_url}/api/tags через короткий httpx.AsyncClient(timeout=5)shutil.disk_usage(settings.image_storage_path_abs) — проверяем free > 100MBОтвет:
{
"status": "healthy" | "degraded",
"checks": {
"database": {"status": "pass" | "fail", "response_ms": 12},
"ollama": {"status": "pass" | "fail", "response_ms": 45},
"disk": {"status": "pass" | "fail", "free_bytes": 1234567890}
}
}
HTTP код: 200 если healthy, 503 если degraded.
Файлы:
src/vmk_data_collector/api/v1/router_health.py — новый routersrc/vmk_data_collector/main.py — подключить routerПроблема: Нет индексов на полях, которые часто используются в WHERE/JOIN.
Решение: Добавить sqlalchemy.Index в модели и создать Alembic-миграцию.
Индексы:
raw_parsing_data: source_id, external_id, statusproperty_listings: source_id, external_id, city, price, deal_type, property_type_id, listing_statusproperty_images: property_id, hashai_enrichment: property_idproperty_snapshots: property_idproperty_custom_fields: property_idРеализация:
__table_args__ = (Index(...), existing_unique_constraint).alembic revision --autogenerate -m "add_performance_indexes".Файлы:
src/vmk_data_collector/models/*.py — добавить Index в __table_args__alembic/versions/xxx_add_performance_indexes.py — новая миграцияПроблема: Все ошибки от Ollama ловятся как Exception без различия retryable/fatal. AiNormalizer, AiImageAnalyzer, AiEnricher возвращают дефолтные пустые ответы при любой ошибке, скрывая проблему.
Решение: Ввести иерархию исключений в core/exceptions.py и классифицировать ошибки в OllamaClient.
Исключения:
OllamaRetryableError(AIProcessingError) — сеть, таймаут, 5xx, 429OllamaFatalError(AIProcessingError) — 4xx (кроме 429), невалидный JSON, неверная модельКлассификация в OllamaClient:
httpx.ConnectError, httpx.TimeoutException → OllamaRetryableErrorhttpx.HTTPStatusError:
OllamaRetryableErrorOllamaFatalErrorjson.JSONDecodeError → OllamaFatalErrorВ AI-сервисах:
AiNormalizer.normalize() — при OllamaRetryableError прокидываем наверх (tenacity сделает retry). При OllamaFatalError — возвращаем fallback с логом.AiImageAnalyzer.analyze() — аналогично.AiEnricher.enrich() — аналогично.Файлы:
src/vmk_data_collector/core/exceptions.py — новые классыsrc/vmk_data_collector/services/ollama_client.py — классификация ошибокsrc/vmk_data_collector/services/ai_normalizer.py — перехват retryable/fatalsrc/vmk_data_collector/services/ai_image_analyzer.py — перехват retryable/fatalsrc/vmk_data_collector/services/ai_enricher.py — перехват retryable/fatalПроблема: RawDataIngestRequest.payload — dict[str, Any]. Нет валидации входных данных перед отправкой в AI. Можно отправить пустой payload или невалидные типы.
Решение: Определить PayloadSchema (Pydantic model) с ожидаемыми полями и валидировать payload в endpoint'е.
Поля payload:
title: str | None — заголовок объявленияdescription: str | None — описаниеprice: str | float | int | None — цена (примем как string/number)url: str | None — ссылка на источникimages: list[str] | None — список URL фотоcontact_phone: str | Noneaddress: str | Nonearea: str | float | int | Nonerooms: str | int | Nonefloor: str | int | Noneextra: dict[str, Any] | None — всё остальное (пробрасывается как custom fields)Валидация:
router_properties.py после получения request валидируем PayloadSchema(**request.payload).title и description оба None — ValidationError("Payload must contain at least title or description").images содержит не-строки — ValidationError("All images must be URLs (strings)").Файлы:
src/vmk_data_collector/schemas/raw_data.py — добавить PayloadSchemasrc/vmk_data_collector/api/v1/router_properties.py — валидация payload/health endpoint (#4) — простой router.status=failed в raw_data + логи.