diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..b40233c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,69 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# IDEs +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# Docker +Dockerfile +docker-compose*.yml +.dockerignore + +# Git +.git/ +.gitignore + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + +# Environment files (secrets) +.env +.env.* +!.env.example + +# Local data +vmk_postgres_data/ +images/ + +# Docs +docs/ +*.md diff --git a/.env.example b/.env.example index 3834ca2..dc33644 100644 --- a/.env.example +++ b/.env.example @@ -4,13 +4,13 @@ DATABASE_ECHO=false APP_HOST=0.0.0.0 -APP_PORT=8000 +APP_PORT=8020 LOG_LEVEL=info DEBUG=false -OLLAMA_BASE_URL=http://localhost:11434 -OLLAMA_TEXT_MODEL=llama3.2 -OLLAMA_VISION_MODEL=llava +OLLAMA_BASE_URL=http://192.168.1.75:11434 +OLLAMA_TEXT_MODEL=gemma4:e2b-it-q4_K_M +OLLAMA_VISION_MODEL=gemma4:e2b-it-q4_K_M OLLAMA_TIMEOUT=120 OLLAMA_MOCK=false diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..500291e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,32 @@ +# syntax=docker/dockerfile:1 +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +WORKDIR /app + +# Install system dependencies for Pillow and asyncpg +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + libjpeg-dev \ + zlib1g-dev \ + && rm -rf /var/lib/apt/lists/* + +# Copy dependency definitions first for layer caching +COPY pyproject.toml ./ +COPY src/ ./src/ +COPY alembic.ini ./ +COPY alembic/ ./alembic/ + +# Install package in editable mode +RUN pip install --no-cache-dir -e "." + +# Create image storage directory +RUN mkdir -p /var/lib/vmk/images + +EXPOSE 8000 + +# Run migrations then start the application +CMD ["sh", "-c", "alembic upgrade head && uvicorn vmk_data_collector.main:app --host 0.0.0.0 --port 8000"] diff --git a/alembic/env.py b/alembic/env.py index 6ff7064..fff165e 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,4 +1,5 @@ import asyncio +import os from logging.config import fileConfig from sqlalchemy import pool @@ -19,7 +20,7 @@ def run_migrations_offline() -> None: - url = config.get_main_option("sqlalchemy.url") + url = os.environ.get("DATABASE_URL") or config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, @@ -39,8 +40,11 @@ async def run_async_migrations() -> None: + section = config.get_section(config.config_ini_section, {}) + if database_url := os.environ.get("DATABASE_URL"): + section["sqlalchemy.url"] = database_url connectable = async_engine_from_config( - config.get_section(config.config_ini_section, {}), + section, prefix="sqlalchemy.", poolclass=pool.NullPool, ) diff --git a/docker-compose.yml b/docker-compose.yml index d315418..d5b83d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,7 +7,7 @@ POSTGRES_PASSWORD: postgres POSTGRES_DB: vmk_data ports: - - "5432:5432" + - "5433:5432" volumes: - vmk_postgres_data:/var/lib/postgresql/data healthcheck: @@ -16,5 +16,26 @@ timeout: 5s retries: 5 + app: + build: + context: . + dockerfile: Dockerfile + container_name: vmk_app + ports: + - "8020:8000" + env_file: + - .env + environment: + # Override hostnames for Docker networking + DATABASE_URL: postgresql+asyncpg://postgres:postgres@postgres:5432/vmk_data + # Ollama server on the local network + OLLAMA_BASE_URL: http://192.168.1.75:11434 + volumes: + - vmk_images:/var/lib/vmk/images + depends_on: + postgres: + condition: service_healthy + volumes: vmk_postgres_data: + vmk_images: diff --git a/src/vmk_data_collector/api/v1/router_properties.py b/src/vmk_data_collector/api/v1/router_properties.py index 5d34cfe..7f0ad43 100644 --- a/src/vmk_data_collector/api/v1/router_properties.py +++ b/src/vmk_data_collector/api/v1/router_properties.py @@ -2,6 +2,7 @@ import httpx import pydantic +import structlog from fastapi import APIRouter, Depends, HTTPException, Request from sqlalchemy.ext.asyncio import AsyncSession @@ -18,6 +19,7 @@ ) router = APIRouter() +logger = structlog.get_logger() @router.post("/ingest", response_model=IngestResponse, status_code=202) @@ -27,9 +29,20 @@ ingest_request: RawDataIngestRequest, db: AsyncSession = Depends(get_db), ) -> IngestResponse: + logger.info( + "ingest_request", + source_slug=ingest_request.source_slug, + external_id=ingest_request.external_id, + ) try: validated_payload = PayloadSchema(**ingest_request.payload) except (pydantic.ValidationError, ValueError) as exc: + logger.warning( + "ingest_validation_failed", + source_slug=ingest_request.source_slug, + external_id=ingest_request.external_id, + error=str(exc), + ) raise ValidationError(f"Invalid payload: {exc}") from exc raw_repo = RawDataRepository(db) @@ -42,6 +55,12 @@ }, ) await db.commit() + logger.info( + "ingest_accepted", + job_id=raw.id, + source_slug=ingest_request.source_slug, + external_id=ingest_request.external_id, + ) return IngestResponse( job_id=raw.id, status="pending", @@ -54,11 +73,14 @@ listing_id: int, db: AsyncSession = Depends(get_db), ) -> dict[str, Any]: + logger.info("archive_check", listing_id=listing_id) repo = PropertyRepository(db) listing = await repo.get_by_id(listing_id) if not listing: + logger.warning("archive_check_listing_not_found", listing_id=listing_id) raise HTTPException(status_code=404, detail="Listing not found") if not listing.url_source: + logger.warning("archive_check_no_url", listing_id=listing_id) raise HTTPException(status_code=422, detail="Listing has no url_source") try: url = str(listing.url_source) @@ -66,14 +88,17 @@ async with httpx.AsyncClient(timeout=10) as client: response = await client.head(url) except Exception as exc: + logger.warning("archive_check_request_failed", listing_id=listing_id, error=str(exc)) return {"was_archived": False, "reason": f"request_failed: {exc}"} if response.status_code in (404, 410): await repo.mark_archived(listing_id) await db.commit() + logger.info("archive_check_archived", listing_id=listing_id, status=response.status_code) return { "was_archived": True, "reason": f"status_{response.status_code}", } + logger.info("archive_check_active", listing_id=listing_id, status=response.status_code) return { "was_archived": False, "reason": f"status_{response.status_code}", @@ -85,7 +110,9 @@ db: AsyncSession = Depends(get_db), days: int = 90, ) -> dict[str, int]: + logger.info("cleanup_raw_data_start", days=days) repo = RawDataRepository(db) deleted = await repo.delete_old_completed(days=days) await db.commit() + logger.info("cleanup_raw_data_done", days=days, deleted=deleted) return {"deleted_count": deleted} diff --git a/src/vmk_data_collector/core/config.py b/src/vmk_data_collector/core/config.py index b73f374..fa7a151 100644 --- a/src/vmk_data_collector/core/config.py +++ b/src/vmk_data_collector/core/config.py @@ -18,14 +18,14 @@ # App app_host: str = "0.0.0.0" - app_port: int = 8000 + app_port: int = 8020 log_level: str = "info" debug: bool = False # Ollama - ollama_base_url: str = "http://localhost:11434" - ollama_text_model: str = "llama3.2" - ollama_vision_model: str = "llava" + ollama_base_url: str = "http://192.168.1.75:11434" + ollama_text_model: str = "gemma4:e2b-it-q4_K_M" + ollama_vision_model: str = "gemma4:e2b-it-q4_K_M" ollama_timeout: int = 120 ollama_mock: bool = False diff --git a/src/vmk_data_collector/core/limiter.py b/src/vmk_data_collector/core/limiter.py index 6551d22..9799dad 100644 --- a/src/vmk_data_collector/core/limiter.py +++ b/src/vmk_data_collector/core/limiter.py @@ -2,12 +2,9 @@ from slowapi import Limiter -async def _source_slug_key(request: Request) -> str: - try: - body = await request.json() - return body.get("source_slug", "global") - except Exception: - return "global" +def _source_slug_key(request: Request) -> str: + """Sync key func for slowapi (async key_func is not awaited).""" + return request.client.host if request.client else "global" limiter = Limiter(key_func=_source_slug_key) diff --git a/src/vmk_data_collector/core/logging.py b/src/vmk_data_collector/core/logging.py index 54658e2..20e90f7 100644 --- a/src/vmk_data_collector/core/logging.py +++ b/src/vmk_data_collector/core/logging.py @@ -1,7 +1,17 @@ +import logging +import sys + import structlog def configure_logging(log_level: str, debug: bool = False) -> None: + # Configure standard library logging to output to stdout + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=getattr(logging, log_level.upper(), logging.INFO), + ) + shared_processors = [ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, diff --git a/src/vmk_data_collector/services/property_pipeline.py b/src/vmk_data_collector/services/property_pipeline.py index a3aa79c..9ffc7e3 100644 --- a/src/vmk_data_collector/services/property_pipeline.py +++ b/src/vmk_data_collector/services/property_pipeline.py @@ -90,10 +90,12 @@ self._active_jobs.add(raw_data_id) start = time.perf_counter() context = PipelineContext() + logger.info("pipeline_start", raw_id=raw_data_id) try: # Stage 1: Load raw data context.raw = await self._stage_load_raw(raw_data_id) + logger.info("pipeline_loaded", raw_id=raw_data_id) # Stage 2: Normalize via AI context.norm_response = await self._stage_normalize( @@ -101,6 +103,11 @@ ) if not context.norm_response.is_real_estate: pipeline_results_total.labels(status="invalid").inc() + logger.info( + "pipeline_not_real_estate", + raw_id=raw_data_id, + reason=context.norm_response.reason, + ) await self._raw_repo.update_status( raw_data_id, RawDataStatus.invalid, @@ -128,6 +135,12 @@ ) = await self._stage_resolve_source_and_listing( raw_data_id, context.raw, context.normalized ) + logger.info( + "pipeline_source_resolved", + raw_id=raw_data_id, + source_id=context.data_source.id, + existing_id=getattr(context.existing_listing, "id", None), + ) # Stage 4: Persist listing (create or update) ( @@ -140,6 +153,12 @@ context.data_source, context.existing_listing, ) + logger.info( + "pipeline_persisted", + raw_id=raw_data_id, + property_id=context.property_id, + snapshot_id=context.snapshot_id, + ) # Stage 5: Custom fields await self._stage_persist_custom_fields( @@ -152,6 +171,12 @@ context.property_id, context.normalized.images, ) + logger.info( + "pipeline_images_processed", + raw_id=raw_data_id, + property_id=context.property_id, + image_count=len(context.normalized.images or []), + ) # Stage 7: AI enrichment context.enrichment = await self._stage_enrich( @@ -159,6 +184,11 @@ context.normalized, context.aggregated_analysis, ) + logger.info( + "pipeline_enriched", + raw_id=raw_data_id, + property_id=context.property_id, + ) # Stage 8: Finalize result = await self._stage_finalize( @@ -167,6 +197,12 @@ context.snapshot_id, ) pipeline_results_total.labels(status="completed").inc() + logger.info( + "pipeline_completed", + raw_id=raw_data_id, + property_id=context.property_id, + duration=time.perf_counter() - start, + ) return result except CircuitBreakerOpenError as exc: