diff --git a/.plan.md b/.plan.md new file mode 100644 index 0000000..3e6e283 --- /dev/null +++ b/.plan.md @@ -0,0 +1,62 @@ +# Plan: Multipart Image Ingest (Option 1) + +## Goal +Add a new endpoint `POST /api/v1/ingest/with-images` that accepts: +- `metadata` — JSON string with `source_slug`, `external_id`, `payload` +- `images` — 0–N binary image files via `multipart/form-data` + +The parser downloads images from its own source and uploads them directly to our service. We never fight foreign CDNs again. + +## Architecture + +### 1. Endpoint (`router_properties.py`) +- `metadata: str = Form(...)` — validated as JSON, must contain `source_slug` + `payload` with `title|description` +- `images: list[UploadFile] = File(default=[])` — streamed to disk, not held in memory +- Flow: + 1. Parse & validate metadata + 2. Save `raw_parsing_data` (status = pending) + 3. Stream each `UploadFile` to `/var/lib/vmk/images/temp/{raw_id}/{idx}{ext}` + 4. Inject `_uploaded_image_paths` into `raw.payload` + 5. **Inline `await pipeline.process(raw.id)`** — awaited inside the request handler rather than queued, because the images are already local and we want an immediate result + 6. Return `IngestResponse` + +### 2. Pipeline (`property_pipeline.py`) +- `_stage_process_images` checks `raw.payload.get("_uploaded_image_paths")` + - If present → `_stage_process_uploaded_images(property_id, paths)` + - If absent → `_stage_process_remote_images(property_id, urls)` (existing behaviour) +- New helper `_process_uploaded_one`: + - Reads local file → SHA256, width, height via Pillow + - Moves file from `temp/{raw_id}/` to permanent `/{property_id}/{hash}.{ext}` + - Creates `PropertyImage` row with `downloaded` status + - Runs AI image analysis via `OllamaClient.image_to_base64` +- On successful completion: cleans up temp dir for this raw_id +- On failure: leaves temp dir for inspection (cleanup later) + +### 3.
Image Processing Helper (`image_downloader.py`) +- New method `ImageDownloader.process_local_file(property_id, temp_path, order)`: + - Mirrors `download()` return type (`PropertyImageDownloadResult`) + - No HTTP, just filesystem + Pillow + +### 4. Limits & Validation +- Max files per request: 50 (configurable) +- Max file size: 10 MB each (configurable) +- FastAPI `UploadFile` already spills large files to disk — we just copy. + +### 5. README +- Add `curl -F` example +- Add Python `requests` multipart example +- Explain when to use `/ingest` (URLs) vs `/ingest/with-images` (binary) + +## Files to modify +1. `src/vmk_data_collector/api/v1/router_properties.py` +2. `src/vmk_data_collector/services/property_pipeline.py` +3. `src/vmk_data_collector/services/image_downloader.py` +4. `README.md` + +## Why inline pipeline instead of queue? +- Parser already spent resources downloading images; we should not leave them in temp for an unknown queue delay. +- Immediate feedback: parser gets `property_id`, `snapshot_id`, validation result right away. +- Simpler state management — no orphaned temp files. + +## Why not base64 in JSON? +- 33% overhead, huge JSON payloads, harder to debug, timeouts. Multipart is the industry standard for file uploads. diff --git a/alembic.ini b/alembic.ini index 0a434c1..b37216d 100644 --- a/alembic.ini +++ b/alembic.ini @@ -86,7 +86,7 @@ # database URL. This is consumed by the user-maintained env.py script only. # other means of configuring database URLs may be customized within the env.py # file. 
def upgrade() -> None:
    """Add the ``search_vector`` column and its GIN index.

    Runs three DDL statements in order:

    1. Create (or replace) the SQL function ``to_tsvector_simple(text)``,
       declared ``IMMUTABLE PARALLEL SAFE``, which calls
       ``to_tsvector('simple', $1)``.
    2. Add ``property_listings.search_vector`` as a stored generated
       ``tsvector`` column built from the listing's text columns.
    3. Create the GIN index ``ix_property_listings_search_vector_gin``
       on that column.
    """
    # Create an immutable wrapper so we can use it in a generated column.
    create_wrapper_sql = """
        CREATE OR REPLACE FUNCTION to_tsvector_simple(text)
        RETURNS tsvector
        LANGUAGE sql IMMUTABLE PARALLEL SAFE
        AS $$ SELECT to_tsvector('simple', $1); $$;
        """

    # Add generated tsvector column (simple config for mixed ru/ua text)
    # Note: enum casts (::text) are STABLE, so we only index text columns.
    add_column_sql = """
        ALTER TABLE property_listings
        ADD COLUMN IF NOT EXISTS search_vector tsvector
        GENERATED ALWAYS AS (
            to_tsvector_simple(
                coalesce(title, '') || ' ' ||
                coalesce(description, '') || ' ' ||
                coalesce(generated_description, '') || ' ' ||
                coalesce(city, '') || ' ' ||
                coalesce(district, '') || ' ' ||
                coalesce(micro_district, '') || ' ' ||
                coalesce(street, '') || ' ' ||
                coalesce(address_raw, '')
            )
        ) STORED
        """

    # Create GIN index for fast full-text search
    create_index_sql = """
        CREATE INDEX IF NOT EXISTS ix_property_listings_search_vector_gin
        ON property_listings USING GIN(search_vector)
        """

    # Order matters: the column expression calls the wrapper function and
    # the index is built on the column.
    for ddl in (create_wrapper_sql, add_column_sql, create_index_sql):
        op.execute(ddl)


def downgrade() -> None:
    """Remove the GIN index, the ``search_vector`` column and the wrapper.

    Objects are dropped in the reverse order of their creation in
    ``upgrade()``; every statement uses ``IF EXISTS``.
    """
    teardown_statements = (
        "DROP INDEX IF EXISTS ix_property_listings_search_vector_gin",
        "ALTER TABLE property_listings DROP COLUMN IF EXISTS search_vector",
        "DROP FUNCTION IF EXISTS to_tsvector_simple(text)",
    )
    for ddl in teardown_statements:
        op.execute(ddl)
async def search_fulltext(
    self,
    query: str,
    limit: int = 10,
    deal_type: DealType | None = None,
    city: str | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
) -> list[dict[str, Any]]:
    """Full-text search over active listings using PostgreSQL tsvector.

    The query text is parsed with ``plainto_tsquery`` under the ``simple``
    text-search configuration (language-agnostic tokenisation, so mixed
    ru/ua text is handled gracefully) and matched against
    ``PropertyListing.search_vector``. Only listings whose status is
    ``ListingStatus.active`` are returned.

    Args:
        query: Free-text search string.
        limit: Maximum number of rows to return.
        deal_type: If given, keep only listings with this deal type.
        city: If given, keep only listings in this city; the comparison
            is case-insensitive.
        min_price: If given, inclusive lower bound on ``price``.
        max_price: If given, inclusive upper bound on ``price``.

    Returns:
        One plain dict per matching listing, ordered by ``ts_rank_cd``
        descending with the listing id as a deterministic tiebreaker.
        ``price`` and ``total_area`` are converted to ``float``,
        ``rank_score`` is rounded to 4 decimals and ``created_at`` is an
        ISO-8601 string (or ``None``).
    """
    if not query.strip():
        # plainto_tsquery() turns blank input into an empty tsquery, which
        # matches no row; skip the pointless round trip.
        return []

    ts_query = func.plainto_tsquery("simple", query)
    # Normalisation flag 32 rescales the score to rank / (rank + 1), i.e.
    # into the 0..1 range. It does NOT compensate for document length
    # (flags 1 and 2 do that); it is monotonic, so ordering is unaffected.
    rank_expr = func.ts_rank_cd(PropertyListing.search_vector, ts_query, 32)

    stmt = (
        select(PropertyListing, rank_expr.label("rank_score"))
        .where(PropertyListing.search_vector.op("@@")(ts_query))
        .where(PropertyListing.listing_status == ListingStatus.active)
    )

    if deal_type is not None:
        stmt = stmt.where(PropertyListing.deal_type == deal_type)
    if city is not None:
        # Lower-case both sides in the database so the same case-folding
        # rules apply to the column and the parameter (Python's
        # str.lower() and PostgreSQL's lower() can disagree for non-ASCII
        # text, depending on the database locale).
        stmt = stmt.where(
            func.lower(PropertyListing.city) == func.lower(city)
        )
    if min_price is not None:
        stmt = stmt.where(PropertyListing.price >= min_price)
    if max_price is not None:
        stmt = stmt.where(PropertyListing.price <= max_price)

    # Secondary sort key keeps the result order stable when ranks tie.
    stmt = stmt.order_by(rank_expr.desc(), PropertyListing.id).limit(limit)

    result = await self._session.execute(stmt)

    listings: list[dict[str, Any]] = []
    for listing, rank in result.all():
        listings.append(
            {
                "id": listing.id,
                "title": listing.title,
                "description": listing.description,
                "generated_description": listing.generated_description,
                "deal_type": listing.deal_type,
                "price": float(listing.price)
                if listing.price is not None
                else None,
                "currency": listing.currency,
                "city": listing.city,
                "district": listing.district,
                "rooms_count": listing.rooms_count,
                "total_area": float(listing.total_area)
                if listing.total_area is not None
                else None,
                "url_source": listing.url_source,
                "images_count": listing.images_count,
                "rank_score": round(float(rank), 4)
                if rank is not None
                else None,
                "created_at": listing.created_at.isoformat()
                if listing.created_at
                else None,
            }
        )
    return listings
class FulltextSearchRequest(BaseModel):
    """Full-text search by keyword query (PostgreSQL tsvector)."""

    # Search text; required, 1-500 characters.
    query: str = Field(..., min_length=1, max_length=500)
    # Maximum number of results to return; 1-100, defaults to 10.
    limit: int = Field(default=10, ge=1, le=100)
    # Optional deal-type filter; None means the filter is not applied.
    deal_type: DealType | None = None
    # Optional city filter, at most 128 characters.
    city: str | None = Field(default=None, max_length=128)
    # Optional price bounds; each must be >= 0 when provided.
    # NOTE(review): nothing in this model checks min_price <= max_price.
    min_price: float | None = Field(default=None, ge=0)
    max_price: float | None = Field(default=None, ge=0)


class FulltextSearchResult(BaseModel):
    """One listing returned by full-text search."""

    # Listing identifier; required.
    id: int
    title: str | None = None
    description: str | None = None
    generated_description: str | None = None
    deal_type: DealType | None = None
    price: float | None = None
    currency: str | None = None
    city: str | None = None
    district: str | None = None
    rooms_count: int | None = None
    total_area: float | None = None
    url_source: str | None = None
    images_count: int | None = None
    # Relevance score; required (the only field besides id with no default).
    rank_score: float
    # Creation timestamp carried as a string rather than a datetime.
    created_at: str | None = None


class FulltextSearchResponse(BaseModel):
    """Response wrapper for full-text search."""

    # The query string this response answers.
    query: str
    # Matching listings.
    results: list[FulltextSearchResult]
    # NOTE(review): the endpoint visible in this change sets this to
    # len(results), i.e. the number of returned rows, not the overall
    # number of matches - confirm that is the intended meaning.
    total: int