# Интеграция парсеров с VMK Data Collector

> Целевая аудитория: разработчики парсеров, которые отправляют данные в сервис.

## Содержание

- [Общая схема](#общая-схема)
- [Выбор endpoint'а](#выбор-endpointа)
- [Endpoint: `/ingest` (только URLs)](#endpoint-ingest)
- [Endpoint: `/ingest/with-images` (бинарные файлы)](#endpoint-ingestwith-images)
- [Обработка ответов](#обработка-ответов)
- [Повторная отправка объявления](#повторная-отправка)
- [Ошибки и retry](#ошибки-и-retry)
- [Пример интеграции на Python](#пример-интеграции-на-python)

---

## Общая схема

```
┌─────────────┐     скачивает фото      ┌──────────────────────┐
│   Источник  │ ───────────────────────▶│      Парсер          │
│  (dom.ria,  │                         │  (selenium/scrapy/…) │
│   avito, …) │◀── raw HTML + metadata  └──────────┬───────────┘
└─────────────┘                                    │
                                                   │ POST /api/v1/ingest/with-images
                                                   ▼
                                          ┌──────────────────────┐
                                          │  VMK Data Collector  │
                                          │  FastAPI + AI        │
                                          │  pipeline inline     │
                                          └──────────────────────┘
```

Парсер отвечает за:
1. **Сбор** текста, цен, адресов, характеристик со страницы источника.
2. **Скачивание** фотографий с CDN источника (DOM.Ria, Avito и т.д.).
3. **Отправку** всего этого в Data Collector.

Сервис отвечает за:
- AI-валидацию (это вообще недвижимость?)
- Нормализацию полей
- Анализ изображений (vision)
- Обогащение текста (NER, summary, оценка цены)
- Хранение в PostgreSQL

---

## Выбор endpoint'а

| Endpoint | Когда использовать | Передаётся |
|---|---|---|
| `POST /api/v1/ingest` | Парсер **не может** или **не хочет** качать фото | `metadata` JSON + список `image_urls` внутри payload |
| `POST /api/v1/ingest/with-images` | Парсер **уже скачал** фото с источника | `metadata` JSON + бинарные файлы `images` |

### Почему `/with-images`?

Некоторые CDN (например, `cdn.riastatic.com`) блокируют автоматические запросы — возвращают `415 Unsupported Media Type` независимо от заголовков. Чтобы не подстраиваться под каждый источник, парсер сам качает фото через браузер (selenium/playwright) и передаёт нам бинарные файлы. Сервис не пытается обходить чужие CDN.

---

## Endpoint: `/ingest`

```http
POST /api/v1/ingest
Content-Type: application/json
```

### Тело

```json
{
  "source_slug": "domria",
  "external_id": "34329468",
  "payload": {
    "title": "2-к комнатная квартира",
    "description": "Продается квартира в центре",
    "price": 125000,
    "currency": "USD",
    "url": "https://dom.ria.com/…",
    "images": [
      "https://cdn.riastatic.com/…/photo1.jpg",
      "https://cdn.riastatic.com/…/photo2.jpg"
    ],
    "address": "Киев, Оболонь",
    "area_total": 65,
    "rooms": 2,
    "floor": 5,
    "floors_total": 9
  }
}
```

### Ответ

```json
{
  "job_id": 42,
  "property_id": null,
  "status": "pending",
  "reason": null,
  "message": "Queued for processing",
  "snapshot_id": null
}
```

Pipeline запускается **фоново** через `QueueWorker`. Парсер получает `job_id` и может забыть про задачу.

---

## Endpoint: `/ingest/with-images`

```http
POST /api/v1/ingest/with-images
Content-Type: multipart/form-data
```

### Тело (multipart)

| Поле | Тип | Описание |
|---|---|---|
| `metadata` | `string` (JSON) | Тот же формат, что и тело `/ingest` |
| `images` | `binary` | Фото. Поле повторяется для каждого файла. |

### Лимиты

- Макс. файлов за запрос: **50**
- Макс. размер одного файла: **10 МБ**

### Pipeline inline

В отличие от `/ingest`, pipeline отрабатывает **синхронно** в рамках HTTP-запроса — потому что изображения уже локальные и не нужно ждать очереди.

### Ответ

```json
{
  "job_id": 42,
  "property_id": 7,
  "status": "completed",
  "reason": null,
  "message": "Property ingested successfully",
  "snapshot_id": null
}
```

- `property_id` — ID сохранённого объявления в `property_listings`
- `snapshot_id` — ID снапшота (если объявление уже существовало и обновилось)

### Примеры (curl)

```bash
curl -X POST http://localhost:8020/api/v1/ingest/with-images \
  -F 'metadata={"source_slug": "domria", "external_id": "34329468", "payload": {"title": "2-комнатная квартира", "description": "Продается квартира", "price": 125000}}' \
  -F 'images=@/path/to/photo1.jpg' \
  -F 'images=@/path/to/photo2.jpg'
```

### Примеры (Python requests)

```python
import json
import requests

metadata = {
    "source_slug": "domria",
    "external_id": "34329468",
    "payload": {
        "title": "2-комнатная квартира",
        "description": "Продается квартира",
        "price": 125000,
        "url": "https://dom.ria.com/…",
        "address": "Киев",
        "area_total": 65,
        "rooms": 2,
    },
}

files = [
    ("images", open("photo1.jpg", "rb")),
    ("images", open("photo2.jpg", "rb")),
]

resp = requests.post(
    "http://localhost:8020/api/v1/ingest/with-images",
    data={"metadata": json.dumps(metadata)},
    files=files,
)
print(resp.json())
```

---

## Обработка ответов

### Успех: `status == "completed"`

```json
{
  "job_id": 42,
  "property_id": 7,
  "status": "completed",
  "message": "Property ingested successfully"
}
```

Объявление сохранено. Фото перемещены из temp в постоянное хранилище: `/var/lib/vmk/images/{property_id}/<hash>.jpg`.

### AI отбросил объявление: `status == "invalid"`

```json
{
  "job_id": 42,
  "property_id": null,
  "status": "invalid",
  "reason": "Текст не содержит информации о недвижимости.",
  "message": "Payload is not real estate"
}
```

Это **не ошибка парсера**. AI посчитал, что текст не про недвижимость. Можно логировать `reason` и идти дальше.

### Ошибка сервера

Если вернулся HTTP `500` или `422` — логировать и retry (см. ниже).

---

## Повторная отправка

Если объявление с тем же `source_slug` + `external_id` уже есть в БД, сервис:

1. Создаёт **снапшот** старой версии (сохраняет старые поля + diff).
2. **Обновляет** текущую запись `property_listings`.
3. Перезапускает AI-обогащение.

Парсеру ничего специально делать не нужно — просто отправлять данные заново.

---

## Ошибки и retry

| HTTP | Значение | Retry? |
|---|---|---|
| `202` | Принято в обработку | Нет |
| `422` | Ошибка валидации payload | Нет (исправить payload) |
| `429` | Rate limit (60/30 в минуту) | Да, через 10-60 сек |
| `500` | Внутренняя ошибка (Ollama упал и т.д.) | Да, через 30 сек |
| `502/503` | Сервис недоступен | Да, через экспоненциальный backoff |

### Простой retry-цикл на Python

```python
import time
import requests

BASE_URL = "http://localhost:8020/api/v1"

def ingest_with_retry(metadata, files, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                f"{BASE_URL}/ingest/with-images",
                data={"metadata": json.dumps(metadata)},
                files=files,
                timeout=120,  # pipeline inline может занять 20-60 сек
            )
            if resp.status_code == 429:
                time.sleep(10 * (attempt + 1))
                continue
            if resp.status_code >= 500:
                time.sleep(30 * (attempt + 1))
                continue
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as exc:
            if attempt == max_retries - 1:
                raise
            time.sleep(30 * (attempt + 1))
    return None
```

---

## Пример интеграции на Python

Полный класс-обёртка для парсера:

```python
import json
import requests
from pathlib import Path


class VmkCollectorClient:
    """Клиент для отправки данных от парсера в VMK Data Collector."""

    def __init__(self, base_url: str = "http://localhost:8020/api/v1"):
        self.base_url = base_url.rstrip("/")

    def _post(self, path: str, **kwargs):
        url = f"{self.base_url}{path}"
        resp = requests.post(url, **kwargs)
        resp.raise_for_status()
        return resp.json()

    def ingest(
        self,
        source_slug: str,
        external_id: str,
        payload: dict,
    ) -> dict:
        """Отправить объявление без бинарных фото (только URLs в payload)."""
        return self._post(
            "/ingest",
            json={
                "source_slug": source_slug,
                "external_id": external_id,
                "payload": payload,
            },
        )

    def ingest_with_images(
        self,
        source_slug: str,
        external_id: str,
        payload: dict,
        image_paths: list[str | Path],
    ) -> dict:
        """Отправить объявление с бинарными фото."""
        metadata = json.dumps({
            "source_slug": source_slug,
            "external_id": external_id,
            "payload": payload,
        })
        files = []
        for p in image_paths:
            p = Path(p)
            files.append(("images", (p.name, p.open("rb"), "image/jpeg")))

        return self._post(
            "/ingest/with-images",
            data={"metadata": metadata},
            files=files,
        )


# --- Использование в парсере ---

client = VmkCollectorClient()

# Вариант 1: быстро, если фото не нужны прямо сейчас
client.ingest(
    source_slug="domria",
    external_id="34329468",
    payload={
        "title": "2-к квартира",
        "price": 125000,
        "images": ["https://cdn.riastatic.com/…/photo1.jpg"],
    },
)

# Вариант 2: скачали фото через selenium/playwright
client.ingest_with_images(
    source_slug="domria",
    external_id="34329468",
    payload={
        "title": "2-к квартира",
        "price": 125000,
        "url": "https://dom.ria.com/…",
    },
    image_paths=["/tmp/domria_34329468_1.jpg", "/tmp/domria_34329468_2.jpg"],
)
```

---

## FAQ

**Q: Фото сохраняются на диске сервера?**
A: Да. Бинарные файлы сохраняются в `/var/lib/vmk/images/{property_id}/<hash>.{ext}`. Temp-директория очищается сразу после pipeline.

**Q: Что если отправить два одинаковых фото?**
A: Сервис проверяет `hash` (SHA256) и пропускает дубликаты. `images_count` будет меньше, чем отправлено.

**Q: Какие поля payload обязательны?**
A: Хотя бы одно из: `title` или `description`. Всё остальное — опционально, но чем больше полей, тем точнее AI-нормализация.

**Q: Как часто можно слать?**
A: `/ingest` — 60/min, `/ingest/with-images` — 30/min. При превышении вернётся HTTP 429.

**Q: Можно ли отправить объявление без фото?**
A: Да. Поле `images` в payload необязательное.
