diff --git a/src/vmk_data_collector/api/deps.py b/src/vmk_data_collector/api/deps.py index 30d8a07..4898fca 100644 --- a/src/vmk_data_collector/api/deps.py +++ b/src/vmk_data_collector/api/deps.py @@ -3,29 +3,9 @@ from fastapi import Depends, Request from sqlalchemy.ext.asyncio import AsyncSession -from vmk_data_collector.core.config import settings -from vmk_data_collector.db.repositories.ai_enrichment import ( - AiEnrichmentRepository, -) -from vmk_data_collector.db.repositories.custom_field import ( - CustomFieldRepository, -) -from vmk_data_collector.db.repositories.data_source import ( - DataSourceRepository, -) -from vmk_data_collector.db.repositories.image import ImageRepository -from vmk_data_collector.db.repositories.property import PropertyRepository -from vmk_data_collector.db.repositories.property_type import ( - PropertyTypeRepository, -) -from vmk_data_collector.db.repositories.raw_data import RawDataRepository -from vmk_data_collector.db.repositories.snapshot import SnapshotRepository from vmk_data_collector.db.session import AsyncSessionLocal -from vmk_data_collector.services.ai_enricher import AiEnricher -from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer -from vmk_data_collector.services.ai_normalizer import AiNormalizer -from vmk_data_collector.services.image_downloader import ImageDownloader from vmk_data_collector.services.ollama_client import OllamaClient +from vmk_data_collector.services.pipeline_factory import build_pipeline from vmk_data_collector.services.property_pipeline import PropertyPipeline @@ -43,25 +23,8 @@ db: AsyncSession = Depends(get_db), client: OllamaClient = Depends(get_ollama_client), ) -> PropertyPipeline: - normalizer = AiNormalizer(client=client) - image_analyzer = AiImageAnalyzer(client=client) - enricher = AiEnricher(client=client) - downloader = ImageDownloader( - storage_path=settings.image_storage_path_abs - ) - - return PropertyPipeline( - raw_repo=RawDataRepository(db), - property_repo=PropertyRepository(db), - image_repo=ImageRepository(db), - custom_field_repo=CustomFieldRepository(db), - snapshot_repo=SnapshotRepository(db), - enrichment_repo=AiEnrichmentRepository(db), - data_source_repo=DataSourceRepository(db), - property_type_repo=PropertyTypeRepository(db), - normalizer=normalizer, - image_downloader=downloader, - image_analyzer=image_analyzer, - enricher=enricher, + return build_pipeline( + session=db, + ollama_client=client, active_jobs=getattr(request.app.state, "active_jobs", None), ) diff --git a/src/vmk_data_collector/api/v1/router_properties.py b/src/vmk_data_collector/api/v1/router_properties.py index ae10e77..e1d208e 100644 --- a/src/vmk_data_collector/api/v1/router_properties.py +++ b/src/vmk_data_collector/api/v1/router_properties.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.ext.asyncio import AsyncSession -from vmk_data_collector.api.deps import get_db, get_property_pipeline +from vmk_data_collector.api.deps import get_db from vmk_data_collector.core.exceptions import ValidationError from vmk_data_collector.core.limiter import limiter from vmk_data_collector.db.repositories.property import PropertyRepository @@ -14,7 +14,6 @@ PayloadSchema, RawDataIngestRequest, ) -from vmk_data_collector.services.property_pipeline import PropertyPipeline router = APIRouter() @@ -25,7 +24,6 @@ fastapi_request: Request, ingest_request: RawDataIngestRequest, db: AsyncSession = Depends(get_db), - pipeline: PropertyPipeline = Depends(get_property_pipeline), ) -> IngestResponse: try: validated_payload = PayloadSchema(**ingest_request.payload) @@ -41,9 +39,12 @@ "source_slug": ingest_request.source_slug, }, ) - response = await pipeline.process(raw.id) await db.commit() - return response + return IngestResponse( + job_id=raw.id, + status="pending", + message="Queued for processing", + ) @router.post("/listings/{listing_id}/archive-check") diff --git a/src/vmk_data_collector/main.py b/src/vmk_data_collector/main.py index 32dd98c..4e6ac0b 100644 --- a/src/vmk_data_collector/main.py +++ b/src/vmk_data_collector/main.py @@ -1,4 +1,5 @@ import asyncio +import contextlib from contextlib import asynccontextmanager from pathlib import Path @@ -22,7 +23,12 @@ ) from vmk_data_collector.core.limiter import limiter from vmk_data_collector.core.logging import configure_logging +from vmk_data_collector.db.session import AsyncSessionLocal from vmk_data_collector.services.ollama_client import OllamaClient +from vmk_data_collector.services.pipeline_factory import build_pipeline +from vmk_data_collector.services.queue_worker import QueueWorker + +logger = structlog.get_logger() async def _wait_active_jobs(active_jobs: set[int]) -> None: @@ -39,10 +45,34 @@ base_url=settings.ollama_base_url, timeout=settings.ollama_timeout, ) + + stop_event = asyncio.Event() + worker = QueueWorker( + session_factory=AsyncSessionLocal, + pipeline_factory=lambda session: build_pipeline( + session, + ollama_client=app.state.ollama_client, + active_jobs=app.state.active_jobs, + ), + poll_interval=1.0, + stop_event=stop_event, + ) + worker_task = asyncio.create_task(worker.run()) + yield + + # Shutdown: stop worker first, then wait for active jobs, then close client + stop_event.set() + try: + await asyncio.wait_for(worker_task, timeout=60) + except TimeoutError: + logger.warning("worker_shutdown_timeout") + worker_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await worker_task + active_jobs: set[int] = getattr(app.state, "active_jobs", set()) if active_jobs: - logger = structlog.get_logger() logger.info("waiting_for_active_jobs", count=len(active_jobs)) try: await asyncio.wait_for( @@ -52,6 +82,7 @@ logger.warning( "shutdown_timeout_reached", active=len(active_jobs) ) + await app.state.ollama_client.close() diff --git a/src/vmk_data_collector/services/pipeline_factory.py b/src/vmk_data_collector/services/pipeline_factory.py new file mode 100644 index 0000000..52762aa --- /dev/null +++ b/src/vmk_data_collector/services/pipeline_factory.py @@ -0,0 +1,55 @@ +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.core.config import settings +from vmk_data_collector.db.repositories.ai_enrichment import ( + AiEnrichmentRepository, +) +from vmk_data_collector.db.repositories.custom_field import ( + CustomFieldRepository, +) +from vmk_data_collector.db.repositories.data_source import ( + DataSourceRepository, +) +from vmk_data_collector.db.repositories.image import ImageRepository +from vmk_data_collector.db.repositories.property import PropertyRepository +from vmk_data_collector.db.repositories.property_type import ( + PropertyTypeRepository, +) +from vmk_data_collector.db.repositories.raw_data import RawDataRepository +from vmk_data_collector.db.repositories.snapshot import SnapshotRepository +from vmk_data_collector.services.ai_enricher import AiEnricher +from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer +from vmk_data_collector.services.ai_normalizer import AiNormalizer +from vmk_data_collector.services.image_downloader import ImageDownloader +from vmk_data_collector.services.ollama_client import OllamaClient +from vmk_data_collector.services.property_pipeline import PropertyPipeline + + +def build_pipeline( + session: AsyncSession, + ollama_client: OllamaClient, + active_jobs: set[int] | None = None, +) -> PropertyPipeline: + """Build a fully wired PropertyPipeline for a given DB session.""" + normalizer = AiNormalizer(client=ollama_client) + image_analyzer = AiImageAnalyzer(client=ollama_client) + enricher = AiEnricher(client=ollama_client) + downloader = ImageDownloader( + storage_path=settings.image_storage_path_abs + ) + + return PropertyPipeline( + raw_repo=RawDataRepository(session), + property_repo=PropertyRepository(session), + image_repo=ImageRepository(session), + custom_field_repo=CustomFieldRepository(session), + snapshot_repo=SnapshotRepository(session), + enrichment_repo=AiEnrichmentRepository(session), + data_source_repo=DataSourceRepository(session), + property_type_repo=PropertyTypeRepository(session), + normalizer=normalizer, + image_downloader=downloader, + image_analyzer=image_analyzer, + enricher=enricher, + active_jobs=active_jobs, + ) diff --git a/src/vmk_data_collector/services/queue_worker.py b/src/vmk_data_collector/services/queue_worker.py new file mode 100644 index 0000000..d41fafd --- /dev/null +++ b/src/vmk_data_collector/services/queue_worker.py @@ -0,0 +1,105 @@ +import asyncio +import contextlib +from collections.abc import Callable + +import structlog +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from vmk_data_collector.core.exceptions import OllamaRetryableError +from vmk_data_collector.db.repositories.raw_data import RawDataRepository +from vmk_data_collector.domain.enums import RawDataStatus +from vmk_data_collector.models.raw_parsing_data import RawParsingData +from vmk_data_collector.services.property_pipeline import PropertyPipeline + +logger = structlog.get_logger() + + +class QueueWorker: + """Background worker that polls the DB for pending raw-data jobs + and processes them one-by-one through PropertyPipeline. + """ + + def __init__( + self, + session_factory: async_sessionmaker, + pipeline_factory: Callable[[AsyncSession], PropertyPipeline], + poll_interval: float = 1.0, + stop_event: asyncio.Event | None = None, + ) -> None: + self._session_factory = session_factory + self._pipeline_factory = pipeline_factory + self._poll_interval = poll_interval + self._stop_event = stop_event or asyncio.Event() + + async def run(self) -> None: + logger.info("queue_worker_started", poll_interval=self._poll_interval) + while not self._stop_event.is_set(): + processed = await self._process_one() + if not processed: + with contextlib.suppress(TimeoutError): + await asyncio.wait_for( + self._stop_event.wait(), timeout=self._poll_interval + ) + logger.info("queue_worker_stopped") + + async def _process_one(self) -> bool: + async with self._session_factory() as session: + result = await session.execute( + select(RawParsingData) + .where(RawParsingData.status == RawDataStatus.pending) + .order_by(RawParsingData.id) + .limit(1) + .with_for_update(skip_locked=True) + ) + raw = result.scalar_one_or_none() + if raw is None: + return False + + raw.status = RawDataStatus.processing + await session.commit() + + pipeline = self._pipeline_factory(session) + try: + ingest_result = await pipeline.process(raw.id) + await session.commit() + if ingest_result.status == "completed": + logger.info("queue_job_completed", raw_id=raw.id) + elif ingest_result.status == "invalid": + logger.info( + "queue_job_invalid", + raw_id=raw.id, + reason=ingest_result.reason, + ) + else: + logger.warning( + "queue_job_failed", + raw_id=raw.id, + status=ingest_result.status, + reason=ingest_result.reason, + ) + return True + except OllamaRetryableError as exc: + logger.warning( + "queue_job_retryable", raw_id=raw.id, error=str(exc) + ) + await self._mark_failed(session, raw.id, f"Retryable: {exc}") + return True + except Exception as exc: + logger.error( + "queue_job_failed", + raw_id=raw.id, + error=str(exc), + exc_info=True, + ) + await self._mark_failed(session, raw.id, str(exc)) + return True + + async def _mark_failed( + self, session: AsyncSession, raw_id: int, message: str + ) -> None: + repo = RawDataRepository(session) + await repo.update_status( + raw_id, RawDataStatus.failed, error_message=message + ) + await session.commit()