diff --git a/README.md b/README.md index dfc41cd..650125c 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,100 @@ } ``` +--- + +### `POST /api/v1/ingest/with-images` + +Принимает **бинарные изображения** от парсера напрямую. Парсер сам выкачивает фото со своего источника и передаёт их нам — сервер не пытается обходить чужие CDN. + +#### Когда использовать + +| Эндпоинт | Когда использовать | +|---|---| +| `/ingest` | Парсер передаёт только **URLs** изображений | +| `/ingest/with-images` | Парсер сам скачал фото и передаёт **бинарные файлы** | + +#### Заголовки + +| Заголовок | Обязательный | Значение | +|---|---|---| +| `Content-Type` | Да | `multipart/form-data` | + +#### Тело запроса (multipart) + +``` +metadata: {"source_slug": "domria", "external_id": "34329468", "payload": {"title": "...", "description": "...", "price": 125000}} +images: [binary-photo-1.jpg] +images: [binary-photo-2.jpg] +images: [binary-photo-3.jpg] +``` + +Поле `metadata` — это **JSON-строка** с тем же форматом, что и в `/ingest`. +Поле `images` повторяется для каждого файла (FastAPI принимает `list[UploadFile]`). + +#### Лимиты + +- Макс. файлов за запрос: **50** +- Макс. размер одного файла: **10 МБ** + +#### Ответ `202 Accepted` (успех) + +Тот же формат, что и у `/ingest`, но pipeline отрабатывает **синхронно** в рамках запроса — потому что изображения уже локальные и не нужно ждать очереди. 
+ +```json +{ + "job_id": 42, + "property_id": 7, + "status": "completed", + "reason": null, + "message": "Property ingested successfully", + "snapshot_id": 3 +} +``` + +#### Примеры (curl) + +```bash +# Отправить объявление с 3 фото +curl -X POST http://localhost:8020/api/v1/ingest/with-images \ + -F 'metadata={"source_slug": "domria", "external_id": "34329468", "payload": {"title": "2-комнатная квартира", "description": "Продается квартира", "price": 125000}}' \ + -F 'images=@/path/to/photo1.jpg' \ + -F 'images=@/path/to/photo2.jpg' \ + -F 'images=@/path/to/photo3.jpg' +``` + +#### Примеры (Python requests) + +```python +import json +import requests + +metadata = { + "source_slug": "domria", + "external_id": "34329468", + "payload": { + "title": "2-комнатная квартира", + "description": "Продается квартира", + "price": 125000, + }, +} + +files = [ + ("images", open("photo1.jpg", "rb")), + ("images", open("photo2.jpg", "rb")), + ("images", open("photo3.jpg", "rb")), +] + +response = requests.post( + "http://localhost:8020/api/v1/ingest/with-images", + data={"metadata": json.dumps(metadata)}, + files=files, +) +print(response.json()) +``` + +--- + #### Статусы обработки После приёма задача (`job_id`) проходит через pipeline: diff --git a/pyproject.toml b/pyproject.toml index 60f61bf..ed74a75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ "slowapi>=0.1.9", "prometheus-client>=0.20.0", "prometheus-fastapi-instrumentator>=7.0.0", + "python-multipart>=0.0.9", ] [project.optional-dependencies] diff --git a/src/vmk_data_collector/api/v1/router_properties.py b/src/vmk_data_collector/api/v1/router_properties.py index 7f0ad43..0fbf96b 100644 --- a/src/vmk_data_collector/api/v1/router_properties.py +++ b/src/vmk_data_collector/api/v1/router_properties.py @@ -1,12 +1,15 @@ +import json +from pathlib import Path from typing import Any import httpx import pydantic import structlog -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import 
# Upload limits advertised in the README for this endpoint.
_MAX_UPLOAD_FILES = 50
_MAX_UPLOAD_FILE_BYTES = 10 * 1024 * 1024
_UPLOAD_CHUNK_BYTES = 1024 * 1024


