Newer
Older
vmk-360-data_collector / .claude / plans / plan_1-7.md

План реализации: пункты 1–7 из REVIEW_FOLLOWUP

1. Pipeline декомпозиция

Проблема: PropertyPipeline.process() — один монолитный метод (~160 строк), в котором смешаны AI-нормализация, БД-операции, скачивание изображений, анализ фото, обогащение. При ошибке на поздней стадии (например, enrichment) остаётся "грязное" состояние в БД.

Решение: Разбить process() на явные стадии (stage). Каждая стадия — отдельный приватный метод с чётким контрактом (входные данные → выходные данные). Вводим PipelineContext (dataclass), который передаётся между стадиями и аккумулирует промежуточные результаты.

Стадии:

  1. _stage_load_raw(raw_data_id) → загрузка RawParsingData
  2. _stage_normalize(raw, context) → AI-нормализация, проверка is_real_estate
  3. _stage_resolve_entities(raw, normalized, context)data_source, property_type, поиск existing listing'а
  4. _stage_persist_listing(listing_kwargs, existing, context)create или update + snapshot
  5. _stage_persist_custom_fields(property_id, custom_fields, context)
  6. _stage_process_images(property_id, images, context) → download + analyze + dedup
  7. _stage_enrich(property_id, normalized, aggregated_analysis, context)
  8. _stage_finalize(raw_data_id, property_id, snapshot_id, context)set_processed

Rollback: На уровне process() при исключении на любой стадии > 3 вызываем _rollback(property_id), который удаляет созданный listing, images, custom_fields, enrichment. Но поскольку commit происходит в router, rollback — это просто не делать await db.commit(). Тем не менее, объекты уже flush-нуты в сессию, поэтому нужно либо:

  • а) Использовать вложенную транзакцию (SQLAlchemy begin_nested()), либо
  • б) Оставить декомпозицию без полного rollback на этом этапе, но с чётким статусом failed в raw_parsing_data.

Выбранный подход: Пока без nested transactions (это отдельный пункт), просто декомпозиция + чёткое обновление status/error_message в raw_data при падении любой стадии. Стадии оборачиваются в try/except внутри process(), где каждая стадия возвращает StageResult (success/failure + data).

Файлы:

  • src/vmk_data_collector/services/property_pipeline.py — полный рефакторинг

2. Retry / Circuit Breaker

Проблема: OllamaClient.chat() и ImageDownloader.download() при сетевых сбоях падают сразу. Нет повторных попыток.

Решение: Использовать tenacity (уже в pyproject.toml).

Для OllamaClient:

  • Декоратор @retry(...) на chat() и chat_with_images():
    • stop=stop_after_attempt(3)
    • wait=wait_exponential(min=1, max=10)
    • retry=retry_if_exception_type((httpx.ConnectError, httpx.TimeoutException, httpx.HTTPStatusError)) — но HTTPStatusError только при 5xx или 429. Для 4xx — не retry.
    • before_sleep=log_before_sleep — логировать retry

Для ImageDownloader.download():

  • Аналогичный retry на сетевые ошибки (httpx внутри download).

Circuit Breaker:

  • Простая реализация на уровне OllamaClient (не тянем pybreaker):
    • Класс CircuitBreaker с тремя состояниями: CLOSED, OPEN, HALF_OPEN.
    • Порог: 5 ошибок подряд → OPEN на 30 сек.
    • В OPEN — CircuitBreakerOpenError без запроса.
    • Прикрепляем к OllamaClient.__init__().

Файлы:

  • src/vmk_data_collector/services/ollama_client.py — retry + circuit breaker
  • src/vmk_data_collector/services/image_downloader.py — retry
  • src/vmk_data_collector/core/circuit_breaker.py — новый файл (или inline в ollama_client)

3. Image resize для vision

Проблема: В ai_image_analyzer.py фото отправляется в llava в оригинальном разрешении. Может быть 10+ МБ на фото, модель всё равно ресайзит внутри.

Решение: Перед image_to_base64 уменьшить изображение до max 1024px по длинной стороне (или настраиваемый лимит). Конвертируем в JPEG качества 85 для предсказуемого размера.

Реализация:

  • Новый метод OllamaClient.resize_image(image_path: str, max_size: int = 1024, quality: int = 85) -> bytes.
  • Использует PIL.Image.open(), .thumbnail((max_size, max_size)), сохраняет во временный BytesIO.
  • image_to_base64 заменяем на image_to_base64(image_path: str, resize: bool = True) -> str.
  • В AiImageAnalyzer.analyze() вызываем с resize=True.

Файлы:

  • src/vmk_data_collector/services/ollama_client.py — resize метод
  • src/vmk_data_collector/services/ai_image_analyzer.py — вызов с resize=True

4. /health endpoint

Проблема: Нет способа проверить живость сервиса перед деплоем.

Решение: Новый router health.py.

Endpoint: GET /api/v1/health

Проверки:

  • db: await engine.execute(text("SELECT 1")) — используем from vmk_data_collector.db.engine import engine
  • ollama: GET {base_url}/api/tags через короткий httpx.AsyncClient(timeout=5)
  • disk: shutil.disk_usage(settings.image_storage_path_abs) — проверяем free > 100MB

Ответ:

{
  "status": "healthy" | "degraded",
  "checks": {
    "database": {"status": "pass" | "fail", "response_ms": 12},
    "ollama": {"status": "pass" | "fail", "response_ms": 45},
    "disk": {"status": "pass" | "fail", "free_bytes": 1234567890}
  }
}

HTTP код: 200 если healthy, 503 если degraded.

Файлы:

  • src/vmk_data_collector/api/v1/router_health.py — новый router
  • src/vmk_data_collector/main.py — подключить router

5. DB индексы

Проблема: Нет индексов на полях, которые часто используются в WHERE/JOIN.

Решение: Добавить sqlalchemy.Index в модели и создать Alembic-миграцию.

Индексы:

  • raw_parsing_data: source_id, external_id, status
  • property_listings: source_id, external_id, city, price, deal_type, property_type_id, listing_status
  • property_images: property_id, hash
  • ai_enrichment: property_id
  • property_snapshots: property_id
  • property_custom_fields: property_id

Реализация:

  • В каждой модели добавляем __table_args__ = (Index(...), existing_unique_constraint).
  • Генерируем миграцию: alembic revision --autogenerate -m "add_performance_indexes".
  • Проверяем сгенерированный скрипт и правим при необходимости.

Файлы:

  • src/vmk_data_collector/models/*.py — добавить Index в __table_args__
  • alembic/versions/xxx_add_performance_indexes.py — новая миграция

6. AI error classification

Проблема: Все ошибки от Ollama ловятся как Exception без различия retryable/fatal. AiNormalizer, AiImageAnalyzer, AiEnricher возвращают дефолтные пустые ответы при любой ошибке, скрывая проблему.

Решение: Ввести иерархию исключений в core/exceptions.py и классифицировать ошибки в OllamaClient.

Исключения:

  • OllamaRetryableError(AIProcessingError) — сеть, таймаут, 5xx, 429
  • OllamaFatalError(AIProcessingError) — 4xx (кроме 429), невалидный JSON, неверная модель

Классификация в OllamaClient:

  • httpx.ConnectError, httpx.TimeoutExceptionOllamaRetryableError
  • httpx.HTTPStatusError:
    • status >= 500 или == 429 → OllamaRetryableError
    • status 400..499 (кроме 429) → OllamaFatalError
  • json.JSONDecodeErrorOllamaFatalError

В AI-сервисах:

  • AiNormalizer.normalize() — при OllamaRetryableError прокидываем наверх (tenacity сделает retry). При OllamaFatalError — возвращаем fallback с логом.
  • AiImageAnalyzer.analyze() — аналогично.
  • AiEnricher.enrich() — аналогично.

Файлы:

  • src/vmk_data_collector/core/exceptions.py — новые классы
  • src/vmk_data_collector/services/ollama_client.py — классификация ошибок
  • src/vmk_data_collector/services/ai_normalizer.py — перехват retryable/fatal
  • src/vmk_data_collector/services/ai_image_analyzer.py — перехват retryable/fatal
  • src/vmk_data_collector/services/ai_enricher.py — перехват retryable/fatal

7. Strict payload schema

Проблема: RawDataIngestRequest.payloaddict[str, Any]. Нет валидации входных данных перед отправкой в AI. Можно отправить пустой payload или невалидные типы.

Решение: Определить PayloadSchema (Pydantic model) с ожидаемыми полями и валидировать payload в endpoint'е.

Поля payload:

  • title: str | None — заголовок объявления
  • description: str | None — описание
  • price: str | float | int | None — цена (примем как string/number)
  • url: str | None — ссылка на источник
  • images: list[str] | None — список URL фото
  • contact_phone: str | None
  • address: str | None
  • area: str | float | int | None
  • rooms: str | int | None
  • floor: str | int | None
  • extra: dict[str, Any] | None — всё остальное (пробрасывается как custom fields)

Валидация:

  • В router_properties.py после получения request валидируем PayloadSchema(**request.payload).
  • Если title и description оба NoneValidationError("Payload must contain at least title or description").
  • Если images содержит не-строки — ValidationError("All images must be URLs (strings)").

Файлы:

  • src/vmk_data_collector/schemas/raw_data.py — добавить PayloadSchema
  • src/vmk_data_collector/api/v1/router_properties.py — валидация payload

Порядок реализации (рекомендуемый)

  1. DB индексы (#5) — чистый SQL-файл, zero-risk, можно сделать первым.
  2. Strict payload schema (#7) — защита данных на входе.
  3. AI error classification (#6) — нужно для корректного retry.
  4. Retry / Circuit Breaker (#2) — использует classification из п.3.
  5. Image resize (#3) — использует ollama_client.
  6. /health endpoint (#4) — простой router.
  7. Pipeline декомпозиция (#1) — самый объёмный, зависит от всех предыдущих.

Архитектурные решения

  • Circuit Breaker: Простой in-memory (без Redis), т.к. сервис пока single-instance. Если будут реплики — перейдём на общее хранилище.
  • Rollback в pipeline: Без nested transactions на этом этапе. Декомпозиция для читаемости и observability, rollback — через status=failed в raw_data + логи.
  • Image resize: JPEG 85/1024px — баланс между качеством и размером для vision model.
  • Retry policy: 3 попытки, exponential backoff 1→2→4 сек. Для ingest endpoint'а 3×4=12 сек максимум + timeout Ollama 120 сек = приемлемо.