Newer
Older
vmk-360-data_collector / docs / PARSER_INTEGRATION.md

Интеграция парсеров с VMK Data Collector

Целевая аудитория: разработчики парсеров, которые отправляют данные в сервис.

Содержание


Общая схема

┌─────────────┐     скачивает фото      ┌──────────────────────┐
│   Источник  │ ───────────────────────▶│      Парсер          │
│  (dom.ria,  │                         │  (selenium/scrapy/…) │
│   avito, …) │◀── raw HTML + metadata  └──────────┬───────────┘
└─────────────┘                                    │
                                                   │ POST /api/v1/ingest/with-images
                                                   ▼
                                          ┌──────────────────────┐
                                          │  VMK Data Collector  │
                                          │  FastAPI + AI        │
                                          │  pipeline inline     │
                                          └──────────────────────┘

Парсер отвечает за:

  1. Сбор текста, цен, адресов, характеристик со страницы источника.
  2. Скачивание фотографий с CDN источника (DOM.Ria, Avito и т.д.).
  3. Отправку всего этого в Data Collector.

Сервис отвечает за:

  • AI-валидацию (это вообще недвижимость?)
  • Нормализацию полей
  • Анализ изображений (vision)
  • Обогащение текста (NER, summary, оценка цены)
  • Хранение в PostgreSQL

Выбор endpoint'а

Endpoint Когда использовать Передаётся
POST /api/v1/ingest Парсер не может или не хочет качать фото metadata JSON + список image_urls внутри payload
POST /api/v1/ingest/with-images Парсер уже скачал фото с источника metadata JSON + бинарные файлы images

Почему /with-images?

Некоторые CDN (например, cdn.riastatic.com) блокируют автоматические запросы — возвращают 415 Unsupported Media Type независимо от заголовков. Чтобы не подстраиваться под каждый источник, парсер сам качает фото через браузер (selenium/playwright) и передаёт нам бинарные файлы. Сервис не пытается обходить чужие CDN.


Endpoint: /ingest

POST /api/v1/ingest
Content-Type: application/json

Тело

{
  "source_slug": "domria",
  "external_id": "34329468",
  "payload": {
    "title": "2-к комнатная квартира",
    "description": "Продается квартира в центре",
    "price": 125000,
    "currency": "USD",
    "url": "https://dom.ria.com/…",
    "images": [
      "https://cdn.riastatic.com/…/photo1.jpg",
      "https://cdn.riastatic.com/…/photo2.jpg"
    ],
    "address": "Киев, Оболонь",
    "area_total": 65,
    "rooms": 2,
    "floor": 5,
    "floors_total": 9
  }
}

Ответ

{
  "job_id": 42,
  "property_id": null,
  "status": "pending",
  "reason": null,
  "message": "Queued for processing",
  "snapshot_id": null
}

Pipeline запускается фоново через QueueWorker. Парсер получает job_id и может забыть про задачу.


Endpoint: /ingest/with-images

POST /api/v1/ingest/with-images
Content-Type: multipart/form-data

Тело (multipart)

Поле Тип Описание
metadata string (JSON) Тот же формат, что и тело /ingest
images binary Фото. Поле повторяется для каждого файла.

Лимиты

  • Макс. файлов за запрос: 50
  • Макс. размер одного файла: 10 МБ

Pipeline inline

В отличие от /ingest, pipeline отрабатывает синхронно в рамках HTTP-запроса — потому что изображения уже локальные и не нужно ждать очереди.

Ответ

{
  "job_id": 42,
  "property_id": 7,
  "status": "completed",
  "reason": null,
  "message": "Property ingested successfully",
  "snapshot_id": null
}
  • property_id — ID сохранённого объявления в property_listings
  • snapshot_id — ID снапшота (если объявление уже существовало и обновилось)

Примеры (curl)

curl -X POST http://localhost:8020/api/v1/ingest/with-images \
  -F 'metadata={"source_slug": "domria", "external_id": "34329468", "payload": {"title": "2-комнатная квартира", "description": "Продается квартира", "price": 125000}}' \
  -F 'images=@/path/to/photo1.jpg' \
  -F 'images=@/path/to/photo2.jpg'

Примеры (Python requests)

import json
import requests

metadata = {
    "source_slug": "domria",
    "external_id": "34329468",
    "payload": {
        "title": "2-комнатная квартира",
        "description": "Продается квартира",
        "price": 125000,
        "url": "https://dom.ria.com/…",
        "address": "Киев",
        "area_total": 65,
        "rooms": 2,
    },
}

files = [
    ("images", open("photo1.jpg", "rb")),
    ("images", open("photo2.jpg", "rb")),
]

resp = requests.post(
    "http://localhost:8020/api/v1/ingest/with-images",
    data={"metadata": json.dumps(metadata)},
    files=files,
)
print(resp.json())

Обработка ответов

Успех: status == "completed"