@router.post("/ingest/with-images", response_model=IngestResponse, status_code=202)
@limiter.limit("30/minute")
async def ingest_with_images(
    request: Request,
    metadata: str = Form(..., description="JSON string with source_slug, external_id, payload"),
    images: list[UploadFile] = File(default=[]),
    db: AsyncSession = Depends(get_db),
    client: OllamaClient = Depends(get_ollama_client),
) -> IngestResponse:
    """Ingest a property listing together with binary image files.

    The parser downloads images from its source and uploads them directly.
    The pipeline runs inline, inside the request, because the images are
    already local.

    Args:
        request: Incoming request (required by the rate limiter decorator).
        metadata: JSON object, as a string, with ``source_slug``,
            ``external_id`` and ``payload`` keys.
        images: Uploaded image files, one multipart ``images`` part per file.
        db: Session used to persist the raw ingest row.
        client: Ollama client handed to the pipeline.

    Returns:
        The ``IngestResponse`` produced by the pipeline.

    Raises:
        ValidationError: ``metadata`` is not a JSON object, ``payload`` fails
            schema validation, or more than ``_MAX_UPLOAD_FILES`` files were
            sent.
        HTTPException: 413 when a file exceeds ``_MAX_UPLOAD_FILE_BYTES``;
            500 when the pipeline fails.
    """
    import shutil

    from vmk_data_collector.domain.enums import RawDataStatus

    logger.info(
        "ingest_with_images_request",
        metadata_len=len(metadata),
        image_count=len(images),
    )

    # 0. Enforce the documented upload limits before anything is written, so
    #    a rejected request leaves neither a raw row nor temp files behind.
    if len(images) > _MAX_UPLOAD_FILES:
        raise ValidationError(
            f"Too many images: {len(images)} (max {_MAX_UPLOAD_FILES})"
        )
    for upload in images:
        # `size` is filled in by the multipart parser on recent Starlette
        # versions; it may be missing or None on older ones.
        declared_size = getattr(upload, "size", None)
        if declared_size is not None and declared_size > _MAX_UPLOAD_FILE_BYTES:
            raise HTTPException(
                status_code=413,
                detail=f"Image exceeds max size of {_MAX_UPLOAD_FILE_BYTES} bytes",
            )

    # 1. Parse metadata
    try:
        data = json.loads(metadata)
    except json.JSONDecodeError as exc:
        logger.warning("ingest_with_images_json_error", error=str(exc))
        raise ValidationError(f"metadata must be valid JSON: {exc}") from exc
    # Valid JSON is not necessarily an object ("[]", "42", ...); without this
    # check the .get() calls below would raise AttributeError -> HTTP 500.
    if not isinstance(data, dict):
        raise ValidationError("metadata must be a JSON object")

    source_slug = data.get("source_slug", "unknown")
    external_id = data.get("external_id")
    payload = data.get("payload", {})
    if not isinstance(payload, dict):
        raise ValidationError("Invalid payload: payload must be a JSON object")

    # 2. Validate payload
    try:
        validated_payload = PayloadSchema(**payload)
    except (pydantic.ValidationError, ValueError) as exc:
        logger.warning(
            "ingest_with_images_validation_failed",
            source_slug=source_slug,
            external_id=external_id,
            error=str(exc),
        )
        raise ValidationError(f"Invalid payload: {exc}") from exc

    # 3. Save raw_parsing_data and mark processing immediately
    #    so the background queue worker never picks it up.
    # NOTE(review): if a later step fails, the row is left in `processing`;
    # confirm which status it should be moved to in that case.
    raw_repo = RawDataRepository(db)
    raw = await raw_repo.create(
        source_id=None,
        external_id=external_id,
        payload={
            **validated_payload.model_dump(),
            "source_slug": source_slug,
        },
    )
    await raw_repo.update_status(raw.id, RawDataStatus.processing)
    await db.commit()
    logger.info(
        "ingest_with_images_raw_saved",
        job_id=raw.id,
        source_slug=source_slug,
        external_id=external_id,
    )

    temp_dir = Path(settings.image_storage_path) / "temp" / str(raw.id)
    image_paths: list[str] = []
    try:
        # 4. Stream images to the temp directory
        temp_dir.mkdir(parents=True, exist_ok=True)
        for idx, upload in enumerate(images):
            # Only the suffix of the client-supplied name is used; the file
            # name itself is the upload index, so it cannot escape temp_dir.
            suffix = Path(upload.filename).suffix if upload.filename else ".jpg"
            dest = temp_dir / f"{idx}{suffix}"
            written = 0
            with dest.open("wb") as out:
                while chunk := await upload.read(_UPLOAD_CHUNK_BYTES):
                    written += len(chunk)
                    # Defence in depth for parsers that do not report `size`.
                    if written > _MAX_UPLOAD_FILE_BYTES:
                        raise HTTPException(
                            status_code=413,
                            detail=(
                                "Image exceeds max size of "
                                f"{_MAX_UPLOAD_FILE_BYTES} bytes"
                            ),
                        )
                    out.write(chunk)
            image_paths.append(str(dest))
            logger.info(
                "ingest_with_images_saved",
                job_id=raw.id,
                order=idx,
                path=str(dest),
                size=written,
            )

        # 5. Run pipeline inline in a fresh session.
        #    Pass uploaded image paths directly — no need to persist them
        #    in the DB payload.
        async with AsyncSessionLocal() as pipeline_session:
            pipeline = build_pipeline(
                session=pipeline_session,
                ollama_client=client,
            )
            try:
                result = await pipeline.process(
                    raw.id, uploaded_image_paths=image_paths
                )
                await pipeline_session.commit()
            except Exception as exc:
                await pipeline_session.rollback()
                logger.error(
                    "ingest_with_images_pipeline_failed",
                    job_id=raw.id,
                    error=str(exc),
                    exc_info=True,
                )
                # NOTE(review): the detail echoes the internal exception text
                # back to the caller.
                raise HTTPException(
                    status_code=500, detail=f"Pipeline failed: {exc}"
                ) from exc
    finally:
        # Always clean up temp files — whether saving an upload or running
        # the pipeline failed, and also on success (the pipeline has already
        # copied what it needs to permanent storage).
        if temp_dir.exists():
            try:
                shutil.rmtree(temp_dir)
                logger.info(
                    "ingest_with_images_temp_cleaned",
                    job_id=raw.id,
                    temp_dir=str(temp_dir),
                )
            except OSError as exc:
                logger.warning(
                    "ingest_with_images_temp_cleanup_failed",
                    job_id=raw.id,
                    error=str(exc),
                )

    logger.info(
        "ingest_with_images_completed",
        job_id=raw.id,
        status=result.status,
        property_id=result.property_id,
    )
    return result
