Целевая аудитория: разработчики парсеров, которые отправляют данные в сервис.
/ingest (только URLs)/ingest/with-images (бинарные файлы)┌─────────────┐ скачивает фото ┌──────────────────────┐
│ Источник │ ───────────────────────▶│ Парсер │
│ (dom.ria, │ │ (selenium/scrapy/…) │
│ avito, …) │◀── raw HTML + metadata └──────────┬───────────┘
└─────────────┘ │
│ POST /api/v1/ingest/with-images
▼
┌──────────────────────┐
│ VMK Data Collector │
│ FastAPI + AI │
│ pipeline inline │
└──────────────────────┘
Парсер отвечает за:
Сервис отвечает за:
| Endpoint | Когда использовать | Передаётся |
|---|---|---|
POST /api/v1/ingest |
Парсер не может или не хочет качать фото | metadata JSON + список image_urls внутри payload |
POST /api/v1/ingest/with-images |
Парсер уже скачал фото с источника | metadata JSON + бинарные файлы images |
/with-images?Некоторые CDN (например, cdn.riastatic.com) блокируют автоматические запросы — возвращают 415 Unsupported Media Type независимо от заголовков. Чтобы не подстраиваться под каждый источник, парсер сам качает фото через браузер (selenium/playwright) и передаёт нам бинарные файлы. Сервис не пытается обходить чужие CDN.
/ingestPOST /api/v1/ingest Content-Type: application/json
{
"source_slug": "domria",
"external_id": "34329468",
"payload": {
"title": "2-к комнатная квартира",
"description": "Продается квартира в центре",
"price": 125000,
"currency": "USD",
"url": "https://dom.ria.com/…",
"images": [
"https://cdn.riastatic.com/…/photo1.jpg",
"https://cdn.riastatic.com/…/photo2.jpg"
],
"address": "Киев, Оболонь",
"area_total": 65,
"rooms": 2,
"floor": 5,
"floors_total": 9
}
}
{
"job_id": 42,
"property_id": null,
"status": "pending",
"reason": null,
"message": "Queued for processing",
"snapshot_id": null
}
Pipeline запускается фоново через QueueWorker. Парсер получает job_id и может забыть про задачу.
/ingest/with-imagesPOST /api/v1/ingest/with-images Content-Type: multipart/form-data
| Поле | Тип | Описание |
|---|---|---|
metadata |
string (JSON) |
Тот же формат, что и тело /ingest |
images |
binary |
Фото. Поле повторяется для каждого файла. |
В отличие от /ingest, pipeline отрабатывает синхронно в рамках HTTP-запроса — потому что изображения уже локальные и не нужно ждать очереди.
{
"job_id": 42,
"property_id": 7,
"status": "completed",
"reason": null,
"message": "Property ingested successfully",
"snapshot_id": null
}
property_id — ID сохранённого объявления в property_listingssnapshot_id — ID снапшота (если объявление уже существовало и обновилось)curl -X POST http://localhost:8020/api/v1/ingest/with-images \
-F 'metadata={"source_slug": "domria", "external_id": "34329468", "payload": {"title": "2-комнатная квартира", "description": "Продается квартира", "price": 125000}}' \
-F 'images=@/path/to/photo1.jpg' \
-F 'images=@/path/to/photo2.jpg'
import json
import requests
metadata = {
"source_slug": "domria",
"external_id": "34329468",
"payload": {
"title": "2-комнатная квартира",
"description": "Продается квартира",
"price": 125000,
"url": "https://dom.ria.com/…",
"address": "Киев",
"area_total": 65,
"rooms": 2,
},
}
files = [
("images", open("photo1.jpg", "rb")),
("images", open("photo2.jpg", "rb")),
]
resp = requests.post(
"http://localhost:8020/api/v1/ingest/with-images",
data={"metadata": json.dumps(metadata)},
files=files,
)
print(resp.json())
status == "completed"{
"job_id": 42,
"property_id": 7,
"status": "completed",
"message": "Property ingested successfully"
}
Объявление сохранено. Фото перемещены из temp в постоянное хранилище: /var/lib/vmk/images/{property_id}/<hash>.jpg.
status == "invalid"{
"job_id": 42,
"property_id": null,
"status": "invalid",
"reason": "Текст не содержит информации о недвижимости.",
"message": "Payload is not real estate"
}
Это не ошибка парсера. AI посчитал, что текст не про недвижимость. Можно логировать reason и идти дальше.
Если вернулся HTTP 500 или 422 — логировать и retry (см. ниже).
Если объявление с тем же source_slug + external_id уже есть в БД, сервис:
property_listings.Парсеру ничего специально делать не нужно — просто отправлять данные заново.
| HTTP | Значение | Retry? |
|---|---|---|
202 |
Принято в обработку | Нет |
422 |
Ошибка валидации payload | Нет (исправить payload) |
429 |
Rate limit (60/30 в минуту) | Да, через 10-60 сек |
500 |
Внутренняя ошибка (Ollama упал и т.д.) | Да, через 30 сек |
502/503 |
Сервис недоступен | Да, через экспоненциальный backoff |
import time
import requests
BASE_URL = "http://localhost:8020/api/v1"
def ingest_with_retry(metadata, files, max_retries=3):
for attempt in range(max_retries):
try:
resp = requests.post(
f"{BASE_URL}/ingest/with-images",
data={"metadata": json.dumps(metadata)},
files=files,
timeout=120, # pipeline inline может занять 20-60 сек
)
if resp.status_code == 429:
time.sleep(10 * (attempt + 1))
continue
if resp.status_code >= 500:
time.sleep(30 * (attempt + 1))
continue
resp.raise_for_status()
return resp.json()
except requests.exceptions.RequestException as exc:
if attempt == max_retries - 1:
raise
time.sleep(30 * (attempt + 1))
return None
Полный класс-обёртка для парсера:
import json
import requests
from pathlib import Path
class VmkCollectorClient:
"""Клиент для отправки данных от парсера в VMK Data Collector."""
def __init__(self, base_url: str = "http://localhost:8020/api/v1"):
self.base_url = base_url.rstrip("/")
def _post(self, path: str, **kwargs):
url = f"{self.base_url}{path}"
resp = requests.post(url, **kwargs)
resp.raise_for_status()
return resp.json()
def ingest(
self,
source_slug: str,
external_id: str,
payload: dict,
) -> dict:
"""Отправить объявление без бинарных фото (только URLs в payload)."""
return self._post(
"/ingest",
json={
"source_slug": source_slug,
"external_id": external_id,
"payload": payload,
},
)
def ingest_with_images(
self,
source_slug: str,
external_id: str,
payload: dict,
image_paths: list[str | Path],
) -> dict:
"""Отправить объявление с бинарными фото."""
metadata = json.dumps({
"source_slug": source_slug,
"external_id": external_id,
"payload": payload,
})
files = []
for p in image_paths:
p = Path(p)
files.append(("images", (p.name, p.open("rb"), "image/jpeg")))
return self._post(
"/ingest/with-images",
data={"metadata": metadata},
files=files,
)
# --- Использование в парсере ---
client = VmkCollectorClient()
# Вариант 1: быстро, если фото не нужны прямо сейчас
client.ingest(
source_slug="domria",
external_id="34329468",
payload={
"title": "2-к квартира",
"price": 125000,
"images": ["https://cdn.riastatic.com/…/photo1.jpg"],
},
)
# Вариант 2: скачали фото через selenium/playwright
client.ingest_with_images(
source_slug="domria",
external_id="34329468",
payload={
"title": "2-к квартира",
"price": 125000,
"url": "https://dom.ria.com/…",
},
image_paths=["/tmp/domria_34329468_1.jpg", "/tmp/domria_34329468_2.jpg"],
)
Q: Фото сохраняются на диске сервера? A: Да. Бинарные файлы сохраняются в /var/lib/vmk/images/{property_id}/<hash>.{ext}. Temp-директория очищается сразу после pipeline.
Q: Что если отправить два одинаковых фото? A: Сервис проверяет hash (SHA256) и пропускает дубликаты. images_count будет меньше, чем отправлено.
Q: Какие поля payload обязательны? A: Хотя бы одно из: title или description. Всё остальное — опционально, но чем больше полей, тем точнее AI-нормализация.
Q: Как часто можно слать? A: /ingest — 60/min, /ingest/with-images — 30/min. При превышении вернётся HTTP 429.
Q: Можно ли отправить объявление без фото? A: Да. Поле images в payload необязательное.