{
  "job_id": 42,
  "property_id": 7,
  "status": "completed",
  "message": "Property ingested successfully"
}

Объявление сохранено. Фото перемещены из temp в постоянное хранилище: /var/lib/vmk/images/{property_id}/<hash>.jpg.

AI отбросил объявление: status == "invalid"

{
  "job_id": 42,
  "property_id": null,
  "status": "invalid",
  "reason": "Текст не содержит информации о недвижимости.",
  "message": "Payload is not real estate"
}

Это не ошибка парсера. AI посчитал, что текст не про недвижимость. Можно логировать reason и идти дальше.

Ошибка сервера

Если вернулся HTTP 500 или 422 — логировать и retry (см. ниже).


Повторная отправка

Если объявление с тем же source_slug + external_id уже есть в БД, сервис:

  1. Создаёт снапшот старой версии (сохраняет старые поля + diff).
  2. Обновляет текущую запись property_listings.
  3. Перезапускает AI-обогащение.

Парсеру ничего специально делать не нужно — просто отправлять данные заново.


Ошибки и retry

HTTP Значение Retry?
202 Принято в обработку Нет
422 Ошибка валидации payload Нет (исправить payload)
429 Rate limit (60/30 в минуту) Да, через 10-60 сек
500 Внутренняя ошибка (Ollama упал и т.д.) Да, через 30 сек
502/503 Сервис недоступен Да, через экспоненциальный backoff

Простой retry-цикл на Python

import time
import requests

BASE_URL = "http://localhost:8020/api/v1"

def ingest_with_retry(metadata, files, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                f"{BASE_URL}/ingest/with-images",
                data={"metadata": json.dumps(metadata)},
                files=files,
                timeout=120,  # pipeline inline может занять 20-60 сек
            )
            if resp.status_code == 429:
                time.sleep(10 * (attempt + 1))
                continue
            if resp.status_code >= 500:
                time.sleep(30 * (attempt + 1))
                continue
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as exc:
            if attempt == max_retries - 1:
                raise
            time.sleep(30 * (attempt + 1))
    return None

Пример интеграции на Python

Полный класс-обёртка для парсера:

import json
import requests
from pathlib import Path


class VmkCollectorClient:
    """Клиент для отправки данных от парсера в VMK Data Collector."""

    def __init__(self, base_url: str = "http://localhost:8020/api/v1"):
        self.base_url = base_url.rstrip("/")

    def _post(self, path: str, **kwargs):
        url = f"{self.base_url}{path}"
        resp = requests.post(url, **kwargs)
        resp.raise_for_status()
        return resp.json()

    def ingest(
        self,
        source_slug: str,
        external_id: str,
        payload: dict,
    ) -> dict:
        """Отправить объявление без бинарных фото (только URLs в payload)."""
        return self._post(
            "/ingest",
            json={
                "source_slug": source_slug,
                "external_id": external_id,
                "payload": payload,
            },
        )

    def ingest_with_images(
        self,
        source_slug: str,
        external_id: str,
        payload: dict,
        image_paths: list[str | Path],
    ) -> dict:
        """Отправить объявление с бинарными фото."""
        metadata = json.dumps({
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        })
        files = []
        for p in image_paths:
            p = Path(p)
            files.append(("images", (p.name, p.open("rb"), "image/jpeg")))

        return self._post(
            "/ingest/with-images",
            data={"metadata": metadata},
            files=files,
        )


# --- Использование в парсере ---

client = VmkCollectorClient()

# Вариант 1: быстро, если фото не нужны прямо сейчас
client.ingest(
    source_slug="domria",
    external_id="34329468",
    payload={
        "title": "2-к квартира",
        "price": 125000,
        "images": ["https://cdn.riastatic.com/…/photo1.jpg"],
    },
)

# Вариант 2: скачали фото через selenium/playwright
client.ingest_with_images(
    source_slug="domria",
    external_id="34329468",
    payload={
        "title": "2-к квартира",
        "price": 125000,
        "url": "https://dom.ria.com/…",
    },
    image_paths=["/tmp/domria_34329468_1.jpg", "/tmp/domria_34329468_2.jpg"],
)

FAQ

Q: Фото сохраняются на диске сервера? A: Да. Бинарные файлы сохраняются в /var/lib/vmk/images/{property_id}/<hash>.{ext}. Temp-директория очищается сразу после pipeline.

Q: Что если отправить два одинаковых фото? A: Сервис проверяет hash (SHA256) и пропускает дубликаты. images_count будет меньше, чем отправлено.

Q: Какие поля payload обязательны? A: Хотя бы одно из: title или description. Всё остальное — опционально, но чем больше полей, тем точнее AI-нормализация.

Q: Как часто можно слать? A: /ingest — 60/min, /ingest/with-images — 30/min. При превышении вернётся HTTP 429.

Q: Можно ли отправить объявление без фото? A: Да. Поле images в payload необязательное.