"https://dom.ria.com/", + } + async with httpx.AsyncClient(timeout=30, headers=headers) as client, client.stream( "GET", image_url ) as response: response.raise_for_status() @@ -117,8 +126,64 @@ finally: image_download_duration_seconds.observe(time.perf_counter() - start) + async def process_local_file( + self, + property_id: int, + temp_path: str, + order_index: int, + content_type: str = "", + ) -> PropertyImageDownloadResult: + start = time.perf_counter() + logger.info( + "image_process_local_start", + property_id=property_id, + temp_path=temp_path, + order=order_index, + ) + + try: + source = Path(temp_path) + content = source.read_bytes() + if len(content) > _MAX_IMAGE_BYTES: + raise ImageDownloadError( + f"Image exceeds max size of {_MAX_IMAGE_BYTES} bytes" + ) + + image_hash = hashlib.sha256(content).hexdigest() + ext = self._detect_extension(content_type, temp_path) + + with Image.open(BytesIO(content)) as img: + width, height = img.size + + property_dir = self._storage_path / str(property_id) + property_dir.mkdir(parents=True, exist_ok=True) + + local_path = property_dir / f"{image_hash}.{ext}" + local_path.write_bytes(content) + + file_size = len(content) + + logger.info( + "image_process_local_complete", + property_id=property_id, + hash=image_hash, + width=width, + height=height, + size=file_size, + ) + + return PropertyImageDownloadResult( + local_path=str(local_path), + image_hash=image_hash, + width=width, + height=height, + file_size=file_size, + ) + finally: + image_download_duration_seconds.observe(time.perf_counter() - start) + @staticmethod - def _detect_extension(content_type: str, url: str) -> str: + def _detect_extension(content_type: str, url_or_path: str) -> str: ct = content_type.lower() if "jpeg" in ct or "jpg" in ct: return "jpg" @@ -131,7 +196,7 @@ from urllib.parse import urlparse - path = urlparse(url).path.lower() + path = urlparse(url_or_path).path.lower() for ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"): if path.endswith(ext): 
return ext.lstrip(".") diff --git a/src/vmk_data_collector/services/property_pipeline.py b/src/vmk_data_collector/services/property_pipeline.py index d683128..8ea38bd 100644 --- a/src/vmk_data_collector/services/property_pipeline.py +++ b/src/vmk_data_collector/services/property_pipeline.py @@ -52,6 +52,7 @@ snapshot_id: int | None = None aggregated_analysis: dict[str, Any] = field(default_factory=dict) enrichment: Any | None = None + uploaded_image_paths: list[str] = field(default_factory=list) class PropertyPipeline: @@ -85,7 +86,9 @@ self._image_analyzer = image_analyzer self._enricher = enricher - async def process(self, raw_data_id: int) -> IngestResponse: + async def process( + self, raw_data_id: int, uploaded_image_paths: list[str] | None = None + ) -> IngestResponse: if self._active_jobs is not None: self._active_jobs.add(raw_data_id) start = time.perf_counter() @@ -97,6 +100,15 @@ context.raw = await self._stage_load_raw(raw_data_id) logger.info("pipeline_loaded", raw_id=raw_data_id) + # Use uploaded image paths passed directly from endpoint + if uploaded_image_paths: + context.uploaded_image_paths = uploaded_image_paths + logger.info( + "pipeline_uploaded_images_found", + raw_id=raw_data_id, + image_count=len(uploaded_image_paths), + ) + # Stage 2: Normalize via AI context.norm_response = await self._stage_normalize( raw_data_id, context.raw @@ -128,6 +140,21 @@ } context.normalized = NormalizedProperty(**_norm_payload) + # Fallback: if AI didn't return images, use raw payload photos/images + if not context.normalized.images: + raw_payload = context.raw.payload if hasattr(context.raw, "payload") else {} + fallback_images = raw_payload.get("photos") or raw_payload.get("images") or [] + if isinstance(fallback_images, list): + context.normalized.images = [ + url for url in fallback_images if isinstance(url, str) + ] + if context.normalized.images: + logger.info( + "pipeline_images_fallback", + raw_id=raw_data_id, + image_count=len(context.normalized.images), 
async def _stage_process_uploaded_images(
    self,
    property_id: int,
    image_paths: list[str],
) -> dict[str, Any]:
    """Store, register and analyze images uploaded with the ingest request.

    For each temp file: copy it to permanent storage, skip it if an image
    with the same hash already exists for the property, create and fill the
    image row, then run the image analyzer. A failure on one image is logged
    and does not stop the others. Afterwards the property's ``images_count``
    is refreshed and the temp files are removed on a best-effort basis.

    Args:
        property_id: Property the images belong to.
        image_paths: Local temp paths of the uploaded files, in upload order.

    Returns:
        Mapping of temp path to the analysis dump of every image that was
        analyzed successfully.
    """
    aggregated_analysis: dict[str, Any] = {}
    stored_paths: set[str] = set()

    for order, temp_path in enumerate(image_paths):
        try:
            result = await self._image_downloader.process_local_file(
                property_id, temp_path, order
            )
        except Exception as exc:
            logger.error(
                "image_uploaded_process_failed",
                property_id=property_id,
                temp_path=temp_path,
                error=str(exc),
            )
            continue

        if result.local_path:
            stored_paths.add(result.local_path)

        duplicate = await self._image_repo.get_by_hash(
            property_id, result.image_hash
        )
        if duplicate:
            logger.info(
                "image_duplicate_skipped",
                property_id=property_id,
                hash=result.image_hash,
            )
            continue

        # The temp path is recorded as the image's source identifier.
        image = await self._image_repo.create(
            property_id, temp_path, order
        )
        await self._image_repo.update_downloaded(
            image.id,
            result.local_path,
            result.file_size,
            result.width,
            result.height,
            result.image_hash,
        )

        if result.local_path:
            try:
                # Base64 encoding reads the file, so keep it off the loop.
                b64 = await asyncio.to_thread(
                    OllamaClient.image_to_base64,
                    result.local_path,
                )
                analysis = await self._image_analyzer.analyze(b64)
                await self._image_repo.update_analysis(
                    image.id,
                    analysis.overall_condition or "",
                )
                aggregated_analysis[temp_path] = analysis.model_dump()
            except Exception as exc:
                logger.error(
                    "image_analysis_failed",
                    property_id=property_id,
                    image_id=image.id,
                    error=str(exc),
                )

    all_images = await self._image_repo.get_by_property(property_id)
    await self._property_repo.update(
        property_id, images_count=len(all_images)
    )

    self._cleanup_uploaded_temp_files(property_id, image_paths, stored_paths)

    return aggregated_analysis

