diff --git a/alembic/versions/2a9410d9738e_add_pgvector_embedding_column_and_hnsw_.py b/alembic/versions/2a9410d9738e_add_pgvector_embedding_column_and_hnsw_.py new file mode 100644 index 0000000..cff9109 --- /dev/null +++ b/alembic/versions/2a9410d9738e_add_pgvector_embedding_column_and_hnsw_.py @@ -0,0 +1,49 @@ +"""add pgvector embedding column and hnsw index + +Revision ID: 2a9410d9738e +Revises: 2ee315174452 +Create Date: 2026-06-12 19:40:22.923898 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '2a9410d9738e' +down_revision: Union[str, Sequence[str], None] = '2ee315174452' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # Enable pgvector extension + op.execute("CREATE EXTENSION IF NOT EXISTS vector") + + # Add embedding column + op.execute( + "ALTER TABLE property_listings " + "ADD COLUMN IF NOT EXISTS embedding vector(768)" + ) + + # Create HNSW index for cosine similarity search + op.execute( + "CREATE INDEX IF NOT EXISTS ix_property_listings_embedding_hnsw " + "ON property_listings USING hnsw (embedding vector_cosine_ops)" + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.execute( + "DROP INDEX IF EXISTS ix_property_listings_embedding_hnsw" + ) + op.execute( + "ALTER TABLE property_listings " + "DROP COLUMN IF EXISTS embedding" + ) + # We intentionally leave the 'vector' extension in place + # in case other schemas depend on it. diff --git a/docker-compose.yml b/docker-compose.yml index d5b83d0..7c7820a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: postgres: - image: postgres:16-alpine + image: pgvector/pgvector:pg16 container_name: vmk_postgres environment: POSTGRES_USER: postgres diff --git a/pyproject.toml b/pyproject.toml index ed74a75..f94580f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ "prometheus-client>=0.20.0", "prometheus-fastapi-instrumentator>=7.0.0", "python-multipart>=0.0.9", + "pgvector>=0.3.0", ] [project.optional-dependencies] diff --git a/scripts/backfill_embeddings.py b/scripts/backfill_embeddings.py new file mode 100644 index 0000000..f127874 --- /dev/null +++ b/scripts/backfill_embeddings.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +"""Backfill embeddings for existing property_listings. + +Usage: + python -m scripts.backfill_embeddings --batch-size 10 --limit 1000 +""" + +import argparse +import asyncio + +import structlog +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from vmk_data_collector.core.config import settings +from vmk_data_collector.db.repositories.property import PropertyRepository +from vmk_data_collector.models.property_listing import PropertyListing +from vmk_data_collector.services.ollama_client import OllamaClient +from vmk_data_collector.services.property_pipeline import PropertyPipeline + +logger = structlog.get_logger() + + +def _build_embedding_text(listing: PropertyListing) -> str: + """Reuses pipeline text builder logic.""" + parts: list[str] = [] + + if listing.deal_type: + parts.append(f"{listing.deal_type}") + if listing.title: + parts.append(f"{listing.title}") + if listing.description: + parts.append(f"{listing.description}") + if listing.generated_description: + parts.append(f"{listing.generated_description}") + + location_parts: list[str] = [] + if listing.city: + location_parts.append(f"город {listing.city}") + if listing.district: + location_parts.append(f"район {listing.district}") + if listing.micro_district: + location_parts.append(f"микрорайон {listing.micro_district}") + if listing.street: + location_parts.append(f"улица {listing.street}") + if location_parts: + parts.append(", ".join(location_parts)) + + if listing.rooms_count is not None: + parts.append(f"{listing.rooms_count} комнат") + if listing.total_area: + parts.append(f"площадь {listing.total_area} м²") + if listing.floor and listing.floors_total: + parts.append(f"этаж {listing.floor} из {listing.floors_total}") + elif listing.floor: + parts.append(f"этаж {listing.floor}") + if listing.building_type: + parts.append(f"тип дома {listing.building_type}") + if listing.renovation_status: + parts.append(f"ремонт {listing.renovation_status}") + if listing.price: + currency = listing.currency or "" + parts.append(f"цена {listing.price} {currency}") + + return ". ".join(parts) + + +async def backfill( + batch_size: int = 10, + limit: int | None = None, +) -> None: + engine = create_async_engine(settings.database_url_async, echo=False) + async_session = async_sessionmaker(engine, expire_on_commit=False) + client = OllamaClient(base_url=settings.ollama_base_url) + + async with async_session() as session: + from sqlalchemy import select + + stmt = select(PropertyListing).where( + PropertyListing.embedding.is_(None) + ) + if limit: + stmt = stmt.limit(limit) + + result = await session.execute(stmt) + listings = result.scalars().all() + total = len(listings) + logger.info("backfill_start", total=total, batch_size=batch_size) + + processed = 0 + for i in range(0, total, batch_size): + batch = listings[i : i + batch_size] + texts = [] + for listing in batch: + text = _build_embedding_text(listing) + texts.append(text) + + try: + embeddings = await client.embed( + model=settings.ollama_embedding_model, + texts=texts, + ) + except Exception as exc: + logger.error( + "backfill_batch_failed", + batch_start=i, + error=str(exc), + ) + continue + + repo = PropertyRepository(session) + for listing, vector in zip(batch, embeddings): + if vector: + await repo.update_embedding(listing.id, vector) + processed += 1 + + await session.commit() + logger.info( + "backfill_batch_done", + processed=processed, + total=total, + batch_start=i, + batch_end=min(i + batch_size, total), + ) + + await client.close() + await engine.dispose() + logger.info("backfill_complete", processed=processed, total=total) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Backfill embeddings for existing listings" + ) + parser.add_argument( + "--batch-size", + type=int, + default=10, + help="How many listings to embed per Ollama request", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Maximum listings to process (default: all)", + ) + args = parser.parse_args() + asyncio.run(backfill(batch_size=args.batch_size, limit=args.limit)) diff --git a/src/vmk_data_collector/api/v1/router_properties.py b/src/vmk_data_collector/api/v1/router_properties.py index 0fbf96b..0c4237a 100644 --- a/src/vmk_data_collector/api/v1/router_properties.py +++ b/src/vmk_data_collector/api/v1/router_properties.py @@ -20,6 +20,10 @@ PayloadSchema, RawDataIngestRequest, ) +from vmk_data_collector.schemas.search import ( + SimilarSearchRequest, + SimilarSearchResponse, +) from vmk_data_collector.db.session import AsyncSessionLocal from vmk_data_collector.services.ollama_client import OllamaClient from vmk_data_collector.services.pipeline_factory import build_pipeline @@ -208,6 +212,72 @@ return result +@router.post("/search/similar", response_model=SimilarSearchResponse) +@limiter.limit("60/minute") +async def search_similar( + request: Request, + search_request: SimilarSearchRequest, + db: AsyncSession = Depends(get_db), + client: OllamaClient = Depends(get_ollama_client), +) -> SimilarSearchResponse: + """Semantic vector search over property listings. + + Converts the query text into an embedding via Ollama and returns + the most similar listings using cosine similarity in PostgreSQL (pgvector). + """ + logger.info( + "search_similar_request", + query=search_request.query, + limit=search_request.limit, + deal_type=str(search_request.deal_type) if search_request.deal_type else None, + city=search_request.city, + ) + + # 1. Generate query embedding + try: + embeddings = await client.embed( + model=settings.ollama_embedding_model, + texts=[search_request.query], + ) + except Exception as exc: + logger.error("search_similar_embedding_failed", error=str(exc)) + raise HTTPException( + status_code=503, + detail=f"Embedding generation failed: {exc}", + ) from exc + + if not embeddings or not embeddings[0]: + raise HTTPException( + status_code=503, + detail="Empty embedding returned by model", + ) + + query_vector = embeddings[0] + + # 2. Search in DB + repo = PropertyRepository(db) + results = await repo.search_similar( + query_vector=query_vector, + limit=search_request.limit, + deal_type=search_request.deal_type, + city=search_request.city, + min_price=search_request.min_price, + max_price=search_request.max_price, + ) + + logger.info( + "search_similar_done", + query=search_request.query, + result_count=len(results), + ) + + return SimilarSearchResponse( + query=search_request.query, + results=results, + total=len(results), + ) + + @router.post("/listings/{listing_id}/archive-check") async def archive_check_listing( listing_id: int, diff --git a/src/vmk_data_collector/core/config.py b/src/vmk_data_collector/core/config.py index fa7a151..03918ad 100644 --- a/src/vmk_data_collector/core/config.py +++ b/src/vmk_data_collector/core/config.py @@ -26,6 +26,7 @@ ollama_base_url: str = "http://192.168.1.75:11434" ollama_text_model: str = "gemma4:e2b-it-q4_K_M" ollama_vision_model: str = "gemma4:e2b-it-q4_K_M" + ollama_embedding_model: str = "nomic-embed-text:latest" ollama_timeout: int = 120 ollama_mock: bool = False diff --git a/src/vmk_data_collector/db/repositories/property.py b/src/vmk_data_collector/db/repositories/property.py index 4aaa348..94c008a 100644 --- a/src/vmk_data_collector/db/repositories/property.py +++ b/src/vmk_data_collector/db/repositories/property.py @@ -1,10 +1,11 @@ from datetime import UTC, datetime +from typing import Any -from sqlalchemy import delete, select +from sqlalchemy import delete, func, select from sqlalchemy.ext.asyncio import AsyncSession from vmk_data_collector.db.repositories.base import BaseRepository -from vmk_data_collector.domain.enums import ListingStatus +from vmk_data_collector.domain.enums import DealType, ListingStatus from vmk_data_collector.models.property_custom_field import PropertyCustomField from vmk_data_collector.models.property_listing import PropertyListing @@ -47,6 +48,87 @@ setattr(obj, key, value) return obj + async def update_embedding( + self, property_id: int, vector: list[float] + ) -> PropertyListing: + result = await self._session.execute( + select(PropertyListing).where(PropertyListing.id == property_id) + ) + obj = result.scalar_one_or_none() + if obj is None: + raise ValueError(f"PropertyListing {property_id} not found") + obj.embedding = vector + return obj + + async def search_similar( + self, + query_vector: list[float], + limit: int = 10, + deal_type: DealType | None = None, + city: str | None = None, + min_price: float | None = None, + max_price: float | None = None, + ) -> list[dict[str, Any]]: + """Search listings by cosine similarity to a query vector. + + Returns rows sorted by similarity (highest first) with a + ``similarity_score`` field (0–1, where 1 is identical). + """ + distance_expr = PropertyListing.embedding.cosine_distance(query_vector) + + stmt = ( + select( + PropertyListing, + (1 - distance_expr).label("similarity_score"), + ) + .where(PropertyListing.embedding.is_not(None)) + .where(PropertyListing.listing_status == ListingStatus.active) + ) + + if deal_type is not None: + stmt = stmt.where(PropertyListing.deal_type == deal_type) + if city is not None: + stmt = stmt.where( + func.lower(PropertyListing.city) == city.lower() + ) + if min_price is not None: + stmt = stmt.where(PropertyListing.price >= min_price) + if max_price is not None: + stmt = stmt.where(PropertyListing.price <= max_price) + + stmt = ( + stmt.order_by(distance_expr.asc()) + .limit(limit) + ) + + result = await self._session.execute(stmt) + rows = result.all() + + listings: list[dict[str, Any]] = [] + for row in rows: + listing = row[0] + similarity = row[1] + listings.append( + { + "id": listing.id, + "title": listing.title, + "description": listing.description, + "generated_description": listing.generated_description, + "deal_type": listing.deal_type, + "price": float(listing.price) if listing.price is not None else None, + "currency": listing.currency, + "city": listing.city, + "district": listing.district, + "rooms_count": listing.rooms_count, + "total_area": float(listing.total_area) if listing.total_area is not None else None, + "url_source": listing.url_source, + "images_count": listing.images_count, + "similarity_score": round(float(similarity), 4) if similarity is not None else None, + "created_at": listing.created_at.isoformat() if listing.created_at else None, + } + ) + return listings + async def delete_custom_fields(self, property_id: int) -> None: await self._session.execute( delete(PropertyCustomField).where( diff --git a/src/vmk_data_collector/models/property_listing.py b/src/vmk_data_collector/models/property_listing.py index b7e9f06..20707e0 100644 --- a/src/vmk_data_collector/models/property_listing.py +++ b/src/vmk_data_collector/models/property_listing.py @@ -1,5 +1,6 @@ from datetime import datetime +from pgvector.sqlalchemy import Vector from sqlalchemy import ( TIMESTAMP, Boolean, @@ -124,6 +125,7 @@ listing_quality_score: Mapped[int | None] = mapped_column(SmallInteger) reliability_rating: Mapped[int | None] = mapped_column(SmallInteger) sentiment_score: Mapped[float | None] = mapped_column(Numeric(3, 2)) + embedding: Mapped[list[float] | None] = mapped_column(Vector(768)) created_at: Mapped[datetime] = mapped_column( TIMESTAMP(timezone=True), server_default=func.now() diff --git a/src/vmk_data_collector/schemas/search.py b/src/vmk_data_collector/schemas/search.py new file mode 100644 index 0000000..790bd60 --- /dev/null +++ b/src/vmk_data_collector/schemas/search.py @@ -0,0 +1,44 @@ +from typing import Any + +from pydantic import BaseModel, Field + +from vmk_data_collector.domain.enums import DealType + + +class SimilarSearchRequest(BaseModel): + """Semantic search by text query (vector similarity).""" + + query: str = Field(..., min_length=1, max_length=2000) + limit: int = Field(default=10, ge=1, le=100) + deal_type: DealType | None = None + city: str | None = Field(default=None, max_length=128) + min_price: float | None = Field(default=None, ge=0) + max_price: float | None = Field(default=None, ge=0) + + +class SimilarSearchResult(BaseModel): + """One listing returned by semantic search.""" + + id: int + title: str | None = None + description: str | None = None + generated_description: str | None = None + deal_type: DealType | None = None + price: float | None = None + currency: str | None = None + city: str | None = None + district: str | None = None + rooms_count: int | None = None + total_area: float | None = None + url_source: str | None = None + images_count: int | None = None + similarity_score: float + created_at: str | None = None + + +class SimilarSearchResponse(BaseModel): + """Response wrapper for semantic search.""" + + query: str + results: list[SimilarSearchResult] + total: int diff --git a/src/vmk_data_collector/services/ollama_client.py b/src/vmk_data_collector/services/ollama_client.py index fc953a4..fcd9691 100644 --- a/src/vmk_data_collector/services/ollama_client.py +++ b/src/vmk_data_collector/services/ollama_client.py @@ -110,6 +110,64 @@ logger.info("ollama_chat_response", model=model) return data + async def embed( + self, + model: str, + texts: list[str], + ) -> list[list[float]]: + try: + async for attempt in AsyncRetrying(**_RETRY_CONFIG): + with attempt: + result = await self._circuit_breaker.call( + self._embed_raw, + model, + texts, + ) + ai_requests_total.labels(model=model, status="success").inc() + return result + except Exception: + ai_requests_total.labels(model=model, status="error").inc() + raise + + async def _embed_raw( + self, + model: str, + texts: list[str], + ) -> list[list[float]]: + payload = { + "model": model, + "input": texts, + } + logger.info( + "ollama_embed_request", + model=model, + text_count=len(texts), + ) + try: + response = await self._client.post("/api/embed", json=payload) + response.raise_for_status() + data = response.json() + except httpx.ConnectError as exc: + raise OllamaRetryableError(f"Connection error: {exc}") from exc + except httpx.TimeoutException as exc: + raise OllamaRetryableError(f"Timeout: {exc}") from exc + except httpx.HTTPStatusError as exc: + raise _classify_httpx_error(exc) from exc + except json.JSONDecodeError as exc: + raise OllamaFatalError(f"Invalid JSON response: {exc}") from exc + + embeddings = data.get("embeddings") + if not embeddings: + raise OllamaFatalError( + f"Missing embeddings in response: {data.keys()}" + ) + logger.info( + "ollama_embed_response", + model=model, + embedding_count=len(embeddings), + ) + return embeddings + async def chat_with_images( self, model: str, diff --git a/src/vmk_data_collector/services/pipeline_factory.py b/src/vmk_data_collector/services/pipeline_factory.py index 52762aa..0f12960 100644 --- a/src/vmk_data_collector/services/pipeline_factory.py +++ b/src/vmk_data_collector/services/pipeline_factory.py @@ -51,5 +51,6 @@ image_downloader=downloader, image_analyzer=image_analyzer, enricher=enricher, + ollama_client=ollama_client, active_jobs=active_jobs, ) diff --git a/src/vmk_data_collector/services/property_pipeline.py b/src/vmk_data_collector/services/property_pipeline.py index 8ea38bd..fe021c5 100644 --- a/src/vmk_data_collector/services/property_pipeline.py +++ b/src/vmk_data_collector/services/property_pipeline.py @@ -70,6 +70,7 @@ image_downloader: ImageDownloader, image_analyzer: AiImageAnalyzer, enricher: AiEnricher, + ollama_client: OllamaClient, active_jobs: set[int] | None = None, ) -> None: self._raw_repo = raw_repo @@ -85,6 +86,7 @@ self._image_downloader = image_downloader self._image_analyzer = image_analyzer self._enricher = enricher + self._ollama_client = ollama_client async def process( self, raw_data_id: int, uploaded_image_paths: list[str] | None = None @@ -223,7 +225,18 @@ property_id=context.property_id, ) - # Stage 8: Finalize + # Stage 8: Generate embedding + await self._stage_embed( + context.property_id, + context.normalized, + ) + logger.info( + "pipeline_embedding_generated", + raw_id=raw_data_id, + property_id=context.property_id, + ) + + # Stage 9: Finalize result = await self._stage_finalize( raw_data_id, context.property_id, @@ -589,6 +602,56 @@ return enrichment + async def _stage_embed( + self, + property_id: int, + normalized: NormalizedProperty, + ) -> None: + from vmk_data_collector.core.config import settings + + text = self._build_embedding_text(normalized) + if not text.strip(): + logger.warning( + "pipeline_embedding_empty_text", + property_id=property_id, + ) + return + + try: + embeddings = await self._ollama_client.embed( + model=settings.ollama_embedding_model, + texts=[text], + ) + except CircuitBreakerOpenError: + logger.warning( + "pipeline_embedding_circuit_breaker_open", + property_id=property_id, + ) + return + except OllamaRetryableError: + logger.warning( + "pipeline_embedding_retryable_error", + property_id=property_id, + ) + return + except Exception as exc: + logger.error( + "pipeline_embedding_error", + property_id=property_id, + error=str(exc), + ) + return + + if embeddings and embeddings[0]: + await self._property_repo.update_embedding( + property_id, embeddings[0] + ) + logger.info( + "pipeline_embedding_saved", + property_id=property_id, + vector_dim=len(embeddings[0]), + ) + async def _stage_finalize( self, raw_data_id: int, @@ -768,6 +831,52 @@ } @staticmethod + def _build_embedding_text( + normalized: NormalizedProperty, + ) -> str: + """Build a dense description for vector embedding generation.""" + parts: list[str] = [] + + if normalized.deal_type: + parts.append(f"{normalized.deal_type}") + if normalized.title: + parts.append(f"{normalized.title}") + if normalized.description: + parts.append(f"{normalized.description}") + if normalized.generated_description: + parts.append(f"{normalized.generated_description}") + + location_parts: list[str] = [] + if normalized.city: + location_parts.append(f"город {normalized.city}") + if normalized.district: + location_parts.append(f"район {normalized.district}") + if normalized.micro_district: + location_parts.append(f"микрорайон {normalized.micro_district}") + if normalized.street: + location_parts.append(f"улица {normalized.street}") + if location_parts: + parts.append(", ".join(location_parts)) + + if normalized.rooms_count is not None: + parts.append(f"{normalized.rooms_count} комнат") + if normalized.total_area: + parts.append(f"площадь {normalized.total_area} м²") + if normalized.floor and normalized.floors_total: + parts.append(f"этаж {normalized.floor} из {normalized.floors_total}") + elif normalized.floor: + parts.append(f"этаж {normalized.floor}") + if normalized.building_type: + parts.append(f"тип дома {normalized.building_type}") + if normalized.renovation_status: + parts.append(f"ремонт {normalized.renovation_status}") + if normalized.price: + currency = normalized.currency or "" + parts.append(f"цена {normalized.price} {currency}") + + return ". ".join(parts) + + @staticmethod def _to_enum(field: str, value: Any) -> Any: if value is None or not isinstance(value, str): return value