import asyncio
import contextlib
from contextlib import asynccontextmanager
from pathlib import Path
import structlog
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from prometheus_fastapi_instrumentator import Instrumentator
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from vmk_data_collector.api.v1.router_health import router as health_router
from vmk_data_collector.api.v1.router_properties import (
router as properties_router,
)
from vmk_data_collector.core.config import settings
from vmk_data_collector.core.exceptions import (
AIProcessingError,
AppError,
NotRealEstateError,
ValidationError,
)
from vmk_data_collector.core.limiter import limiter
from vmk_data_collector.core.logging import configure_logging
from vmk_data_collector.db.session import AsyncSessionLocal
from vmk_data_collector.services.ollama_client import OllamaClient
from vmk_data_collector.services.pipeline_factory import build_pipeline
from vmk_data_collector.services.queue_worker import QueueWorker
# Module-level structlog logger; used below to report shutdown progress.
logger = structlog.get_logger()
async def _wait_active_jobs(active_jobs: set[int]) -> None:
    """Return once ``active_jobs`` is empty, polling every half second.

    The set is only observed, never modified here; whoever owns the set is
    expected to remove entries as jobs finish.

    Args:
        active_jobs: Shared set that is re-checked on every poll.
    """
    poll_seconds = 0.5
    while True:
        if not active_jobs:
            return
        await asyncio.sleep(poll_seconds)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage startup and shutdown of the application's shared resources.

    Startup configures logging, ensures the image storage directory exists,
    stores an empty ``active_jobs`` set and an ``OllamaClient`` on
    ``app.state``, and launches the queue worker as a background task.

    Shutdown runs inside ``finally`` so it also executes when the context is
    left with an exception: the worker is signalled to stop (and cancelled
    if it has not finished within 60 seconds), remaining active jobs get up
    to 30 seconds to drain, and the Ollama client is closed even when one of
    the earlier shutdown steps raises.

    Args:
        app: The FastAPI application whose ``state`` carries the shared
            objects.

    Yields:
        None, for as long as the application is serving.
    """
    configure_logging(settings.log_level, debug=settings.debug)
    Path(settings.image_storage_path).mkdir(parents=True, exist_ok=True)

    app.state.active_jobs: set[int] = set()
    app.state.ollama_client = OllamaClient(
        base_url=settings.ollama_base_url,
        timeout=settings.ollama_timeout,
    )

    stop_event = asyncio.Event()
    worker = QueueWorker(
        session_factory=AsyncSessionLocal,
        pipeline_factory=lambda session: build_pipeline(
            session,
            ollama_client=app.state.ollama_client,
            active_jobs=app.state.active_jobs,
        ),
        poll_interval=1.0,
        stop_event=stop_event,
    )
    worker_task = asyncio.create_task(worker.run())

    try:
        yield
    finally:
        # Shutdown: stop worker first, then wait for active jobs, then close
        # the client. The inner ``finally`` guarantees the client is closed
        # even if awaiting the worker re-raises an error from ``worker.run()``.
        try:
            stop_event.set()
            try:
                await asyncio.wait_for(worker_task, timeout=60)
            except TimeoutError:
                logger.warning("worker_shutdown_timeout")
                # Explicit cancel is kept as a safeguard; awaiting the task
                # afterwards absorbs the resulting CancelledError.
                worker_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await worker_task

            active_jobs: set[int] = getattr(app.state, "active_jobs", set())
            if active_jobs:
                logger.info("waiting_for_active_jobs", count=len(active_jobs))
                try:
                    await asyncio.wait_for(
                        _wait_active_jobs(active_jobs), timeout=30
                    )
                except TimeoutError:
                    logger.warning(
                        "shutdown_timeout_reached", active=len(active_jobs)
                    )
        finally:
            await app.state.ollama_client.close()
# Route prefix shared by every versioned router mounted below.
_API_V1_PREFIX = "/api/v1"

# Application object; startup and shutdown are delegated to ``lifespan``.
app = FastAPI(title="VMK Data Collector", version="0.1.0", lifespan=lifespan)

# Rate limiting: publish the shared limiter on application state and register
# slowapi's handler for RateLimitExceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Prometheus instrumentation and metrics exposure.
Instrumentator().instrument(app).expose(app)

# Mount the API routers in their original order under the common prefix.
for _router in (health_router, properties_router):
    app.include_router(_router, prefix=_API_V1_PREFIX)
@app.exception_handler(AppError)
async def app_error_handler(request, exc: AppError):
    """Answer an ``AppError`` with HTTP 500 and its message as ``detail``.

    Args:
        request: The incoming request (unused).
        exc: The raised error; its ``message`` becomes the response detail.

    Returns:
        A ``JSONResponse`` with status 500.
    """
    payload = {"detail": exc.message}
    return JSONResponse(content=payload, status_code=500)
@app.exception_handler(ValidationError)
async def validation_error_handler(request, exc: ValidationError):
    """Answer a ``ValidationError`` with HTTP 422 and its message as ``detail``.

    Args:
        request: The incoming request (unused).
        exc: The raised error; its ``message`` becomes the response detail.

    Returns:
        A ``JSONResponse`` with status 422.
    """
    payload = {"detail": exc.message}
    return JSONResponse(content=payload, status_code=422)
@app.exception_handler(NotRealEstateError)
async def not_real_estate_handler(request, exc: NotRealEstateError):
    """Answer a ``NotRealEstateError`` with HTTP 202 and an "invalid" status.

    Args:
        request: The incoming request (unused).
        exc: The raised error; its ``message`` is returned as ``reason``.

    Returns:
        A ``JSONResponse`` with status 202 and ``status`` set to "invalid".
    """
    payload = {"status": "invalid", "reason": exc.message}
    return JSONResponse(content=payload, status_code=202)
@app.exception_handler(AIProcessingError)
async def ai_processing_error_handler(request, exc: AIProcessingError):
    """Answer an ``AIProcessingError`` with HTTP 202 and a "failed" status.

    Args:
        request: The incoming request (unused).
        exc: The raised error; its ``message`` is returned as ``reason``.

    Returns:
        A ``JSONResponse`` with status 202 and ``status`` set to "failed".
    """
    payload = {"status": "failed", "reason": exc.message}
    return JSONResponse(content=payload, status_code=202)