@staticmethod
def _cleanup_uploaded_temp_files(
    property_id: int,
    image_paths: list[str],
    stored_paths: set[str],
) -> None:
    """Best-effort removal of upload temp files and their emptied folders.

    A path is treated as temporary unless it is one of ``stored_paths``
    (the permanent copies). Deciding by path, instead of comparing the
    folder name with ``property_id``, keeps cleanup working when the temp
    folder's name happens to equal the property id.
    """
    from pathlib import Path

    if not image_paths:
        return

    try:
        permanent = {Path(p).resolve() for p in stored_paths}
        permanent_dirs = {p.parent for p in permanent}
        temp_dirs: set[Path] = set()

        for raw_path in image_paths:
            candidate = Path(raw_path)
            if candidate.resolve() in permanent:
                # Never delete a file that is itself the stored copy.
                continue
            candidate.unlink(missing_ok=True)
            temp_dirs.add(candidate.parent)

        for temp_dir in temp_dirs:
            if temp_dir.resolve() in permanent_dirs:
                # Folder also holds permanent images; leave it alone.
                continue
            # rmdir only succeeds on an empty folder, so unrelated files
            # are never removed.
            temp_dir.rmdir()
            logger.info(
                "pipeline_temp_images_cleaned",
                property_id=property_id,
                temp_dir=str(temp_dir),
            )
    except OSError as exc:
        logger.warning(
            "pipeline_temp_cleanup_failed",
            property_id=property_id,
            error=str(exc),
        )
async def _reset_stuck_jobs(self) -> None:
    """Put rows left in ``processing`` back to ``pending`` at worker start.

    This is a best-effort recovery step. It is awaited before the polling
    loop starts, so a database error here is logged and swallowed; otherwise
    it would end the worker before it processes anything.

    NOTE(review): every ``processing`` row is reset, whoever set it. Confirm
    that no other code path (another worker instance, or a request that
    processes a row inline) can own such a row while this runs, or the job
    will be processed a second time.
    """
    try:
        async with self._session_factory() as session:
            result = await session.execute(
                select(RawParsingData).where(
                    RawParsingData.status == RawDataStatus.processing
                )
            )
            stuck = result.scalars().all()
            if not stuck:
                return

            for raw in stuck:
                raw.status = RawDataStatus.pending
            await session.commit()
            logger.info(
                "queue_reset_stuck_jobs",
                count=len(stuck),
            )
    except Exception as exc:
        # Startup boundary: log and let the polling loop start anyway.
        logger.error(
            "queue_reset_stuck_jobs_failed",
            error=str(exc),
            exc_info=True,
        )