import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
import structlog
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from prometheus_fastapi_instrumentator import Instrumentator
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from vmk_data_collector.api.v1.router_health import router as health_router
from vmk_data_collector.api.v1.router_properties import (
router as properties_router,
)
from vmk_data_collector.core.config import settings
from vmk_data_collector.core.exceptions import (
AIProcessingError,
AppError,
NotRealEstateError,
ValidationError,
)
from vmk_data_collector.core.limiter import limiter
from vmk_data_collector.core.logging import configure_logging
from vmk_data_collector.services.ollama_client import OllamaClient
async def _wait_active_jobs(active_jobs: set[int]) -> None:
while active_jobs:
await asyncio.sleep(0.5)
@asynccontextmanager
async def lifespan(app: FastAPI):
configure_logging(settings.log_level, debug=settings.debug)
Path(settings.image_storage_path).mkdir(parents=True, exist_ok=True)
app.state.active_jobs: set[int] = set()
app.state.ollama_client = OllamaClient(
base_url=settings.ollama_base_url,
timeout=settings.ollama_timeout,
)
yield
active_jobs: set[int] = getattr(app.state, "active_jobs", set())
if active_jobs:
logger = structlog.get_logger()
logger.info("waiting_for_active_jobs", count=len(active_jobs))
try:
await asyncio.wait_for(
_wait_active_jobs(active_jobs), timeout=30
)
except TimeoutError:
logger.warning(
"shutdown_timeout_reached", active=len(active_jobs)
)
await app.state.ollama_client.close()
app = FastAPI(
title="VMK Data Collector",
version="0.1.0",
lifespan=lifespan,
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
Instrumentator().instrument(app).expose(app)
app.include_router(health_router, prefix="/api/v1")
app.include_router(properties_router, prefix="/api/v1")
@app.exception_handler(AppError)
async def app_error_handler(request, exc: AppError):
return JSONResponse(
status_code=500,
content={"detail": exc.message},
)
@app.exception_handler(ValidationError)
async def validation_error_handler(request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={"detail": exc.message},
)
@app.exception_handler(NotRealEstateError)
async def not_real_estate_handler(request, exc: NotRealEstateError):
return JSONResponse(
status_code=202,
content={"status": "invalid", "reason": exc.message},
)
@app.exception_handler(AIProcessingError)
async def ai_processing_error_handler(request, exc: AIProcessingError):
return JSONResponse(
status_code=202,
content={"status": "failed", "reason": exc.message},
)