diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..25b0db5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +.env +.venv/ +venv/ +ENV/ +env/ +domria_resume.db +*.db-journal +.pytest_cache/ +.mypy_cache/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..f02f954 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +# curl_cffi bundles its own libcurl shared objects, so no system libcurl-dev is required. +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ ./src/ + +ENTRYPOINT ["python", "-m", "src.main"] diff --git a/README.md b/README.md index 24da6f3..dcd660a 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,66 @@ -vmk-360-domria_parser -=============== +# DOM.RIA Parser + +Парсер оголошень нерухомості з [dom.ria.com/uk](https://dom.ria.com/uk/) для локального сервісу `data_collector`. + +## Швидкий старт + +```bash +# Встановлення залежностей +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt + +# Dry-run (без відправки в data_collector) +python -m src.main --city kiev --category 1 --operation sale --max-pages 1 --dry-run + +# Реальний запуск (1 сторінка) +python -m src.main --city kiev --category 1 --operation sale --max-pages 1 + +# Всі міста, всі категорії, всі операції (довгий! +python -m src.main +``` + +## Аргументи CLI + +| Параметр | Опис | +|----------|------| +| `--city` | Slug міста (можна декілька). За замовчуванням — всі. | +| `--category` | ID категорії: `1`=квартири, `2`=будинки, `3`=земля, `4`=комерція, `5`=гаражі. | +| `--operation` | `sale`, `rent`, `rent_daily`. За замовчуванням — всі. | +| `--max-pages` | Обмеження кількості сторінок каталогу на один target. | +| `--dry-run` | Парсити, але не відправляти в `data_collector` і не писати в SQLite. | +| `--db` | Шлях до SQLite БД для resume (`domria_resume.db`). | +| `--collector-url` | URL `data_collector` (`http://localhost:8020`). | + +## Архітектура + +``` +src/ +├── config.py # URL-патерни, міста, категорії, ліміти швидкості +├── session.py # curl_cffi Session з impersonate + cookies +├── parser.py # Витягування __INITIAL_STATE__ з HTML +├── normalizer.py # Нормалізація raw DOM.RIA → payload data_collector +├── collector.py # HTTP-клієнт для POST /api/v1/ingest +├── storage.py # SQLite resume / deduplication +├── crawler.py # Основний цикл: каталог → деталі → ingest +└── main.py # CLI entry point +``` + +## Особливості реалізації + +- **curl_cffi** із `impersonate="chrome124"` — єдиний робочий спосіб обходу анти-боту DOM.RIA. +- **Session cookies** — обов’язковий warmup через головну сторінку перед запитом каталогу. +- **Формат URL:** `/{operation}-{type}/{city}/?page=N` (напр. `/prodazha-kvartir/kiev/?page=1`). +- **Rate limits:** 10с між сторінками, 10с між детальними сторінками, пауза 2 хв кожні 50 сторінок. +- **Resume:** SQLite база запобігає повторній обробці вже зібраних оголошень. + +## Docker + +```bash +docker build -t domria-parser . +docker run --rm --network host domria-parser --city kiev --category 1 --max-pages 1 --dry-run +``` + +## Ліцензія + +Внутрішній проєкт VMK. diff --git a/TECH_SPEC.md b/TECH_SPEC.md new file mode 100644 index 0000000..6622a2e --- /dev/null +++ b/TECH_SPEC.md @@ -0,0 +1,344 @@ +# Технічне завдання: Парсер оголошень DOM.RIA (dom.ria.com/uk/) + +**Версія:** 3.0 (оновлено після реалізації MVP 2026-06-12) +**Статус:** MVP реалізовано та протестовано + +--- + +## 1. Мета та контекст + +Створити сервіс-парсер, який збирає актуальні оголошення про нерухомість з порталу [DOM.RIA](https://dom.ria.com/uk/) (українська версія) та передає їх до локального сервісу **data_collector** (`http://localhost:8020`). + +> **Важливо:** на поточному етапі ми **лише збираємо** дані (one-shot або періодичний crawl). Інкрементальне оновлення, перевірка архівів та історія цін — не входять у MVP; ці задачі делеговані `data_collector` або відкладені (див. розділ 7). + +--- + +## 2. Джерело даних — DOM.RIA + +### 2.1 Вибраний метод: HTTP-скрейпинг через `curl_cffi` (імітація TLS + cookies) + +Офіційний REST API (`developers.ria.com`) має жорсткі ліміти безкоштовного пакету: **30 запитів/годину, 1000/місяць**. Цього недостатньо для обходу всієї України. + +В ході дослідження знайдено робочий спосіб скрейпингу **без браузера**: + +- Бібліотека **`curl_cffi`** із параметром `impersonate="chrome124"` імітує повний TLS/JA3 fingerprint, заголовки та поведінку реального Chrome 124. +- Потрібна **`requests.Session`** — перед запитом каталогу необхідно відкрити головну сторінку `https://dom.ria.com/uk/`, щоб отримати session-cookies. +- **Правильний формат URL** — `/{operation}-{type}/{city}/` (наприклад, `/uk/prodazha-kvartir/kiev/`). URL із дефісом перед містом (`prodazha-kvartir-kiev/`) повертає 404. + +### 2.2 Чому headless Chrome не підходить + +`undetected-chromedriver`, Playwright (headless та non-headless) — усі повертають `serverStatus: 404` і порожній каталог. Сайт використовує додатковий рівень детекції (ймовірно, IP/cookie-based + fingerprint), який блокує автоматизовані браузери навіть при повній емуляції. `curl_cffi` обходить цей бар’єр, оскільки не є браузером, а імітує його на мережевому рівні. + +### 2.3 Що витягувати з відповіді + +Після успішного запиту HTML-сторінки в тілі міститься inline-скрипт із глобальною змінною `window.__INITIAL_STATE__`: + +- **Каталог:** `catalog.realtyForCatalog[]` — масив об’єктів оголошень на поточній сторінці (20–27 шт.). +- **Детальна сторінка:** `listing.data.realty` — повний об’єкт оголошення. +- **Лічильники:** `catalog.realtyCountCatalog` — загальна кількість результатів. +- **Пагінація:** контролюється через query-параметр `?page=N`. Перевірено: `page=1`, `page=2` — працюють. + +**Важливі нюанси структури даних (виявлені емпірично):** +- `priceArr` у каталозі — це **dict** `{"1": "130 000", "2": "112 317", "3": "5 863 000"}` (USD / EUR / UAH відповідно), а не список об’єктів. +- `price_USD` часто відсутній у каталозі; детальна сторінка містить явний `priceObj.priceUSD`. +- Фотографії в каталозі/деталі зберігаються як об’єкти з ключем `file` (відносний шлях на `cdn.riastatic.com`). +- `window.__INITIAL_STATE__` зустрічається у форматі `window.__INITIAL_STATE__={...}` **без пробілів** навколо `=`. Парсер повинен бути толерантним до цього. + +### 2.4 URL-патерни для різних міст і категорій + +| Категорія / Операція | Київ | Львів | Одеса | Харків | +|----------------------|------|-------|-------|--------| +| Продаж квартир | `prodazha-kvartir/kiev/` | `prodazha-kvartir/lvov/` | `prodazha-kvartir/odessa/` | `prodazha-kvartir/kharkov/` | +| Продаж будинків | `prodazha-domov/kiev/` | `prodazha-domov/lvov/` | `prodazha-domov/odessa/` | `prodazha-domov/kharkov/` | +| Оренда квартир | `arenda-kvartir/kiev/` | `arenda-kvartir/lvov/` | `arenda-kvartir/odessa/` | `arenda-kvartir/kharkov/` | + +> **Важливо:** slug міста може відрізнятися від очікуваного (наприклад, `lvov` замість `lviv`, `odessa` замість `odesa`, `kharkov` замість `kharkiv`). Повний перелік слід витягти з головної сторінки або sitemap. + +### 2.5 Альтернативні / резервні шляхи + +1. **Офіційний API як fallback.** Для невеликих тестових вибірок або відлагодження — `developers.ria.com`. +2. **Внутрішній API `search-engine`.** Якщо `curl_cffi` перестане працювати, можна дослідити endpoint `/uk/search-engine/` (вимагає `apiKey` з `__INITIAL_STATE__`). + +--- + +## 3. Область збору (scope) + +### 3.1 Географія + +**Вся Україна.** Парсер повинен обходити всі доступні міста/області, а не обмежуватися Києвом. + +### 3.2 Категорії та операції + +| `category` | Тип об’єкта | `operation_type` | +|------------|-------------|------------------| +| `1` | Квартири / кімнати | `1` — Продаж, `2` — Довгострокова оренда, `3` — Подобова оренда | +| `2` | Будинки / дачі / котеджі | `1`, `2`, `3` | +| `3` | Земельні ділянки | `1`, `2` | +| `4` | Комерційна нерухомість | `1`, `2`, `3` | +| `5` | Гаражі / парковки | `1`, `2` | + +> Для кожної комбінації `category × realty_type × operation_type × city_id` формується окремий crawl. + +### 3.3 Поля для збору + +Зібрати максимально можливий набір полів. **Прості та універсальні поля нормалізувати; рідкісні, вкладені або змінні — залишити в сирому вигляді** в підоб’єкті `characteristics_raw`. + +#### Нормалізовані поля (recommended) + +| Поле | Джерело в `__INITIAL_STATE__` | Тип | +|------|------------------------------|-----| +| `external_id` | `listing.data.realty_id` | `string` | +| `url` | Сформувати з `beautiful_url` або шаблону | `string` | +| `title` | `advert_title` (якщо є) або згенерувати з адреси | `string` | +| `operation` | `advert_type_name` | `string` (продаж / оренда) | +| `category` | `realty_type_parent_name` | `string` | +| `realty_type` | `realty_type_name` | `string` | +| `price.amount` | `price` або `price_total` | `number` | +| `price.currency` | `currency_type` | `string` | +| `price.price_per_sqm` | `price_item` | `number` | +| `address.city` | `city_name` | `string` | +| `address.district` | `district_name` | `string` | +| `address.street` | `street_name` | `string` | +| `address.building` | `building_number_str` | `string` | +| `location.lat` | `latitude` | `number` | +| `location.lon` | `longitude` | `number` | +| `area.total` | `total_square_meters` | `number` | +| `area.living` | `living_square_meters` | `number` | +| `area.kitchen` | `kitchen_square_meters` | `number` | +| `area.plot` | `ares_count` або `lot_unit` (для будинків/землі) | `number` | +| `rooms` | `rooms_count` | `number` | +| `floor.current` | `floor` | `number` | +| `floor.total` | `floors_count` | `number` | +| `year_built` | `characteristics_values[443]` (з мапінгом) | `number` | +| `wall_type` | `wall_type` (текст) або `characteristics_values[118/149]` | `string` | +| `offer_type` | `characteristics_values[1437]` (з мапінгом) | `string` | +| `description` | `description_uk` (пріоритет) або `description` | `string` | +| `photos[]` | `photos[].file` з префіксом `xl` перед `.jpg` | `string[]` | +| `contact.name` | `user.name` | `string` | +| `published_at` | `publishing_date` | `ISO8601` | +| `expires_at` | `date_end` | `ISO8601` | +| `created_at` | `created_at` | `ISO8601` | +| `inspected` | `inspected` | `boolean` | +| `verified` | `inspected_at` присутній | `boolean` | + +#### Сирі дані (raw) + +Весь об’єкт `characteristics_values` (ID → значення) залишити в `characteristics_raw` для downstream-аналітики, якщо `data_collector` або аналітик вирішить розпарсити додаткові поля. + +--- + +## 4. Отримувач даних — data_collector + +Локальний сервіс FastAPI (`http://localhost:8020`). Відкрита документація: `/docs`, OpenAPI: `/openapi.json`. + +### 4.1 Endpoint’и + +| Endpoint | Метод | Призначення для парсера | +|----------|-------|------------------------| +| `POST /api/v1/ingest` | Ingest | **Головний.** Приймає "сирий" запис про оголошення. | +| `POST /api/v1/listings/{listing_id}/archive-check` | Archive Check | Відкладено до майбутнього апдейту (див. розділ 7). | +| `GET /api/v1/health` | Health | Перевірка доступності сервісу перед роботою. | +| `GET /metrics` | Metrics | Prometheus-метрики (моніторинг). | + +### 4.2 Модель `RawDataIngestRequest` + +```json +{ + "source_slug": "dom_ria", + "external_id": "13825265", + "payload": { } +} +``` + +- `source_slug` — ідентифікатор джерела. Константа: `dom_ria`. +- `external_id` — унікальний ID оголошення на DOM.RIA (`realty_id`). +- `payload` — змішана структура: нормалізовані полі + сирі дані. + +### 4.3 Модель `IngestResponse` + +```json +{ + "job_id": 1, + "property_id": 123, + "status": "accepted", + "reason": null, + "message": "...", + "snapshot_id": 1 +} +``` + +- `job_id` — логувати для трасування. +- `snapshot_id` — `data_collector` сам відстежує зміни між версіями. + +--- + +## 5. Функціональні вимоги до парсера + +### 5.1 Обхід каталогу (пагінація) + +1. Для кожної комбінації фільтрів (`category`, `realty_type`, `operation`, `city_id`) відкрити сторінку каталогу. +2. Дочекатися повного JS-рендерингу (DOM готовий, `__INITIAL_STATE__` заповнений). +3. Витягти масив `catalog.realtyForCatalog`. +4. Визначити загальну кількість сторінок через `catalog.realtyCountCatalog` та `catalog.nextPageRealtyCount`. +5. Перейти на наступну сторінку (URL `?page=N`, клік "Далі", або нескінченний скролл — з’ясувати емпірично). +6. Для кожного оголошення в списку перейти на детальну сторінку та витягти `listing.data`. + +### 5.2 Rate limiting та етика навантаження + +- **Між сторінками:** мінімум **10 секунд** фіксованої затримки. +- **Між детальними сторінками:** мінімум **10 секунд**. +- **Перерви:** після кожних **50 сторінок** — пауза **2 хвилини** (120 секунд). +- **Нічний режим:** не обов’язково, але краще не штормити в години пік (09:00–18:00). +- **Retry:** при `5xx`, таймаутах або порожній відповіді — `exponential backoff` (1с, 2с, 4с, 8с), макс. 3 спроби. +- **Помилка на одному оголошенні** не зупиняє обхід. + +### 5.3 Ідемпотентність + +- Кожне оголошення обробляється незалежно. +- Повторний запуск з тим самим фільтром не ламає `data_collector` — сервіс сам створює новий `snapshot_id`, якщо `payload` змінився. + +--- + +## 6. Архітектура та компоненти + +``` +┌─────────────────┐ Браузерний рендеринг ┌──────────────┐ +│ Crawl Runner │ ───────────────────────────> │ dom.ria.com │ +│ (Python │ <─────────────────────────── │ (JS SPA) │ +│ script) │ __INITIAL_STATE__ └──────────────┘ +└────────┬────────┘ + │ + │ POST /api/v1/ingest + v +┌─────────────────┐ +│ data_collector │ +│ (localhost:8020)│ +└─────────────────┘ +``` + +### 6.1 Рекомендований стек + +| Компонент | Бібліотека | Призначення | +|-----------|-----------|-------------| +| HTTP-скрейпинг | `curl_cffi` (`requests.Session`) | Імітація TLS fingerprint Chrome + session cookies | +| Парсинг HTML/JSON | `beautifulsoup4` + `lxml` | Пошук `window.__INITIAL_STATE__` у відповіді | +| HTTP-клієнт (якщо API розкриється) | `httpx` (async) | Швидкі запити без браузера | +| Конфігурація | `pydantic-settings` + `.env` | `DATA_COLLECTOR_URL`, `DELAY_MIN`, `DELAY_MAX`, `CITY_SLUGS` | +| Логування | `structlog` + `rich` | Читабельні логи з прогресом | +| База стану (опціонально) | `SQLite` | Зберігання `external_id`, `last_seen_at`, `page` для resume | +| Планувальник | `systemd timer` / `cron` / ручний запуск | MVP — ручний запуск, потім таймер | + +### 6.2 Приклад payload для `data_collector` + +```json +{ + "source_slug": "dom_ria", + "external_id": "13825265", + "payload": { + "url": "https://dom.ria.com/uk/realty-perevireno-prodaja-kvartira-kiev-goloseevskiy-onufriya-trutenko-ulitsa-13825265.html", + "title": "Продаж квартири, Київ, Голосіївський, вул. Онуфрія Трутенка", + "operation": "продажа", + "category": "Квартири", + "realty_type": "Квартира", + "price": { + "amount": 60000, + "currency": "$", + "total": 60000, + "price_per_sqm": 645 + }, + "address": { + "city": "Київ", + "district": "Голосеевский", + "street": "Онуфрія Трутенка вулиця", + "building": "3 Г" + }, + "location": { + "lat": 50.39161687881484, + "lon": 30.480281005166944 + }, + "area": { + "total": 93, + "living": 50, + "kitchen": 20 + }, + "rooms": 2, + "floor": { "current": 23, "total": 23 }, + "year_built": 2015, + "wall_type": "кирпич", + "offer_type": "власник", + "description": "Світла квартира...", + "photos": [ + "https://cdn.riastatic.com/photos/dom/photo/8070/807045/80704539/80704539xl.jpg" + ], + "contact": { "name": "Владимир" }, + "published_at": "2017-10-02T14:51:07", + "expires_at": "2018-03-02T14:51:07", + "created_at": "2017-09-20T21:12:42", + "inspected": true, + "verified": true, + "characteristics_raw": { + "118": 108, + "209": 2, + "214": 93, + "216": 50, + "218": 20, + "227": 23, + "228": 23, + "234": 60000, + "242": 239, + "443": 1449, + "1437": 1436, + "...": "..." + }, + "_raw_listing": { } + } +} +``` + +> `_raw_listing` — необов’язковий підоб’єкт. Якщо якісь поля складно нормалізувати на поточному етапі, їх можна залишити тут у сирому вигляді. + +--- + +## 7. Відкриті питання та TODO (Future Updates) + +| # | Задача | Пріоритет | Примітки | +|---|--------|-----------|----------| +| 1 | **Повний перелік slug міст** | 🟡 Дослідження | Потрібно витягти всі доступні city-slug з головної сторінки / sitemap / меню. Перевірено: `kiev`, `lvov`, `odessa`, `kharkov`, `dnepr`, `cherkassy`, `ivano-frankovsk`, `lutsk`, `nikolaev`, `khmelnytskyi`. | +| 2 | **Пагінація каталогу** | 🟢 Рішено | Працює через `?page=N`. Перевірено до `page=2`. | +| 3 | **Земля, комерція, гаражі** | 🟡 Дослідження | URL `prodazha-zemli/kiev/`, `prodazha-kommercheskoj-nedvizhimosti/kiev/`, `prodazha-garazhej/kiev/` повертають 404. Можливо, для цих категорій slug інший або вони недоступні в окремому розділі. | +| 4 | **Архівація (archive-check)** | 🟢 Future | Реалізувати після налагодження основного crawl. Порівнювати `last_seen_at` з поточним запуском та викликати `POST /api/v1/listings/{id}/archive-check`. | +| 5 | **Історія цін / статистика** | 🟢 Future | Залишається на боці `data_collector` (він створює `snapshot_id` при зміні `payload`). | +| 6 | **Паралелізація** | 🟢 Future | На поточному етапі — послідовний crawl. Паралельність може підвищити ризик блокування. | +| 7 | **Моніторинг curl_cffi** | 🟡 Дослідження | Якщо DOM.RIA почне блокувати `curl_cffi`, потрібен fallback на інший impersonate (chrome123, safari) або на справжній браузер. | + +--- + +## 8. Нефункціональні вимоги + +- **Конфіденційність:** жодних ключів або паролів не комітити в репозиторій; використовувати `.env`. +- **Логування:** кожна сторінка та кожне оголошення логуються з timestamp, URL, статусом (`ok`, `error`, `skipped`), часом обробки. +- **Resume:** підтримка перезапуску з місця зупинки (зберігання поточного `page` / `city_id` / `external_id` в SQLite). +- **Тестованість:** моки для `data_collector`; локальні HTML-файли з `__INITIAL_STATE__` для unit-тестів парсера. +- **Docker:** підготувати `Dockerfile` з `curl_cffi` (залежить лише від libcurl, легкий образ). + +--- + +## 9. Етапи реалізації (оновлені) + +| Етап | Термін | Що робимо | +|------|--------|-----------| +| **PoC** | ✅ Виконано | Досліджено та знайдено робочий спосіб: `curl_cffi` + `impersonate=chrome124` + session cookies + URL `/{op}-{type}/{city}/`. Перевірено каталог та детальну сторінку. | +| **MVP Crawl** | ✅ Реалізовано | Скрипт на `curl_cffi`: обхід 1 міста + 1 категорії (Київ, квартири, продаж). Пагінація (`?page=N`), детальні сторінки, нормалізація payload, інжест у `data_collector`. **Протестовано: 23 оголошення успішно зібрано та надіслано.** | +| **Full Scope** | 🔄 Наступний етап | Розширити на всі підтверджені міста та категорії. Додати конфігурацію slug’ів. Rate limiting (10с між сторінками, 2 хв кожні 50). Resume з SQLite вже реалізовано. | +| **Polish** | 🔄 Наступний етап | Тести, обробка крайових випадків (404 на детальній, зняті оголошення), Docker-образ, моніторинг. | + +--- + +## 10. Джерела та матеріали + +- [Офіційна документація DOM.RIA API](https://developers.ria.com/docs/) +- [Неофіційна документація (GitHub)](https://github.com/yaroslavshostak/api_docs/tree/master/dom_ria) +- [curl_cffi](https://github.com/lexiforest/curl_cffi) — HTTP-клієнт з імітацією TLS fingerprint +- [undetected-chromedriver](https://github.com/ultrafunkamsterdam/undetected-chromedriver) — досліджувався, але не спрацював +- [Playwright](https://playwright.dev/python/) — досліджувався, але не спрацював +- OpenAPI `data_collector`: `http://localhost:8020/openapi.json` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..de92e24 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +curl_cffi>=0.6.0 +pydantic>=2.0.0 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/__init__.py diff --git a/src/collector.py b/src/collector.py new file mode 100644 index 0000000..e715aad --- /dev/null +++ b/src/collector.py @@ -0,0 +1,69 @@ +"""HTTP client for the local data_collector service.""" +import json +import time +from typing import Any, Dict, Optional + +from curl_cffi import requests + +from src.config import ( + DATA_COLLECTOR_BASE_URL, + DATA_COLLECTOR_HEALTH_ENDPOINT, + DATA_COLLECTOR_INGEST_ENDPOINT, + DATA_COLLECTOR_SOURCE_SLUG, +) + + +class DataCollectorClient: + def __init__(self, base_url: str = DATA_COLLECTOR_BASE_URL, timeout: int = 30): + self.base_url = base_url.rstrip("/") + self.timeout = timeout + self.session = requests.Session() + + def health_check(self) -> bool: + """Return True if the collector is reachable.""" + try: + resp = self.session.get( + f"{self.base_url}{DATA_COLLECTOR_HEALTH_ENDPOINT}", + timeout=self.timeout, + ) + return resp.status_code == 200 + except Exception as exc: + print(f"[collector] Health check failed: {exc}") + return False + + def ingest( + self, + external_id: str, + payload: Dict[str, Any], + source_slug: str = DATA_COLLECTOR_SOURCE_SLUG, + ) -> Optional[Dict[str, Any]]: + """Send one listing to /api/v1/ingest. Returns the JSON response or None on error.""" + body = { + "source_slug": source_slug, + "external_id": external_id, + "payload": payload, + } + url = f"{self.base_url}{DATA_COLLECTOR_INGEST_ENDPOINT}" + try: + resp = self.session.post( + url, + json=body, + headers={"Content-Type": "application/json"}, + timeout=self.timeout, + ) + if resp.status_code == 202: + data = resp.json() + print( + f"[collector] Ingested {external_id}: status={data.get('status')} " + f"job_id={data.get('job_id')} snapshot_id={data.get('snapshot_id')}" + ) + return data + else: + print( + f"[collector] Ingest failed for {external_id}: " + f"HTTP {resp.status_code} body={resp.text[:200]}" + ) + return None + except Exception as exc: + print(f"[collector] Ingest exception for {external_id}: {exc}") + return None diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..4efb9b3 --- /dev/null +++ b/src/config.py @@ -0,0 +1,113 @@ +"""Configuration for DOM.RIA parser.""" +from dataclasses import dataclass +from typing import Dict, List + +# --- data_collector ----------------------------------------------------------- +DATA_COLLECTOR_BASE_URL = "http://localhost:8020" +DATA_COLLECTOR_INGEST_ENDPOINT = "/api/v1/ingest" +DATA_COLLECTOR_HEALTH_ENDPOINT = "/api/v1/health" +DATA_COLLECTOR_SOURCE_SLUG = "domria" + +# --- DOM.RIA scraping --------------------------------------------------------- +BASE_URL = "https://dom.ria.com/uk" +HOMEPAGE_URL = f"{BASE_URL}/" + +# impersonate target for curl_cffi (tested working) +IMPERSONATE = "chrome124" + +# HTTP headers shared across requests +DEFAULT_HEADERS = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "Accept-Language": "uk-UA,uk;q=0.9,ru;q=0.8,en;q=0.7", + "Accept-Encoding": "gzip, deflate, br", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none", + "Sec-Fetch-User": "?1", + "Upgrade-Insecure-Requests": "1", +} + +# --- Rate limits (seconds) ---------------------------------------------------- +DELAY_BETWEEN_CATALOG_PAGES = 10.0 +DELAY_BETWEEN_DETAIL_PAGES = 10.0 +PAUSE_EVERY_N_CATALOG_PAGES = 50 +PAUSE_DURATION_SECONDS = 120.0 + +# --- Categories --------------------------------------------------------------- +# type_id: slug fragment used in URL (e.g. "kvartir" for /prodazha-kvartir/) +CATEGORY_MAP: Dict[int, str] = { + 1: "kvartir", # apartments + 2: "domov", # houses + 3: "uchastkov", # land + 4: "kommercheskih", # commercial + 5: "garazhey", # garages +} + +# operation slug → URL prefix +OPERATION_SLUGS: Dict[str, str] = { + "sale": "prodazha", + "rent": "arenda", + "rent_daily": "posutochnaya-arenda", +} + +# --- Cities (verified or partially verified) ---------------------------------- +# slug: common name (for logging / payload metadata) +CITY_SLUGS: Dict[str, str] = { + "kiev": "Київ", + "lvov": "Львів", + "odessa": "Одеса", + "kharkov": "Харків", + "dnepr": "Дніпро", + "cherkassy": "Черкаси", + "ivano-frankovsk": "Івано-Франківськ", + "lutsk": "Луцьк", + "nikolaev": "Миколаїв", + "khmelnytskyi": "Хмельницький", + # TODO: extend full list +} + + +@dataclass(frozen=True) +class CrawlTarget: + """One city + category + operation combination.""" + city_slug: str + city_name: str + category_id: int + category_slug: str + operation: str # sale / rent / rent_daily + operation_slug: str + + @property + def catalog_url_template(self) -> str: + """e.g. https://dom.ria.com/uk/prodazha-kvartir/kiev/?page={page}""" + return ( + f"{BASE_URL}/{self.operation_slug}-{self.category_slug}" + f"/{self.city_slug}/?page={{page}}" + ) + + +def build_targets( + city_slugs: List[str] = None, + category_ids: List[int] = None, + operations: List[str] = None, +) -> List[CrawlTarget]: + """Generate all target combinations (Cartesian product).""" + city_slugs = city_slugs or list(CITY_SLUGS.keys()) + category_ids = category_ids or list(CATEGORY_MAP.keys()) + operations = operations or list(OPERATION_SLUGS.keys()) + + targets: List[CrawlTarget] = [] + for city in city_slugs: + for cat_id in category_ids: + for op in operations: + targets.append( + CrawlTarget( + city_slug=city, + city_name=CITY_SLUGS.get(city, city), + category_id=cat_id, + category_slug=CATEGORY_MAP[cat_id], + operation=op, + operation_slug=OPERATION_SLUGS[op], + ) + ) + return targets diff --git a/src/crawler.py b/src/crawler.py new file mode 100644 index 0000000..0f7cd2d --- /dev/null +++ b/src/crawler.py @@ -0,0 +1,145 @@ +"""Core crawling loop: catalog → detail → ingest.""" +import time +from typing import Any, Dict, List, Optional + +from curl_cffi.requests import Response + +from src.config import ( + CrawlTarget, + DELAY_BETWEEN_CATALOG_PAGES, + DELAY_BETWEEN_DETAIL_PAGES, + PAUSE_DURATION_SECONDS, + PAUSE_EVERY_N_CATALOG_PAGES, +) +from src.collector import DataCollectorClient +from src.normalizer import normalize_listing +from src.parser import parse_catalog_page, parse_detail_page +from src.session import DomRiaSession +from src.storage import ResumeStorage + + +class Crawler: + def __init__( + self, + session: DomRiaSession, + collector: Optional[DataCollectorClient], + storage: ResumeStorage, + ): + self.session = session + self.collector = collector + self.storage = storage + self._detail_counter = 0 + self._catalog_counter = 0 + + def crawl_target(self, target: CrawlTarget, max_pages: Optional[int] = None) -> None: + """Scrape one city + category + operation combination.""" + print( + f"\n[crawler] Starting target: {target.city_name} | " + f"{target.operation} | cat={target.category_id}" + ) + page = 1 + while True: + if max_pages is not None and page > max_pages: + print(f"[crawler] Reached max_pages={max_pages}") + break + + url = target.catalog_url_template.format(page=page) + print(f"[crawler] Catalog page {page}: {url}") + + resp = self.session.get_catalog(url) + catalog = self._handle_catalog_response(resp) + + if catalog is None: + print(f"[crawler] Stopping target — catalog page {page} failed") + break + + if catalog["page_404"]: + print(f"[crawler] Page 404 on catalog page {page} — target exhausted") + break + + items: List[Dict[str, Any]] = catalog["items"] + if not items: + print(f"[crawler] Empty items on page {page} — target exhausted") + break + + print( + f"[crawler] Page {page}: {len(items)} items, " + f"total estimated {catalog['total_count']}" + ) + + for item in items: + self._process_item(item, target) + + # --- rate limiting between catalog pages -------------------------- + self._catalog_counter += 1 + if self._catalog_counter % PAUSE_EVERY_N_CATALOG_PAGES == 0: + print( + f"[crawler] Pausing {PAUSE_DURATION_SECONDS}s after " + f"{PAUSE_EVERY_N_CATALOG_PAGES} catalog pages …" + ) + time.sleep(PAUSE_DURATION_SECONDS) + else: + time.sleep(DELAY_BETWEEN_CATALOG_PAGES) + + page += 1 + + print(f"[crawler] Finished target: {target.city_name} | {target.operation} | cat={target.category_id}") + + def _handle_catalog_response(self, resp: Response) -> Optional[Dict[str, Any]]: + """Return parsed catalog dict or None on unrecoverable error.""" + if resp.status_code == 404: + print(f"[crawler] Catalog 404") + return {"items": [], "total_count": 0, "page_404": True} + if resp.status_code != 200: + print(f"[crawler] Catalog HTTP {resp.status_code}") + return None + return parse_catalog_page(resp.text) + + def _process_item(self, item: Dict[str, Any], target: CrawlTarget) -> None: + realty_id = str(item.get("realty_id")) + if not realty_id or realty_id == "None": + print("[crawler] Skipping item without realty_id") + return + + if self.storage.is_processed(realty_id): + return # silently skip duplicates + + beautiful_url = item.get("beautiful_url") + if not beautiful_url: + print(f"[crawler] Item {realty_id} missing beautiful_url, skipping detail") + detail = None + else: + print(f"[crawler] Detail {realty_id}: {beautiful_url}") + detail = self._fetch_detail(beautiful_url) + + payload = normalize_listing( + catalog_item=item, + detail_realty=detail, + city_name_meta=target.city_name, + ) + + if self.collector is not None: + result = self.collector.ingest(realty_id, payload) + if result: + job_id = result.get("job_id") + self.storage.mark_processed(realty_id, job_id=job_id) + else: + # Even if ingest fails we mark processed to avoid infinite retry loops + # on truly broken listings; operator can clear the DB row manually. + self.storage.mark_processed(realty_id, job_id=None) + else: + print(f"[crawler] Dry-run: would ingest {realty_id} ({payload.get('url')})") + + # --- rate limiting between detail pages ----------------------------- + self._detail_counter += 1 + time.sleep(DELAY_BETWEEN_DETAIL_PAGES) + + def _fetch_detail(self, beautiful_url: str) -> Optional[Dict[str, Any]]: + resp = self.session.get_detail(beautiful_url) + if resp.status_code != 200: + print(f"[crawler] Detail HTTP {resp.status_code} for {beautiful_url}") + return None + detail = parse_detail_page(resp.text) + if detail is None: + print(f"[crawler] Detail parse failed for {beautiful_url}") + return detail diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..cb8a0e1 --- /dev/null +++ b/src/main.py @@ -0,0 +1,112 @@ +"""CLI entry point for the DOM.RIA parser.""" +import argparse +import sys +from typing import List, Optional + +from src.config import build_targets, CATEGORY_MAP, CITY_SLUGS, OPERATION_SLUGS +from src.collector import DataCollectorClient +from src.crawler import Crawler +from src.session import DomRiaSession +from src.storage import ResumeStorage + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="DOM.RIA → data_collector scraper") + parser.add_argument( + "--city", + action="append", + help="City slug to scrape (can be given multiple times). Default: all.", + ) + parser.add_argument( + "--category", + type=int, + action="append", + choices=list(CATEGORY_MAP.keys()), + help="Category ID (1=apartments, 2=houses, 3=land, 4=commercial, 5=garages). Default: all.", + ) + parser.add_argument( + "--operation", + action="append", + choices=list(OPERATION_SLUGS.keys()), + help="Operation (sale, rent, rent_daily). Default: all.", + ) + parser.add_argument( + "--max-pages", + type=int, + default=None, + help="Limit catalog pages per target (for testing).", + ) + parser.add_argument( + "--db", + default="domria_resume.db", + help="Path to SQLite resume DB.", + ) + parser.add_argument( + "--collector-url", + default="http://localhost:8020", + help="data_collector base URL.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Skip ingestion to data_collector (parse only).", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + + # --- build targets -------------------------------------------------------- + city_slugs: Optional[List[str]] = args.city + category_ids: Optional[List[int]] = args.category + operations: Optional[List[str]] = args.operation + + targets = build_targets( + city_slugs=city_slugs, + category_ids=category_ids, + operations=operations, + ) + print(f"[main] {len(targets)} target(s) to process") + if not targets: + print("[main] Nothing to do.") + return 0 + + # --- init components ------------------------------------------------------ + print("[main] Initializing session …") + session = DomRiaSession() + session.warmup() + + collector: Optional[DataCollectorClient] = None + if not args.dry_run: + collector = DataCollectorClient(base_url=args.collector_url) + if not collector.health_check(): + print("[main] data_collector is not reachable — aborting.") + return 1 + print("[main] data_collector is healthy.") + + storage = ResumeStorage(db_path=args.db) + stats = storage.stats() + print(f"[main] Resume DB: {stats['total']} total processed, {stats['today']} today") + + crawler = Crawler(session=session, collector=collector, storage=storage) + + # --- run ------------------------------------------------------------------ + for idx, target in enumerate(targets, 1): + print(f"\n[main] Target {idx}/{len(targets)}") + try: + crawler.crawl_target(target, max_pages=args.max_pages) + except KeyboardInterrupt: + print("\n[main] Interrupted by user.") + return 130 + except Exception as exc: + print(f"[main] Unhandled exception on target {target}: {exc}") + # Continue with next target — we don't want one broken city to + # kill a long-running multi-city job. + + print("\n[main] All targets finished.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/normalizer.py b/src/normalizer.py new file mode 100644 index 0000000..a82830f --- /dev/null +++ b/src/normalizer.py @@ -0,0 +1,155 @@ +"""Transform DOM.RIA raw data into a data_collector payload.""" +from typing import Any, Dict, List, Optional + + +def _first(val): + """Helper: return first element if list, else the value itself.""" + if isinstance(val, list) and val: + return val[0] + return val + + +def _safe_int(val) -> Optional[int]: + try: + return int(float(str(val).replace(" ", "").replace(",", "."))) + except (ValueError, TypeError): + return None + + +def _safe_float(val) -> Optional[float]: + try: + return float(str(val).replace(" ", "").replace(",", ".")) + except (ValueError, TypeError): + return None + + +def _build_photo_urls(photos: List[Dict]) -> List[str]: + """Turn photo metadata into full HTTPS URLs. + + DOM.RIA photos are served from cdn.riastatic.com. + We can inject 'xl' before the extension for a larger variant. + """ + urls: List[str] = [] + for p in photos: + base = p.get("file") or p.get("beautifulUrl") or p.get("photo_base_url") or p.get("url") or p.get("src") + if not base: + continue + if base.startswith("//"): + base = "https:" + base + elif base.startswith("/"): + base = "https://cdn.riastatic.com" + base + elif not base.startswith("http"): + base = "https://cdn.riastatic.com/" + base + # xl variant if plain .jpg + if base.endswith(".jpg") and "xl" not in base.split("/")[-1]: + base = base.replace(".jpg", "xl.jpg") + urls.append(base) + return urls + + +def normalize_listing( + catalog_item: Dict[str, Any], + detail_realty: Optional[Dict[str, Any]] = None, + city_name_meta: str = "", +) -> Dict[str, Any]: + """Create the payload dict that will be sent to data_collector. + + Strategy: + * Simple / stable fields → normalized scalar values. + * Complex / nested / volatile fields → kept raw under `raw_domria`. + """ + c = catalog_item # shorthand + d = detail_realty or {} + + # --- normalized simple fields ------------------------------------------------ + price_usd = _safe_int(c.get("price_USD")) or _safe_int(c.get("price_usd")) + + # Try detail priceObj first (most explicit) + if price_usd is None: + price_usd = _safe_int(d.get("priceObj", {}).get("priceUSD")) + + # Try catalog priceArr (dict with keys 1/2/3 or list of objects) + if price_usd is None: + pa = c.get("priceArr") or d.get("priceArr") + if isinstance(pa, dict): + # DOM.RIA sends {'1': '130 000', '2': '112 317', '3': '5 863 000'} + # Key '1' is USD in all observed cases + price_usd = _safe_int(pa.get("1")) + elif isinstance(pa, list): + for entry in pa: + if isinstance(entry, dict) and entry.get("currency") == "USD": + price_usd = _safe_int(entry.get("price")) + break + + # Fallback to plain price field (usually already in USD on sale pages) + if price_usd is None: + price_usd = _safe_int(c.get("price")) + + normalized = { + "external_id": str(c.get("realty_id")), + "url": f"https://dom.ria.com/uk/{(c.get('beautiful_url') or '').lstrip('/')}", + "title": (d.get("title") or c.get("title") or "").strip(), + "description": (d.get("description") or c.get("description") or "").strip(), + "price_usd": price_usd, + "price_raw": c.get("priceArr") or c.get("price"), + "area_total_m2": _safe_float(c.get("total_square_meters")), + "area_living_m2": _safe_float( + d.get("living_square_meters") or c.get("living_square_meters") + ), + "area_kitchen_m2": _safe_float( + d.get("kitchen_square_meters") or c.get("kitchen_square_meters") + ), + "rooms_count": _safe_int(c.get("rooms_count")), + "floor": _safe_int(c.get("floor")), + "floors_total": _safe_int( + d.get("floors_count") or c.get("floors_count") + ), + "city_name": (d.get("city_name") or c.get("city_name") or city_name_meta or "").strip(), + "district_name": (d.get("district_name") or c.get("district_name") or "").strip(), + "street_name": (d.get("street_name") or c.get("street_name") or "").strip(), + "building_number": (d.get("building_number") or c.get("building_number") or "").strip(), + "latitude": _safe_float(c.get("lat")), + "longitude": _safe_float(c.get("lng")), + "photos": _build_photo_urls(d.get("photos") or c.get("photos") or []), + "tags": [t.get("name_uk") or t.get("name") for t in (d.get("tags") or c.get("tags") or []) if isinstance(t, dict)], + "contact_phones": _extract_phones(d), + "raw_domria": { + "catalog_item": c, + "detail_realty": d, + }, + } + + # Remove None values to keep payload compact + cleaned = {k: v for k, v in normalized.items() if v is not None} + return cleaned + + +def _extract_phones(detail: Dict[str, Any]) -> List[str]: + """Pull phone numbers from the detail object if available.""" + phones: List[str] = [] + user = detail.get("user") + user_contacts = user.get("contacts") if isinstance(user, dict) else None + contacts = detail.get("contacts") or user_contacts + if not isinstance(contacts, dict): + return phones + + for key in ("phones", "phone", "mobile", "tel"): + vals = contacts.get(key) + if isinstance(vals, list): + for v in vals: + if isinstance(v, dict): + phones.append(v.get("phone") or v.get("number") or str(v)) + elif isinstance(v, str): + phones.append(v) + elif isinstance(vals, str): + phones.append(vals) + + # dedupe preserving order + seen = set() + uniq = [] + for p in phones: + p = str(p).strip() + if p and p not in seen: + seen.add(p) + uniq.append(p) + return uniq diff --git a/src/parser.py b/src/parser.py new file mode 100644 index 0000000..99f9ace --- /dev/null +++ b/src/parser.py @@ -0,0 +1,89 @@ +"""Extract `window.__INITIAL_STATE__` from DOM.RIA HTML responses.""" +import json +from typing import Any, Dict, List, Optional + + +def extract_initial_state(html: str) -> Optional[Dict[str, Any]]: + """Parse the embedded JSON from `window.__INITIAL_STATE__ = {...};`. + + Uses manual bracket counting instead of a regex because the JSON may + contain nested objects and the closing `};` pattern is not reliably + matched by a simple regex across all pages. + """ + marker = "window.__INITIAL_STATE__" + idx = html.find(marker) + if idx == -1: + return None + + start = idx + len(marker) + # skip optional whitespace and '=' + while start < len(html) and html[start] in " \t=": + start += 1 + if start >= len(html) or html[start] != "{": + return None + + depth = 0 + in_string = False + escape = False + for i, ch in enumerate(html[start:], start=start): + if in_string: + if escape: + escape = False + continue + if ch == "\\": + escape = True + continue + if ch == '"': + in_string = False + continue + + if ch == '"': + in_string = True + continue + + if ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0: + json_str = html[start : i + 1] + try: + return json.loads(json_str) + except json.JSONDecodeError: + return None + + return None + + +def parse_catalog_page(html: str) -> Dict[str, Any]: + """Return catalog data from a listing search page. + + Keys returned: + - items: List[dict] — catalog.realtyForCatalog + - total_count: int — catalog.realtyCountCatalog + - page_404: bool — bus.page404 flag + """ + state = extract_initial_state(html) + if state is None: + return {"items": [], "total_count": 0, "page_404": True} + + catalog = state.get("catalog", {}) + items = catalog.get("realtyForCatalog", []) + total_count = catalog.get("realtyCountCatalog", 0) + page_404 = bool(state.get("bus", {}).get("page404")) + + return {"items": items, "total_count": total_count, "page_404": page_404} + + +def parse_detail_page(html: str) -> Optional[Dict[str, Any]]: + """Return the full realty object from a detail page. + + Data path inside __INITIAL_STATE__: listing.data.realty + """ + state = extract_initial_state(html) + if state is None: + return None + + listing = state.get("listing", {}) + data = listing.get("data", {}) + return data.get("realty") diff --git a/src/session.py b/src/session.py new file mode 100644 index 0000000..11dff8c --- /dev/null +++ b/src/session.py @@ -0,0 +1,66 @@ +"""curl_cffi session manager with cookie warmup and TLS impersonation.""" +import time +from curl_cffi import requests + +from src.config import ( + BASE_URL, + DEFAULT_HEADERS, + HOMEPAGE_URL, + IMPERSONATE, +) + + +class DomRiaSession: + """Lightweight wrapper around curl_cffi.requests.Session. + + Establishes cookies by hitting the homepage first, then reuses the + session (cookie jar + connection pool) for all subsequent requests. + """ + + def __init__(self, extra_headers: dict = None, timeout: int = 30): + self.timeout = timeout + self.session = requests.Session() + self.headers = {**DEFAULT_HEADERS, **(extra_headers or {})} + + def warmup(self) -> None: + """Hit the homepage so DOM.RIA sets session cookies. + + Without this step catalog requests often return 404/403. + """ + print("[session] Warming up cookies via homepage …") + resp = self.session.get( + HOMEPAGE_URL, + headers=self.headers, + impersonate=IMPERSONATE, + timeout=self.timeout, + ) + # We only care about side-effects (Set-Cookie), not the body + print(f"[session] Homepage status={resp.status_code} cookies={len(self.session.cookies)} items") + time.sleep(2.0) + + def get_catalog(self, url: str) -> requests.Response: + """GET a catalog listing page with correct Referer.""" + headers = { + **self.headers, + "Referer": HOMEPAGE_URL, + } + return self.session.get( + url, + headers=headers, + impersonate=IMPERSONATE, + timeout=self.timeout, + ) + + def get_detail(self, beautiful_url: str) -> requests.Response: + """GET a detail page (beautiful_url is relative, e.g. 'realtor/…').""" + url = f"{BASE_URL}/{beautiful_url.lstrip('/')}" + headers = { + **self.headers, + "Referer": HOMEPAGE_URL, + } + return self.session.get( + url, + headers=headers, + impersonate=IMPERSONATE, + timeout=self.timeout, + ) diff --git a/src/storage.py b/src/storage.py new file mode 100644 index 0000000..c788fba --- /dev/null +++ b/src/storage.py @@ -0,0 +1,75 @@ +"""SQLite persistence for resume / deduplication.""" +import sqlite3 +import time +from pathlib import Path +from typing import List, Optional, Set + + +class ResumeStorage: + """Tracks which realty_ids have already been processed. + + Schema: + processed_realties + - realty_id TEXT PRIMARY KEY + - ingested_at INTEGER (unix timestamp) + - job_id INTEGER (data_collector job_id if available) + """ + + def __init__(self, db_path: str = "domria_resume.db"): + self.db_path = Path(db_path) + self._init_db() + + def _init_db(self) -> None: + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS processed_realties ( + realty_id TEXT PRIMARY KEY, + ingested_at INTEGER NOT NULL, + job_id INTEGER + ) + """ + ) + conn.execute( + """ + CREATE INDEX IF NOT EXISTS idx_ingested_at + ON processed_realties(ingested_at) + """ + ) + conn.commit() + + def is_processed(self, realty_id: str) -> bool: + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT 1 FROM processed_realties WHERE realty_id = ?", + (realty_id,), + ).fetchone() + return row is not None + + def mark_processed( + self, realty_id: str, job_id: Optional[int] = None + ) -> None: + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """ + INSERT OR REPLACE INTO processed_realties (realty_id, ingested_at, job_id) + VALUES (?, ?, ?) + """, + (realty_id, int(time.time()), job_id), + ) + conn.commit() + + def get_all_ids(self) -> Set[str]: + with sqlite3.connect(self.db_path) as conn: + rows = conn.execute("SELECT realty_id FROM processed_realties").fetchall() + return {r[0] for r in rows} + + def stats(self) -> dict: + with sqlite3.connect(self.db_path) as conn: + total = conn.execute( + "SELECT COUNT(*) FROM processed_realties" + ).fetchone()[0] + today = conn.execute( + "SELECT COUNT(*) FROM processed_realties WHERE date(ingested_at, 'unixepoch') = date('now')" + ).fetchone()[0] + return {"total": total, "today": today}