diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..3834ca2 --- /dev/null +++ b/.env.example @@ -0,0 +1,20 @@ +DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data +DATABASE_POOL_SIZE=20 +DATABASE_MAX_OVERFLOW=10 +DATABASE_ECHO=false + +APP_HOST=0.0.0.0 +APP_PORT=8000 +LOG_LEVEL=info +DEBUG=false + +OLLAMA_BASE_URL=http://localhost:11434 +OLLAMA_TEXT_MODEL=llama3.2 +OLLAMA_VISION_MODEL=llava +OLLAMA_TIMEOUT=120 +OLLAMA_MOCK=false + +IMAGE_STORAGE_PATH=/var/lib/vmk/images + +ENABLE_IMAGE_ANALYSIS=true +ENABLE_PRICE_ESTIMATION=true diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0787736 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +.env +__pycache__/ +*.pyc +.pytest_cache/ +.mypy_cache/ +*.egg-info/ +dist/ +build/ +alembic/versions/*.pyc +.coverage +htmlcov/ +/var/lib/vmk/images/ +.venv/ +*.log +.env.local +.env.*.local diff --git a/README.md b/README.md index 39c2f97..8cb5d96 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,48 @@ -vmk-360-data_collector -=============== +# VMK 360 Data Collector + +## Описание + +Сервис приёма, нормализации и ИИ-обогащения данных об объектах недвижимости. + +- Принимает полусырые данные от парсеров через REST API +- Валидирует: AI определяет, является ли payload объявлением о недвижимости +- Нормализует: приводит неструктурированные данные к единому формату +- Обогащает: анализ изображений (vision) + текстовый анализ (NER, summary, оценка цены) +- Сохраняет в PostgreSQL с полной историей изменений (snapshots) + +## Быстрый старт + +```bash +# 1. Запуск PostgreSQL и Ollama (опционально) +docker compose up -d + +# 2. Установка зависимостей +pip install -e ".[dev]" + +# 3. Копирование .env +cp .env.example .env + +# 4. Применение миграций +alembic upgrade head + +# 5. Запуск приложения +uvicorn vmk_data_collector.main:app --reload --port 8000 +``` + +## API + +- `POST /api/v1/properties/ingest` — приём сырых данных от парсеров + +## Архитектура + +Сервис построен по слоистой архитектуре: +- **API Layer** — FastAPI, Pydantic валидация +- **Service Layer** — PropertyPipeline, AI-нормализация, обогащение +- **Repository Layer** — абстракция доступа к PostgreSQL +- **Domain Layer** — чистые сущности недвижимости +- **Infrastructure Layer** — Ollama client, image downloader, логирование + +## Документация + +- [Техническое задание](docs/SPECIFICATION.md) — полное описание модели данных, API, AI-слоя +- [Архитектура](docs/ARCHITECTURE.md) — диаграммы, потоки данных, компоненты diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..0a434c1 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,149 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s +# Or organize into date-based subdirectories (requires recursive_version_locations = true) +# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the tzdata library which can be installed by adding +# `alembic[tz]` to the pip requirements. +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +sqlalchemy.url = postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..6ff7064 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,61 @@ +import asyncio +from logging.config import fileConfig + +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config + +from alembic import context + +from vmk_data_collector.db.base import Base +from vmk_data_collector.models import * # noqa: F403 + +config = context.config + +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + + +def run_migrations_offline() -> None: + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection: Connection) -> None: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + connectable = async_engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online() -> None: + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/33cad32b9bbd_initial.py b/alembic/versions/33cad32b9bbd_initial.py new file mode 100644 index 0000000..cfd78e1 --- /dev/null +++ b/alembic/versions/33cad32b9bbd_initial.py @@ -0,0 +1,199 @@ +"""initial + +Revision ID: 33cad32b9bbd +Revises: +Create Date: 2026-06-11 22:57:15.838928 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '33cad32b9bbd' +down_revision: Union[str, Sequence[str], None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('data_sources', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('slug', sa.String(length=64), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('url_pattern', sa.String(length=512), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('slug') + ) + op.create_table('property_types', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('slug', sa.String(length=64), nullable=False), + sa.Column('name', sa.String(length=128), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('slug') + ) + op.create_table('raw_parsing_data', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('source_id', sa.Integer(), nullable=True), + sa.Column('external_id', sa.String(length=255), nullable=True), + sa.Column('payload', sa.JSON(), nullable=False), + sa.Column('status', sa.Enum('pending', 'processing', 'completed', 'failed', 'invalid', name='rawdatastatus'), nullable=False), + sa.Column('validation_result', sa.Enum('valid', 'invalid', 'uncertain', name='validationresult'), nullable=True), + sa.Column('error_message', sa.Text(), nullable=True), + sa.Column('received_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('processed_at', sa.TIMESTAMP(timezone=True), nullable=True), + sa.ForeignKeyConstraint(['source_id'], ['data_sources.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('source_id', 'external_id') + ) + op.create_table('property_listings', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('raw_data_id', sa.Integer(), nullable=True), + sa.Column('source_id', sa.Integer(), nullable=True), + sa.Column('external_id', sa.String(length=255), nullable=True), + sa.Column('title', sa.String(length=512), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('generated_description', sa.Text(), nullable=True), + sa.Column('deal_type', sa.Enum('sale', 'rent_long', 'rent_short', name='dealtype'), nullable=True), + sa.Column('property_type_id', sa.Integer(), nullable=True), + sa.Column('price', sa.Numeric(precision=15, scale=2), nullable=True), + sa.Column('currency', sa.String(length=3), nullable=True), + sa.Column('original_price', sa.Numeric(precision=15, scale=2), nullable=True), + sa.Column('original_currency', sa.String(length=3), nullable=True), + sa.Column('price_per_sqm', sa.Numeric(precision=10, scale=2), nullable=True), + sa.Column('total_area', sa.Numeric(precision=8, scale=2), nullable=True), + sa.Column('living_area', sa.Numeric(precision=8, scale=2), nullable=True), + sa.Column('kitchen_area', sa.Numeric(precision=8, scale=2), nullable=True), + sa.Column('land_area', sa.Numeric(precision=10, scale=2), nullable=True), + sa.Column('rooms_count', sa.SmallInteger(), nullable=True), + sa.Column('bedrooms_count', sa.SmallInteger(), nullable=True), + sa.Column('bathrooms_count', sa.SmallInteger(), nullable=True), + sa.Column('layout', sa.Enum('studio', 'separate', 'adjacent', name='layouttype'), nullable=True), + sa.Column('floor', sa.SmallInteger(), nullable=True), + sa.Column('floors_total', sa.SmallInteger(), nullable=True), + sa.Column('building_year', sa.SmallInteger(), nullable=True), + sa.Column('building_type', sa.Enum('brick', 'panel', 'monolith', 'gas_block', 'wood', name='buildingtype'), nullable=True), + sa.Column('renovation_status', sa.Enum('cosmetic', 'euro', 'designer', 'none', 'construction', name='renovationstatus'), nullable=True), + sa.Column('ceiling_height', sa.Numeric(precision=4, scale=2), nullable=True), + sa.Column('material', sa.String(length=128), nullable=True), + sa.Column('has_balcony', sa.Boolean(), nullable=True), + sa.Column('has_loggia', sa.Boolean(), nullable=True), + sa.Column('balcony_count', sa.SmallInteger(), nullable=True), + sa.Column('loggia_count', sa.SmallInteger(), nullable=True), + sa.Column('bathroom_type', sa.Enum('combined', 'separate', 'multiple', name='bathroomtype'), nullable=True), + sa.Column('elevator_count', sa.SmallInteger(), nullable=True), + sa.Column('has_freight_elevator', sa.Boolean(), nullable=True), + sa.Column('parking_type', sa.Enum('ground', 'underground', 'none', 'garage', name='parkingtype'), nullable=True), + sa.Column('heating_type', sa.Enum('central', 'autonomous', 'floor', 'none', name='heatingtype'), nullable=True), + sa.Column('internet', sa.Boolean(), nullable=True), + sa.Column('security', sa.Boolean(), nullable=True), + sa.Column('windows_direction', sa.String(length=128), nullable=True), + sa.Column('window_view', sa.Enum('yard', 'street', 'park', 'water', 'forest', name='windowview'), nullable=True), + sa.Column('address_raw', sa.Text(), nullable=True), + sa.Column('city', sa.String(length=128), nullable=True), + sa.Column('district', sa.String(length=128), nullable=True), + sa.Column('micro_district', sa.String(length=128), nullable=True), + sa.Column('street', sa.String(length=128), nullable=True), + sa.Column('house_number', sa.String(length=32), nullable=True), + sa.Column('metro_station', sa.String(length=128), nullable=True), + sa.Column('metro_distance_min', sa.SmallInteger(), nullable=True), + sa.Column('metro_distance_type', sa.Enum('walk', 'transport', name='metrodistancetype'), nullable=True), + sa.Column('latitude', sa.Numeric(precision=10, scale=8), nullable=True), + sa.Column('longitude', sa.Numeric(precision=11, scale=8), nullable=True), + sa.Column('contact_phone', sa.String(length=64), nullable=True), + sa.Column('contact_name', sa.String(length=255), nullable=True), + sa.Column('contact_email', sa.String(length=255), nullable=True), + sa.Column('is_agent', sa.Boolean(), nullable=True), + sa.Column('agency_name', sa.String(length=255), nullable=True), + sa.Column('publish_date', sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column('url_source', sa.Text(), nullable=True), + sa.Column('listing_status', sa.Enum('active', 'sold', 'rented', 'removed', 'archived', name='listingstatus'), nullable=True), + sa.Column('images_count', sa.Integer(), nullable=True), + sa.Column('listing_quality_score', sa.SmallInteger(), nullable=True), + sa.Column('reliability_rating', sa.SmallInteger(), nullable=True), + sa.Column('sentiment_score', sa.Numeric(precision=3, scale=2), nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.Column('updated_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.ForeignKeyConstraint(['property_type_id'], ['property_types.id'], ), + sa.ForeignKeyConstraint(['raw_data_id'], ['raw_parsing_data.id'], ), + sa.ForeignKeyConstraint(['source_id'], ['data_sources.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('raw_data_id'), + sa.UniqueConstraint('source_id', 'external_id') + ) + op.create_table('ai_enrichments', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('property_id', sa.Integer(), nullable=False), + sa.Column('extracted_features', sa.JSON(), nullable=False), + sa.Column('price_assessment', sa.JSON(), nullable=False), + sa.Column('listing_quality_score', sa.SmallInteger(), nullable=True), + sa.Column('reliability_rating', sa.SmallInteger(), nullable=True), + sa.Column('sentiment_score', sa.Numeric(precision=3, scale=2), nullable=True), + sa.Column('classification', sa.String(length=64), nullable=True), + sa.Column('image_analysis_results', sa.JSON(), nullable=False), + sa.Column('generated_description', sa.Text(), nullable=True), + sa.Column('summary', sa.Text(), nullable=True), + sa.Column('model_version', sa.String(length=64), nullable=True), + sa.Column('processing_time_ms', sa.Integer(), nullable=True), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.ForeignKeyConstraint(['property_id'], ['property_listings.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('property_id') + ) + op.create_table('property_custom_fields', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('property_id', sa.Integer(), nullable=False), + sa.Column('field_name', sa.String(length=128), nullable=False), + sa.Column('field_value', sa.Text(), nullable=False), + sa.Column('field_type', sa.Enum('str', 'int', 'float', 'bool', 'date', 'json', name='customfieldtype'), nullable=False), + sa.ForeignKeyConstraint(['property_id'], ['property_listings.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('property_id', 'field_name') + ) + op.create_table('property_images', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('property_id', sa.Integer(), nullable=False), + sa.Column('url', sa.Text(), nullable=False), + sa.Column('local_path', sa.String(length=512), nullable=True), + sa.Column('hash', sa.String(length=64), nullable=True), + sa.Column('file_size', sa.Integer(), nullable=True), + sa.Column('width', sa.SmallInteger(), nullable=True), + sa.Column('height', sa.SmallInteger(), nullable=True), + sa.Column('download_status', sa.Enum('pending', 'downloaded', 'failed', name='imagedownloadstatus'), nullable=False), + sa.Column('ai_description', sa.Text(), nullable=True), + sa.Column('analysis_status', sa.Enum('pending', 'completed', 'failed', name='imageanalysisstatus'), nullable=False), + sa.Column('order_index', sa.SmallInteger(), nullable=False), + sa.ForeignKeyConstraint(['property_id'], ['property_listings.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('property_snapshots', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('property_id', sa.Integer(), nullable=False), + sa.Column('snapshot_data', sa.JSON(), nullable=False), + sa.Column('changed_fields', sa.JSON(), nullable=False), + sa.Column('created_at', sa.TIMESTAMP(timezone=True), server_default=sa.text('now()'), nullable=False), + sa.ForeignKeyConstraint(['property_id'], ['property_listings.id'], ondelete='CASCADE'), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('property_snapshots') + op.drop_table('property_images') + op.drop_table('property_custom_fields') + op.drop_table('ai_enrichments') + op.drop_table('property_listings') + op.drop_table('raw_parsing_data') + op.drop_table('property_types') + op.drop_table('data_sources') + # ### end Alembic commands ### diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d315418 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +services: + postgres: + image: postgres:16-alpine + container_name: vmk_postgres + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: vmk_data + ports: + - "5432:5432" + volumes: + - vmk_postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d vmk_data"] + interval: 5s + timeout: 5s + retries: 5 + +volumes: + vmk_postgres_data: diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md new file mode 100644 index 0000000..fe15ca9 --- /dev/null +++ b/docs/ARCHITECTURE.md @@ -0,0 +1,329 @@ +# VMK 360 Data Collector — Архитектура + +## Общая архитектура + +Сервис построен по принципу **слоистой архитектуры** (Layered Architecture) с разделением на слои: + +1. **API Layer** — FastAPI роутеры, Pydantic схемы, middleware +2. **Service Layer** — бизнес-логика, пайплайны +3. **Repository Layer** — абстракция доступа к БД +4. **Domain Layer** — чистые сущности и правила +5. **Infrastructure Layer** — HTTP клиенты, БД, хранилище файлов, логирование + +``` +┌─────────────────────────────────────────────────────────────┐ +│ API Layer │ +│ POST /api/v1/properties/ingest │ +│ Pydantic validation → FastAPI router │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Service Layer │ +│ PropertyPipeline │ +│ ├── AiNormalizer (Ollama) │ +│ ├── PropertyNormalizer │ +│ ├── ImageDownloader │ +│ ├── AiImageAnalyzer (Ollama Vision) │ +│ └── AiEnricher (Ollama) │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Repository Layer │ +│ RawDataRepository PropertyRepository │ +│ ImageRepository CustomFieldRepository │ +│ SnapshotRepository AiEnrichmentRepository │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Infrastructure Layer │ +│ PostgreSQL (asyncpg) Ollama API (httpx) │ +│ Local filesystem (images) structlog │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Поток данных + +### Приём нового объявления + +``` +Parser ──► POST /api/v1/properties/ingest + │ + ▼ + RawDataIngestSchema (Pydantic validate) + │ + ▼ + [RawDataRepository] save(status=pending) + │ + ▼ + PropertyPipeline.process(raw_data_id) + │ + ├─► [AiNormalizer] LLM validates & structures + │ ├──► reject → raw_data.status = invalid + │ └──► accept → NormalizedPropertySchema + │ + ├─► [PropertyRepository] upsert + │ ├──► new → INSERT property_listings + │ └──► existing → INSERT snapshot + UPDATE listing + │ + ├─► [ImageDownloader] async download + │ └──► local filesystem + property_images records + │ + ├─► [AiImageAnalyzer] vision analysis per image + │ └──► ai_description + analysis_status + │ + ├─► [AiEnricher] text + image_analysis + │ └──► ai_enrichments record + │ + └─► [RawDataRepository] status = completed + │ + ▼ + Response 202 Accepted +``` + +### Обновление существующего объявления + +``` +Same external_id from same source + │ + ▼ + Load existing property_listings + │ + ▼ + [SnapshotRepository] save current state as JSONB + │ + ▼ + [PropertyRepository] update fields + │ + ▼ + [CustomFieldRepository] delete old + insert new + │ + ▼ + [ImageDownloader] skip existing (hash match), download new + │ + ▼ + Re-run AiImageAnalyzer + AiEnricher + │ + ▼ + Response 202 { property_id, snapshot_id } +``` + +## Модель данных (ER диаграмма) + +``` +┌──────────────────┐ ┌─────────────────────┐ +│ data_sources │ │ property_types │ +├──────────────────┤ ├─────────────────────┤ +│ id (PK) │ │ id (PK) │ +│ slug (UQ) │ │ slug (UQ) │ +│ name │ │ name │ +│ url_pattern │ │ description │ +│ description │ └─────────────────────┘ +│ created_at │ +└──────────────────┘ + │ + │ 1:N + ▼ +┌──────────────────────────────────────────────┐ +│ raw_parsing_data │ +├──────────────────────────────────────────────┤ +│ id (PK) │ +│ source_id (FK) ──────────────────────────────┐ +│ external_id │ +│ payload (JSONB) │ +│ status: pending/processing/completed/failed/invalid +│ validation_result │ +│ error_message │ +│ received_at │ +│ processed_at │ +└──────────────────────────────────────────────┘ + │ + │ 1:1 + ▼ +┌──────────────────────────────────────────────┐ +│ property_listings │ +├──────────────────────────────────────────────┤ +│ id (PK) │ +│ raw_data_id (FK, UQ) ◄───────────────────────┘ +│ source_id (FK) │ +│ external_id │ +│ title, description, generated_description │ +│ deal_type, property_type_id (FK) │ +│ price, currency, original_price, original_currency +│ price_per_sqm │ +│ total_area, living_area, kitchen_area, land_area +│ rooms_count, bedrooms_count, bathrooms_count │ +│ layout, floor, floors_total │ +│ building_year, building_type │ +│ renovation_status, ceiling_height, material │ +│ has_balcony, has_loggia, balcony_count │ +│ loggia_count, bathroom_type │ +│ elevator_count, has_freight_elevator │ +│ parking_type, heating_type │ +│ internet, security │ +│ windows_direction, window_view │ +│ address_raw, city, district, micro_district │ +│ street, house_number │ +│ metro_station, metro_distance_min │ +│ metro_distance_type │ +│ latitude, longitude │ +│ contact_phone, contact_name, contact_email │ +│ is_agent, agency_name │ +│ publish_date, url_source │ +│ listing_status │ +│ images_count │ +│ listing_quality_score │ +│ reliability_rating │ +│ sentiment_score │ +│ created_at, updated_at │ +└──────────────────────────────────────────────┘ + │ + ┌────┼────┐ + │ │ │ + ▼ ▼ ▼ +┌────────┐ ┌──────────────┐ ┌─────────────────┐ +│property│ │property_custom│ │ ai_enrichments │ +│_images │ │ _fields │ │ │ +├────────┤ ├──────────────┤ ├─────────────────┤ +│id (PK) │ │id (PK) │ │id (PK) │ +│property│ │property_id(FK) │ │property_id(FK,UQ)│ +│_id(FK) │ │field_name │ │extracted_features│ +│url │ │field_value │ │price_assessment │ +│local_ │ │field_type │ │listing_quality_score│ +│path │ │ │ │reliability_rating│ +│hash │ │UQ(property_id,│ │sentiment_score │ +│file_size│ │ field_name) │ │classification │ +│width │ │ │ │image_analysis_ │ +│height │ │ │ │ results │ +│download│ │ │ │generated_description│ +│_status │ │ │ │summary │ +│ai_ │ │ │ │model_version │ +│description│ │ │processing_time_ms│ +│analysis│ │ │ │created_at │ +│_status │ │ │ └─────────────────┘ +│order_ │ │ │ +│index │ │ │ +└────────┘ └──────────────┘ + │ + ▼ +┌──────────────────────┐ +│ property_snapshots │ +├──────────────────────┤ +│ id (PK) │ +│ property_id (FK) │ +│ snapshot_data (JSONB)│ +│ changed_fields (JSONB)│ +│ created_at │ +└──────────────────────┘ +``` + +## Компоненты + +### AiNormalizer +- **Роль**: Первый фильтр. LLM решает: недвижимость это или нет. +- **Вход**: `raw_parsing_data.payload` (title, description, params). +- **Выход**: `NormalizedPropertySchema` или `ValidationError`. +- **Особенности**: Работает с непредсказуемыми данными. Извлекает параметры даже из каши текста. + +### PropertyNormalizer +- **Роль**: Маппинг результата LLM в `property_listings` + `property_custom_fields`. +- **Особенности**: Все поля, которых нет в core-схеме, идут в `custom_fields`. + +### ImageDownloader +- **Роль**: Асинхронное скачивание изображений. +- **Процесс**: + 1. `httpx` async GET по URL. + 2. Сохранение в `IMAGE_STORAGE_PATH / {property_id} / {hash}.{ext}`. + 3. SHA-256 hash для дедупликации. + 4. Pillow для width/height. + 5. Обновление `property_images` записей. + +### AiImageAnalyzer +- **Роль**: Анализ каждого изображения через Ollama Vision. +- **Вход**: base64 изображение. +- **Выход**: `ImageAnalysisSchema` (состояние, комнаты, проблемы, плюсы). +- **Модель**: `llava` или `llama3.2-vision`. + +### AiEnricher +- **Роль**: Финальное обогащение текста + агрегированный image_analysis. +- **Вход**: Нормализованный текст + `image_analysis_results`. +- **Выход**: `AiEnrichmentSchema`. +- **Модель**: `llama3.2` или аналог. + +### PropertyPipeline +- **Роль**: Оркестратор всего пайплайна. +- **Поведение**: Все шаги выполняются последовательно в рамках одного HTTP-запроса (средняя нагрузка). При росте — можно вынести в Celery/RQ без изменения интерфейсов. + +## Хранение изображений + +``` +IMAGE_STORAGE_PATH/ +└── properties/ + └── {property_id}/ + ├── {hash1}.jpg + ├── {hash2}.png + └── {hash3}.jpg +``` + +- Дедупликация по SHA-256 hash на уровне БД. +- При upsert: новые картинки добавляются, существующие по hash пропускаются. + +## AI-валидация: reject cases + +Примеры payload, которые должны получить `status = invalid`: + +```json +// Автомобиль +{ "title": "Продам Toyota Camry 2020", "price": 1500000, ... } + +// Телефон +{ "title": "iPhone 15 Pro Max 256GB", ... } + +// Животное +{ "title": "Котёнок британский даром", ... } + +// Вакансия +{ "title": "Требуется водитель категории B", ... } +``` + +Примеры, которые должны пройти (`status = completed`): + +```json +// Квартира (даже если неструктурированно) +{ "title": "Продам 2-к квартиру 55м² на 5 этаже" } + +// Дом +{ "title": "Дом 120м² на участке 6 соток" } + +// Гараж +{ "title": "Продам гараж 18м² в ГСК Маяк" } + +// Земля +{ "title": "Участок 10 соток ИЖС в с. Вешки" } +``` + +## Масштабируемость + +Текущая архитектура рассчитана на **среднюю нагрузку** (синхронная обработка в HTTP-запросе). + +Для масштабирования: +1. Вынести `PropertyPipeline.process()` в фоновую задачу (Celery / RQ / arq). +2. API возвращает `202 Accepted` с `job_id` и сразу отдаёт ответ. +3. Фоновый воркер обрабатывает: normalize → download images → AI analyze → save. +4. Добавить webhook callback по завершению. + +## Технологические решения + +| Решение | Обоснование | +|---------|-------------| +| **Async SQLAlchemy + asyncpg** | FastAPI async. Лучшее использование I/O-bound операций (БД + HTTP к Ollama). | +| **JSONB для payload / snapshot** | Парсеры присылают разные структуры. Гибкость без ALTER TABLE. | +| **Structured LLM output** | Ollama поддерживает JSON mode. Надёжнее парсинга свободного текста. | +| **Repository pattern** | Легко мокировать в тестах. Замена БД без изменения бизнес-логики. | +| **SHA-256 дедупликация изображений** | Одна и та же картинка может быть по разным URL. | +| **Custom fields таблица** | Парсеры постоянно добавляют новые атрибуты. Не нужно менять схему БД. | +| **Snapshots** | Требование бизнеса: история изменений объявлений. | +| **Ollama (локальный LLM)** | Независимость от внешних API. Конфиденциальность данных. Контроль затрат. | +| **Pillow для изображений** | Лёгкая зависимость. Извлечение метаданных без тяжёлых библиотек. | diff --git a/docs/IMPLEMENTATION_PLAN.md b/docs/IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000..0e5ee16 --- /dev/null +++ b/docs/IMPLEMENTATION_PLAN.md @@ -0,0 +1,909 @@ +# VMK 360 Data Collector — Детальный план реализации + +> Плейбук поэтапной разработки. Каждая фаза содержит конкретные файлы, классы и функции. + +--- + +## Фаза 0: Подготовка окружения (до кода) + +### Шаг 0.1: Инициализация git-репозитория +- `git init` (уже есть) +- Создать `.gitignore`: + ``` + .env + __pycache__/ + *.pyc + .pytest_cache/ + .mypy_cache/ + *.egg-info/ + dist/ + build/ + alembic/versions/*.pyc + .coverage + htmlcov/ + /var/lib/vmk/images/ + ``` + +### Шаг 0.2: Запуск PostgreSQL +- `docker compose up -d postgres` +- Проверить: `docker logs vmk_postgres` → `database system is ready` + +### Шаг 0.3: Создание директорий +``` +mkdir -p src/vmk_data_collector/{core,api/v1,domain,schemas,services,db/repositories,models} +mkdir -p tests/{unit,integration} +mkdir -p alembic/versions +mkdir -p docs +``` + +--- + +## Фаза 1: Фундамент (зависимости, конфиг, логи) + +### Шаг 1.1: `pyproject.toml` +Создать `/home/gmikcon/Projects/vmk/data_collector/pyproject.toml`: +- build-system: setuptools +- dependencies: fastapi, uvicorn, sqlalchemy[asyncio], asyncpg, alembic, pydantic, pydantic-settings, httpx, structlog, python-dotenv, tenacity, pillow +- dev: pytest, pytest-asyncio, httpx, factory-boy, faker, ruff, black, mypy, coverage +- tool.pytest.ini_options: asyncio_mode=auto, markers: unit/integration/slow +- tool.black: line-length 88, target py312 +- tool.ruff: target py312, select E,F,I,N,W,UP,B,C4,SIM +- tool.mypy: python_version=3.12, disallow_untyped_defs=true + +### Шаг 1.2: `.env.example` +Создать `/home/gmikcon/Projects/vmk/data_collector/.env.example`: +``` +DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data +DATABASE_POOL_SIZE=20 +DATABASE_MAX_OVERFLOW=10 +DATABASE_ECHO=false + +APP_HOST=0.0.0.0 +APP_PORT=8000 +LOG_LEVEL=info +DEBUG=false + +OLLAMA_BASE_URL=http://localhost:11434 +OLLAMA_TEXT_MODEL=llama3.2 +OLLAMA_VISION_MODEL=llava +OLLAMA_TIMEOUT=120 +OLLAMA_MOCK=false + +IMAGE_STORAGE_PATH=/var/lib/vmk/images + +ENABLE_IMAGE_ANALYSIS=true +ENABLE_PRICE_ESTIMATION=true +``` + +### Шаг 1.3: `docker-compose.yml` +Создать `/home/gmikcon/Projects/vmk/data_collector/docker-compose.yml`: +- Сервис `postgres`: image `postgres:16-alpine`, env POSTGRES_USER/POSTGRES_PASSWORD/POSTGRES_DB, порт 5432, volume `vmk_postgres_data` +- healthcheck `pg_isready` +- volume `vmk_postgres_data` + +### Шаг 1.4: Установка зависимостей +```bash +python -m venv .venv +source .venv/bin/activate +pip install -e ".[dev]" +``` + +### Шаг 1.5: Конфигурация (`src/vmk_data_collector/core/config.py`) +Создать `Settings(BaseSettings)`: +- `model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")` +- Поля: `database_url: str`, `database_pool_size: int=20`, `database_max_overflow: int=10`, `database_echo: bool=False` +- Поля: `app_host: str`, `app_port: int`, `log_level: str`, `debug: bool=False` +- Поля: `ollama_base_url: str`, `ollama_text_model: str`, `ollama_vision_model: str`, `ollama_timeout: int=120`, `ollama_mock: bool=False` +- Поля: `image_storage_path: str`, `enable_image_analysis: bool=True`, `enable_price_estimation: bool=True` +- `@property def database_url_async(self) -> str` — возвращает URL с `+asyncpg` +- `@property def image_storage_path_abs(self) -> Path` — `Path(self.image_storage_path).resolve()` + +### Шаг 1.6: Логирование (`src/vmk_data_collector/core/logging.py`) +Создать функцию `configure_logging(log_level: str) -> None`: +- structlog.configure с процессорами: `filter_by_level`, `add_logger_name`, `add_log_level`, `format_exc_info`, `TimeStamper`, `JSONRenderer` +- Если `debug=True` — `ConsoleRenderer(colors=True)` + +### Шаг 1.7: Исключения (`src/vmk_data_collector/core/exceptions.py`) +Создать иерархию: +- `class AppException(Exception): ...` +- `class ValidationError(AppException): ...` +- `class AIProcessingError(AppException): ...` +- `class NotRealEstateError(ValidationError): ...` — специфичный reject +- `class DatabaseError(AppException): ...` + +### Definition of Done фазы 1 +- [ ] `pip install -e ".[dev]"` проходит без ошибок +- [ ] `python -c "from vmk_data_collector.core.config import Settings; print(Settings())"` читает `.env` +- [ ] `docker compose up -d` запускает PostgreSQL +- [ ] `ruff check src/` проходит без ошибок + +--- + +## Фаза 2: Модели БД и миграции Alembic + +### Шаг 2.1: Инициализация Alembic +```bash +alembic init alembic +``` +- Настроить `alembic.ini`: `sqlalchemy.url = postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data` +- Настроить `alembic/env.py`: + - Импорт `Base` из `vmk_data_collector.db.base` + - `target_metadata = Base.metadata` + - `run_migrations_online()` с `AsyncEngine` из `create_async_engine` + +### Шаг 2.2: База (`src/vmk_data_collector/db/base.py`) +Создать: +```python +from sqlalchemy.orm import DeclarativeBase +class Base(DeclarativeBase): + pass +``` + +### Шаг 2.3: Enum-классы (`src/vmk_data_collector/domain/enums.py`) +Создать: +- `class RawDataStatus(str, Enum)`: `pending`, `processing`, `completed`, `failed`, `invalid` +- `class ValidationResult(str, Enum)`: `valid`, `invalid`, `uncertain` +- `class DealType(str, Enum)`: `sale`, `rent_long`, `rent_short` +- `class ListingStatus(str, Enum)`: `active`, `sold`, `rented`, `removed`, `archived` +- `class BuildingType(str, Enum)`: `brick`, `panel`, `monolith`, `gas_block`, `wood` +- `class RenovationStatus(str, Enum)`: `cosmetic`, `euro`, `designer`, `none`, `construction` +- `class BathroomType(str, Enum)`: `combined`, `separate`, `multiple` +- `class ParkingType(str, Enum)`: `ground`, `underground`, `none`, `garage` +- `class HeatingType(str, Enum)`: `central`, `autonomous`, `floor`, `none` +- `class LayoutType(str, Enum)`: `studio`, `separate`, `adjacent` +- `class WindowView(str, Enum)`: `yard`, `street`, `park`, `water`, `forest` +- `class MetroDistanceType(str, Enum)`: `walk`, `transport` +- `class ImageDownloadStatus(str, Enum)`: `pending`, `downloaded`, `failed` +- `class ImageAnalysisStatus(str, Enum)`: `pending`, `completed`, `failed` +- `class CustomFieldType(str, Enum)`: `str`, `int`, `float`, `bool`, `date`, `json` + +### Шаг 2.4: ORM-модели + +#### `src/vmk_data_collector/models/data_source.py` +```python +class DataSource(Base): + __tablename__ = "data_sources" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + url_pattern: Mapped[str | None] = mapped_column(String(512)) + description: Mapped[str | None] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) +``` + +#### `src/vmk_data_collector/models/property_type.py` +```python +class PropertyType(Base): + __tablename__ = "property_types" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + name: Mapped[str] = mapped_column(String(128), nullable=False) + description: Mapped[str | None] = mapped_column(Text) +``` + +#### `src/vmk_data_collector/models/raw_parsing_data.py` +```python +class RawParsingData(Base): + __tablename__ = "raw_parsing_data" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + source_id: Mapped[int | None] = mapped_column(ForeignKey("data_sources.id")) + external_id: Mapped[str | None] = mapped_column(String(255)) + payload: Mapped[dict] = mapped_column(JSONB, default={}) + status: Mapped[RawDataStatus] = mapped_column(Enum(RawDataStatus, name="raw_data_status"), default=RawDataStatus.pending) + validation_result: Mapped[ValidationResult | None] = mapped_column(Enum(ValidationResult, name="validation_result")) + error_message: Mapped[str | None] = mapped_column(Text) + received_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) + processed_at: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True)) + __table_args__ = (UniqueConstraint("source_id", "external_id"),) +``` + +#### `src/vmk_data_collector/models/property_listing.py` +Создать с **всеми полями** из SPECIFICATION.md: +- id (PK), raw_data_id (FK, UNIQUE), source_id (FK), external_id +- title, description, generated_description +- deal_type (ENUM), property_type_id (FK) +- price, currency, original_price, original_currency, price_per_sqm +- total_area, living_area, kitchen_area, land_area +- rooms_count, bedrooms_count, bathrooms_count, layout +- floor, floors_total, building_year +- building_type (ENUM), renovation_status (ENUM), ceiling_height, material +- has_balcony, has_loggia, balcony_count, loggia_count +- bathroom_type (ENUM), elevator_count, has_freight_elevator +- parking_type (ENUM), heating_type (ENUM), internet, security +- windows_direction, window_view (ENUM) +- address_raw, city, district, micro_district, street, house_number +- metro_station, metro_distance_min, metro_distance_type (ENUM) +- latitude, longitude +- contact_phone, contact_name, contact_email, is_agent, agency_name +- publish_date, url_source, listing_status (ENUM), images_count +- listing_quality_score, reliability_rating, sentiment_score +- created_at, updated_at +- `__table_args__`: `UniqueConstraint("source_id", "external_id")` + +#### `src/vmk_data_collector/models/property_image.py` +```python +class PropertyImage(Base): + __tablename__ = "property_images" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE")) + url: Mapped[str] = mapped_column(Text, nullable=False) + local_path: Mapped[str | None] = mapped_column(String(512)) + hash: Mapped[str | None] = mapped_column(String(64)) + file_size: Mapped[int | None] = mapped_column(Integer) + width: Mapped[int | None] = mapped_column(SmallInteger) + height: Mapped[int | None] = mapped_column(SmallInteger) + download_status: Mapped[ImageDownloadStatus] = mapped_column(Enum(ImageDownloadStatus, name="image_download_status"), default=ImageDownloadStatus.pending) + ai_description: Mapped[str | None] = mapped_column(Text) + analysis_status: Mapped[ImageAnalysisStatus] = mapped_column(Enum(ImageAnalysisStatus, name="image_analysis_status"), default=ImageAnalysisStatus.pending) + order_index: Mapped[int] = mapped_column(SmallInteger, default=0) +``` + +#### `src/vmk_data_collector/models/property_custom_field.py` +```python +class PropertyCustomField(Base): + __tablename__ = "property_custom_fields" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE")) + field_name: Mapped[str] = mapped_column(String(128), nullable=False) + field_value: Mapped[str] = mapped_column(Text) + field_type: Mapped[CustomFieldType] = mapped_column(Enum(CustomFieldType, name="custom_field_type"), default=CustomFieldType.str) + __table_args__ = (UniqueConstraint("property_id", "field_name"),) +``` + +#### `src/vmk_data_collector/models/property_snapshot.py` +```python +class PropertySnapshot(Base): + __tablename__ = "property_snapshots" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE")) + snapshot_data: Mapped[dict] = mapped_column(JSONB, default={}) + changed_fields: Mapped[dict] = mapped_column(JSONB, default={}) + created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) +``` + +#### `src/vmk_data_collector/models/ai_enrichment.py` +```python +class AiEnrichment(Base): + __tablename__ = "ai_enrichments" + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE"), unique=True) + extracted_features: Mapped[dict] = mapped_column(JSONB, default={}) + price_assessment: Mapped[dict] = mapped_column(JSONB, default={}) + listing_quality_score: Mapped[int | None] = mapped_column(SmallInteger) + reliability_rating: Mapped[int | None] = mapped_column(SmallInteger) + sentiment_score: Mapped[float | None] = mapped_column(Numeric(3, 2)) + classification: Mapped[str | None] = mapped_column(String(64)) + image_analysis_results: Mapped[dict] = mapped_column(JSONB, default={}) + generated_description: Mapped[str | None] = mapped_column(Text) + summary: Mapped[str | None] = mapped_column(Text) + model_version: Mapped[str | None] = mapped_column(String(64)) + processing_time_ms: Mapped[int | None] = mapped_column(Integer) + created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now()) +``` + +### Шаг 2.5: `src/vmk_data_collector/models/__init__.py` +Импортировать все модели для `Base.metadata`: +```python +from .data_source import DataSource +from .property_type import PropertyType +from .raw_parsing_data import RawParsingData +from .property_listing import PropertyListing +from .property_image import PropertyImage +from .property_custom_field import PropertyCustomField +from .property_snapshot import PropertySnapshot +from .ai_enrichment import AiEnrichment +``` + +### Шаг 2.6: Alembic initial migration +```bash +alembic revision --autogenerate -m "initial" +alembic upgrade head +``` + +### Шаг 2.7: Seed справочников +Создать `src/vmk_data_collector/db/seed.py`: +- Функция `async def seed_property_types(session: AsyncSession) -> None` +- Заполняет `property_types` (apartment, house, townhouse, commercial, land, garage, office, warehouse, retail, cottage, room, new_building) +- Заполняет `deal_types` (sale, rent_long, rent_short) + +### Definition of Done фазы 2 +- [ ] `alembic upgrade head` проходит без ошибок +- [ ] Все 8 таблиц созданы в PostgreSQL +- [ ] `\dt` в psql показывает все таблицы +- [ ] `seed_property_types()` заполняет справочники +- [ ] Модели импортируются без ошибок + +--- + +## Фаза 3: Инфраструктура БД и репозитории + +### Шаг 3.1: Engine + Session (`src/vmk_data_collector/db/engine.py`) +```python +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession +from vmk_data_collector.core.config import settings + +engine = create_async_engine( + settings.database_url_async, + pool_size=settings.database_pool_size, + max_overflow=settings.database_max_overflow, + echo=settings.database_echo, +) +``` + +### Шаг 3.2: Session maker (`src/vmk_data_collector/db/session.py`) +```python +from .engine import engine +AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + +async def get_db_session() -> AsyncSession: + async with AsyncSessionLocal() as session: + yield session +``` + +### Шаг 3.3: Базовый репозиторий (`src/vmk_data_collector/db/repositories/base.py`) +```python +class BaseRepository: + def __init__(self, session: AsyncSession) -> None: + self._session = session + + async def add(self, obj: T) -> T: + self._session.add(obj) + await self._session.flush() + return obj + + async def delete(self, obj: T) -> None: + await self._session.delete(obj) + + async def commit(self) -> None: + await self._session.commit() +``` + +### Шаг 3.4: RawDataRepository (`src/vmk_data_collector/db/repositories/raw_data.py`) +Методы: +- `async def create(self, source_id: int | None, external_id: str | None, payload: dict) -> RawParsingData` +- `async def get_by_id(self, raw_data_id: int) -> RawParsingData | None` +- `async def get_by_source_and_external(self, source_id: int, external_id: str) -> RawParsingData | None` +- `async def update_status(self, raw_data_id: int, status: RawDataStatus, error_message: str | None = None) -> None` +- `async def set_processed(self, raw_data_id: int) -> None` + +### Шаг 3.5: PropertyRepository (`src/vmk_data_collector/db/repositories/property.py`) +Методы: +- `async def create(self, **kwargs) -> PropertyListing` +- `async def get_by_id(self, property_id: int) -> PropertyListing | None` +- `async def get_by_source_and_external(self, source_id: int, external_id: str) -> PropertyListing | None` +- `async def update(self, property_id: int, **kwargs) -> PropertyListing` +- `async def delete_custom_fields(self, property_id: int) -> None` + +### Шаг 3.6: ImageRepository (`src/vmk_data_collector/db/repositories/image.py`) +Методы: +- `async def create(self, property_id: int, url: str, order_index: int = 0) -> PropertyImage` +- `async def get_by_property(self, property_id: int) -> list[PropertyImage]` +- `async def get_by_hash(self, property_id: int, image_hash: str) -> PropertyImage | None` +- `async def update_downloaded(self, image_id: int, local_path: str, file_size: int, width: int, height: int, image_hash: str) -> None` +- `async def update_analysis(self, image_id: int, ai_description: str) -> None` + +### Шаг 3.7: CustomFieldRepository (`src/vmk_data_collector/db/repositories/custom_field.py`) +Методы: +- `async def create(self, property_id: int, field_name: str, field_value: str, field_type: CustomFieldType = CustomFieldType.str) -> PropertyCustomField` +- `async def bulk_create(self, property_id: int, fields: list[dict]) -> None` +- `async def delete_by_property(self, property_id: int) -> None` + +### Шаг 3.8: SnapshotRepository (`src/vmk_data_collector/db/repositories/snapshot.py`) +Методы: +- `async def create(self, property_id: int, snapshot_data: dict, changed_fields: dict) -> PropertySnapshot` + +### Шаг 3.9: AiEnrichmentRepository (`src/vmk_data_collector/db/repositories/ai_enrichment.py`) +Методы: +- `async def create(self, property_id: int, **kwargs) -> AiEnrichment` +- `async def delete_by_property(self, property_id: int) -> None` + +### Шаг 3.10: DataSourceRepository (`src/vmk_data_collector/db/repositories/data_source.py`) +Методы: +- `async def get_or_create_by_slug(self, slug: str, name: str | None = None) -> DataSource` + +### Definition of Done фазы 3 +- [ ] `get_db_session()` создаёт валидную async сессию +- [ ] Каждый репозиторий покрыт unit-тестом (мок сессии) +- [ ] CRUD операции работают через реальный engine (integration smoke test) + +--- + +## Фаза 4: Pydantic схемы (валидация данных) + +### Шаг 4.1: Domain entities (`src/vmk_data_collector/domain/entities.py`) +```python +@dataclass +class NormalizedProperty: + property_type: str + deal_type: str + title: str | None + description: str | None + price: float | None + currency: str | None + total_area: float | None + ... (все поля из property_listings) + custom_fields: dict[str, Any] + images: list[str] + +@dataclass +class AiImageAnalysis: + overall_condition: str | None + rooms_observed: int | None + issues_found: list[str] + positive_highlights: list[str] + view_from_window: str | None + furniture_included: bool | None + appliances_included: list[str] + +@dataclass +class AiEnrichmentResult: + extracted_features: dict[str, Any] + price_assessment: dict[str, Any] + listing_quality_score: int | None + reliability_rating: int | None + sentiment_score: float | None + classification: str | None + image_analysis_results: dict[str, Any] + generated_description: str | None + summary: str | None + model_version: str | None + processing_time_ms: int | None +``` + +### Шаг 4.2: API schemas (`src/vmk_data_collector/schemas/raw_data.py`) +- `class RawDataIngestRequest(BaseModel)`: + - `source_slug: str` + - `external_id: str` + - `payload: dict[str, Any]` +- `class IngestResponse(BaseModel)`: + - `job_id: int` + - `property_id: int | None` + - `status: str` + - `reason: str | None = None` + - `message: str` + - `snapshot_id: int | None = None` + +### Шаг 4.3: AI schemas (`src/vmk_data_collector/schemas/ai_response.py`) +- `class AiNormalizerResponse(BaseModel)`: + - `is_real_estate: bool` + - `reason: str | None` + - `normalized: NormalizedPropertySchema | None` +- `class AiImageAnalysisResponse(BaseModel)`: + - `overall_condition: str | None` + - `rooms_observed: int | None` + - `issues_found: list[str]` + - `positive_highlights: list[str]` + - `view_from_window: str | None` + - `furniture_included: bool | None` + - `appliances_included: list[str]` +- `class AiEnrichmentResponse(BaseModel)`: + - `extracted_features: dict[str, Any]` + - `price_assessment: dict[str, Any]` + - `listing_quality_score: int | None` + - `reliability_rating: int | None` + - `sentiment_score: float | None` + - `classification: str | None` + - `image_analysis_results: dict[str, Any]` + - `generated_description: str | None` + - `summary: str | None` + - `model_version: str | None` + - `processing_time_ms: int | None` + +### Шаг 4.4: NormalizedPropertySchema (`src/vmk_data_collector/schemas/normalized.py`) +Pydantic-модель с **всеми полями** `property_listings` + `custom_fields: dict[str, Any]`. + +### Definition of Done фазы 4 +- [ ] Pydantic валидация `RawDataIngestRequest` работает с минимальным и полным payload +- [ ] `AiNormalizerResponse` корректно парсит JSON с `is_real_estate: false` +- [ ] `NormalizedPropertySchema` маппится из словаря без ошибок + +--- + +## Фаза 5: AI-слой (Ollama клиент + нормализатор + анализатор) + +### Шаг 5.1: Ollama HTTP клиент (`src/vmk_data_collector/services/ollama_client.py`) +```python +class OllamaClient: + def __init__(self, base_url: str, timeout: int) -> None: + self._client = httpx.AsyncClient(base_url=base_url, timeout=timeout) + + async def chat(self, model: str, messages: list[dict], json_mode: bool = False) -> dict: + payload = {"model": model, "messages": messages, "stream": False} + if json_mode: + payload["format"] = "json" + response = await self._client.post("/api/chat", json=payload) + response.raise_for_status() + return response.json() + + async def chat_with_images(self, model: str, messages: list[dict], images_base64: list[str]) -> dict: + # messages[0]["images"] = images_base64 + ... + + async def close(self) -> None: + await self._client.aclose() +``` + +### Шаг 5.2: AiNormalizer (`src/vmk_data_collector/services/ai_normalizer.py`) +```python +class AiNormalizer: + SYSTEM_PROMPT = """Ты — анализатор объявлений о недвижимости. +Определи, является ли текст объявлением о недвижимости. +Если нет — верни {"is_real_estate": false, "reason": "..."}. +Если да — верни {"is_real_estate": true, "normalized": {...}}. +Ответь ТОЛЬКО JSON.""" + + async def normalize(self, payload: dict) -> AiNormalizerResponse: + # 1. Формируем текст из payload (title + description + params) + # 2. Вызываем ollama_client.chat(json_mode=True) + # 3. Парсим ответ через AiNormalizerResponse + # 4. Если ollama_mock — возвращаем фиксированный мок +``` + +### Шаг 5.3: AiImageAnalyzer (`src/vmk_data_collector/services/ai_image_analyzer.py`) +```python +class AiImageAnalyzer: + SYSTEM_PROMPT = """Опиши состояние объекта недвижимости на фото. +Ответь ТОЛЬКО JSON с полями: overall_condition, rooms_observed, issues_found, positive_highlights, view_from_window, furniture_included, appliances_included.""" + + async def analyze(self, image_base64: str) -> AiImageAnalysisResponse: + # 1. Вызываем ollama_client.chat_with_images() + # 2. Парсим AiImageAnalysisResponse +``` + +### Шаг 5.4: AiEnricher (`src/vmk_data_collector/services/ai_enricher.py`) +```python +class AiEnricher: + SYSTEM_PROMPT = """Проанализируй объявление о недвижимости. +Верни ТОЛЬКО JSON с полями: +extracted_features, price_assessment, listing_quality_score (1-10), +reliability_rating (1-5), sentiment_score (-1..1), classification, +generated_description, summary, language.""" + + async def enrich(self, normalized: NormalizedProperty, image_analysis_results: dict) -> AiEnrichmentResult: + # 1. Формируем prompt из текста + image_analysis + # 2. Вызываем ollama_client.chat(json_mode=True) + # 3. Парсим AiEnrichmentResponse + # 4. Маппим в AiEnrichmentResult +``` + +### Шаг 5.5: Image Downloader (`src/vmk_data_collector/services/image_downloader.py`) +```python +class ImageDownloader: + def __init__(self, storage_path: Path) -> None: + self._storage_path = storage_path + + async def download(self, property_id: int, image_url: str, order_index: int) -> PropertyImageDownloadResult: + # 1. httpx async GET image_url + # 2. SHA-256 content + # 3. ext из Content-Type или URL + # 4. local_path = storage_path / str(property_id) / f"{hash}.{ext}" + # 5. Сохранить на диск + # 6. Pillow: width, height + # 7. file_size = len(content) + # 8. Вернуть dataclass с local_path, hash, width, height, file_size +``` + +### Definition of Done фазы 5 +- [ ] `OllamaClient.chat()` мокается в тестах (respx/httpx_mock) +- [ ] `AiNormalizer` корректно reject'ит авто-объявление +- [ ] `AiNormalizer` корректно accept'ит квартиру из каши текста +- [ ] `ImageDownloader` скачивает картинку, считает hash, извлекает размеры +- [ ] `AiImageAnalyzer` возвращает структурированный JSON +- [ ] `AiEnricher` возвращает enrichment с quality_score и reliability_rating + +--- + +## Фаза 6: Пайплайн (PropertyPipeline + FastAPI) + +### Шаг 6.1: PropertyPipeline (`src/vmk_data_collector/services/property_pipeline.py`) +```python +class PropertyPipeline: + def __init__( + self, + raw_repo: RawDataRepository, + property_repo: PropertyRepository, + image_repo: ImageRepository, + custom_field_repo: CustomFieldRepository, + snapshot_repo: SnapshotRepository, + enrichment_repo: AiEnrichmentRepository, + data_source_repo: DataSourceRepository, + normalizer: AiNormalizer, + image_downloader: ImageDownloader, + image_analyzer: AiImageAnalyzer, + enricher: AiEnricher, + ) -> None: + ... + + async def process(self, raw_data_id: int) -> IngestResponse: + # 1. raw_repo.get_by_id(raw_data_id) + # 2. raw_repo.update_status → processing + # 3. normalizer.normalize(raw.payload) + # 4. Если not is_real_estate: + # raw_repo.update_status → invalid + # return IngestResponse(status="invalid", reason=...) + # 5. data_source = data_source_repo.get_or_create_by_slug(source_slug) + # 6. existing = property_repo.get_by_source_and_external(...) + # 7. Если existing: + # snapshot_repo.create(existing) + # property_repo.update(existing.id, **normalized_data) + # custom_field_repo.delete_by_property(existing.id) + # property_id = existing.id + # Иначе: + # property = property_repo.create(**normalized_data) + # property_id = property.id + # 8. custom_field_repo.bulk_create(property_id, normalized.custom_fields) + # 9. images: for url in normalized.images: + # image = image_repo.create(property_id, url, order) + # result = await image_downloader.download(property_id, url, order) + # image_repo.update_downloaded(image.id, result.local_path, ...) + # analysis = await image_analyzer.analyze(base64) + # image_repo.update_analysis(image.id, analysis.overall_condition) + # 10. enrichment = await enricher.enrich(normalized, aggregated_image_analysis) + # 11. enrichment_repo.delete_by_property(property_id) + # 12. enrichment_repo.create(property_id, **enrichment) + # 13. raw_repo.set_processed(raw_data_id) + # 14. return IngestResponse(status="completed", property_id=...) +``` + +### Шаг 6.2: FastAPI main (`src/vmk_data_collector/main.py`) +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + # startup: configure_logging, create image storage dir + yield + # shutdown: close ollama client + +app = FastAPI(title="VMK Data Collector", version="0.1.0", lifespan=lifespan) +app.include_router(properties_router, prefix="/api/v1") +``` + +### Шаг 6.3: API deps (`src/vmk_data_collector/api/deps.py`) +```python +async def get_db() -> AsyncSession: + async with AsyncSessionLocal() as session: + yield session + +async def get_property_pipeline(db: AsyncSession = Depends(get_db)) -> PropertyPipeline: + # Собрать все зависимости вручную (или использовать DI-контейнер) + ... +``` + +### Шаг 6.4: Router (`src/vmk_data_collector/api/v1/router_properties.py`) +```python +router = APIRouter(prefix="/properties") + +@router.post("/ingest", response_model=IngestResponse, status_code=202) +async def ingest_property( + request: RawDataIngestRequest, + pipeline: PropertyPipeline = Depends(get_property_pipeline), +) -> IngestResponse: + # 1. Создать raw_parsing_data через raw_repo + # 2. Запустить pipeline.process(raw_data_id) + # 3. Вернуть IngestResponse +``` + +### Шаг 6.5: Exception handlers (`src/vmk_data_collector/core/exceptions.py` + в main.py) +- `AppException` → 500 с деталями +- `ValidationError` → 422 +- `NotRealEstateError` → 202 (но со статусом invalid) +- `AIProcessingError` → 202 (но со статусом failed) + +### Definition of Done фазы 6 +- [ ] `POST /api/v1/properties/ingest` возвращает 202 +- [ ] Минимальный payload (title, url, images) создаёт property_listings +- [ ] Полный payload заполняет все поля + custom_fields +- [ ] Повторный ingest с тем же external_id → snapshot + update +- [ ] Недвижимость-объявление → reject, только raw_data со статусом invalid + +--- + +## Фаза 7: Тесты + +### Шаг 7.1: conftest.py +Фикстуры: +- `async_engine` — `create_async_engine("postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data_test")` +- `init_db(async_engine)` — `Base.metadata.create_all` +- `db_session(async_engine)` — async session с `begin()` + rollback +- `client(db_session)` — FastAPI `TestClient` с переопределённым `get_db` +- `mock_ollama_client()` — фиксированные ответы +- `sample_payload_minimal()` — только title, url, images +- `sample_payload_full()` — все поля +- `sample_payload_car()` — объявление про машину (для reject) + +### Шаг 7.2: Unit-тесты (`tests/unit/`) +- `test_ai_normalizer.py`: + - `test_reject_car_listing()` — `is_real_estate=False` + - `test_accept_apartment_from_messy_text()` — извлекает rooms, area, floor из каши + - `test_mock_mode_returns_fixed_json()` — при `OLLAMA_MOCK=true` +- `test_ai_image_analyzer.py`: + - `test_analyze_returns_structured_response()` + - `test_analyze_with_empty_image()` +- `test_ai_enricher.py`: + - `test_enrich_returns_quality_score_and_reliability()` + - `test_enrich_with_image_analysis()` +- `test_image_downloader.py`: + - `test_download_creates_file_and_returns_hash()` (mock httpx + tempfile) + - `test_dedup_returns_existing_hash()` +- `test_property_pipeline.py`: + - `test_new_listing_creates_all_records()` (моки всех репозиториев) + - `test_existing_listing_creates_snapshot()` (моки) + - `test_invalid_payload_returns_invalid_status()` + +### Шаг 7.3: Integration-тесты (`tests/integration/`) +- `test_api_ingest.py`: + - `test_ingest_minimal_payload_202()` — проверяет создание raw + listing + - `test_ingest_full_payload_populates_all_fields()` + - `test_ingest_duplicate_external_id_updates_and_creates_snapshot()` + - `test_ingest_car_payload_returns_invalid()` — проверяет, что listing не создан +- `test_db_pipeline.py`: + - `test_upsert_creates_snapshot_with_correct_data()` + - `test_custom_fields_deleted_on_update()` + - `test_images_deduplicated_by_hash()` + +### Шаг 7.4: Запуск тестов +```bash +pytest tests/unit -v -m unit +pytest tests/integration -v -m integration +pytest --cov=src --cov-report=term-missing +``` + +### Definition of Done фазы 7 +- [ ] ≥ 80% покрытие кода +- [ ] Все unit-тесты проходят +- [ ] Все integration-тесты проходят (требуют PostgreSQL) +- [ ] CI-ready: `pytest` проходит в чистом окружении + +--- + +## Фаза 8: Документация и финализация + +### Шаг 8.1: README.md +- Описание, стек, быстрый старт, API endpoint, структура проекта. + +### Шаг 8.2: docs/SPECIFICATION.md +- Полное ТЗ (уже написано, обновить при необходимости). + +### Шаг 8.3: docs/ARCHITECTURE.md +- Архитектура (уже написано, обновить при необходимости). + +### Шаг 8.4: Makefile (опционально) +```makefile +.PHONY: dev test lint migrate + +dev: + docker compose up -d + uvicorn vmk_data_collector.main:app --reload + +test: + pytest -v + +lint: + ruff check src tests + black --check src tests + mypy src + +migrate: + alembic upgrade head + +seed: + python -c "import asyncio; from vmk_data_collector.db.seed import seed_all; asyncio.run(seed_all())" +``` + +### Шаг 8.5: Dockerfile (опционально) +```dockerfile +FROM python:3.12-slim +WORKDIR /app +COPY pyproject.toml . +RUN pip install -e ".[dev]" +COPY . . +CMD ["uvicorn", "vmk_data_collector.main:app", "--host", "0.0.0.0", "--port", "8000"] +``` + +### Definition of Done фазы 8 +- [ ] `make dev` запускает сервис +- [ ] `make test` проходит +- [ ] `make lint` проходит +- [ ] README содержит всё для нового разработчика + +--- + +## Сводная таблица файлов + +| # | Путь | Фаза | Описание | +|---|------|------|----------| +| 1 | `pyproject.toml` | 1 | Зависимости, конфиги инструментов | +| 2 | `.env.example` | 1 | Шаблон env | +| 3 | `docker-compose.yml` | 1 | PostgreSQL | +| 4 | `.gitignore` | 0 | Исключения git | +| 5 | `src/vmk_data_collector/core/config.py` | 1 | Pydantic Settings | +| 6 | `src/vmk_data_collector/core/logging.py` | 1 | structlog | +| 7 | `src/vmk_data_collector/core/exceptions.py` | 1 | Исключения | +| 8 | `src/vmk_data_collector/db/base.py` | 2 | DeclarativeBase | +| 9 | `src/vmk_data_collector/domain/enums.py` | 2 | Все ENUM | +| 10 | `src/vmk_data_collector/models/data_source.py` | 2 | ORM DataSource | +| 11 | `src/vmk_data_collector/models/property_type.py` | 2 | ORM PropertyType | +| 12 | `src/vmk_data_collector/models/raw_parsing_data.py` | 2 | ORM RawParsingData | +| 13 | `src/vmk_data_collector/models/property_listing.py` | 2 | ORM PropertyListing | +| 14 | `src/vmk_data_collector/models/property_image.py` | 2 | ORM PropertyImage | +| 15 | `src/vmk_data_collector/models/property_custom_field.py` | 2 | ORM PropertyCustomField | +| 16 | `src/vmk_data_collector/models/property_snapshot.py` | 2 | ORM PropertySnapshot | +| 17 | `src/vmk_data_collector/models/ai_enrichment.py` | 2 | ORM AiEnrichment | +| 18 | `src/vmk_data_collector/models/__init__.py` | 2 | Импорты | +| 19 | `alembic/env.py` | 2 | Alembic async env | +| 20 | `src/vmk_data_collector/db/seed.py` | 2 | Seed справочников | +| 21 | `src/vmk_data_collector/db/engine.py` | 3 | AsyncEngine | +| 22 | `src/vmk_data_collector/db/session.py` | 3 | AsyncSessionLocal | +| 23 | `src/vmk_data_collector/db/repositories/base.py` | 3 | BaseRepository | +| 24 | `src/vmk_data_collector/db/repositories/raw_data.py` | 3 | RawDataRepository | +| 25 | `src/vmk_data_collector/db/repositories/property.py` | 3 | PropertyRepository | +| 26 | `src/vmk_data_collector/db/repositories/image.py` | 3 | ImageRepository | +| 27 | `src/vmk_data_collector/db/repositories/custom_field.py` | 3 | CustomFieldRepository | +| 28 | `src/vmk_data_collector/db/repositories/snapshot.py` | 3 | SnapshotRepository | +| 29 | `src/vmk_data_collector/db/repositories/ai_enrichment.py` | 3 | AiEnrichmentRepository | +| 30 | `src/vmk_data_collector/db/repositories/data_source.py` | 3 | DataSourceRepository | +| 31 | `src/vmk_data_collector/domain/entities.py` | 4 | Dataclass сущности | +| 32 | `src/vmk_data_collector/schemas/raw_data.py` | 4 | API схемы | +| 33 | `src/vmk_data_collector/schemas/ai_response.py` | 4 | AI схемы | +| 34 | `src/vmk_data_collector/schemas/normalized.py` | 4 | NormalizedPropertySchema | +| 35 | `src/vmk_data_collector/services/ollama_client.py` | 5 | HTTP клиент Ollama | +| 36 | `src/vmk_data_collector/services/ai_normalizer.py` | 5 | AiNormalizer | +| 37 | `src/vmk_data_collector/services/ai_image_analyzer.py` | 5 | AiImageAnalyzer | +| 38 | `src/vmk_data_collector/services/ai_enricher.py` | 5 | AiEnricher | +| 39 | `src/vmk_data_collector/services/image_downloader.py` | 5 | ImageDownloader | +| 40 | `src/vmk_data_collector/services/property_pipeline.py` | 6 | PropertyPipeline | +| 41 | `src/vmk_data_collector/api/deps.py` | 6 | DI-зависимости | +| 42 | `src/vmk_data_collector/api/v1/router_properties.py` | 6 | FastAPI router | +| 43 | `src/vmk_data_collector/main.py` | 6 | Точка входа | +| 44 | `tests/conftest.py` | 7 | Фикстуры | +| 45 | `tests/unit/test_ai_normalizer.py` | 7 | Unit тесты нормализатора | +| 46 | `tests/unit/test_ai_image_analyzer.py` | 7 | Unit тесты анализатора | +| 47 | `tests/unit/test_ai_enricher.py` | 7 | Unit тесты enricher | +| 48 | `tests/unit/test_image_downloader.py` | 7 | Unit тесты downloader | +| 49 | `tests/unit/test_property_pipeline.py` | 7 | Unit тесты pipeline | +| 50 | `tests/integration/test_api_ingest.py` | 7 | Integration API | +| 51 | `tests/integration/test_db_pipeline.py` | 7 | Integration DB | +| 52 | `docs/SPECIFICATION.md` | 8 | ТЗ | +| 53 | `docs/ARCHITECTURE.md` | 8 | Архитектура | +| 54 | `README.md` | 8 | README | +| 55 | `Makefile` | 8 | Команды | +| 56 | `Dockerfile` | 8 | Docker образ | + +--- + +## Граф зависимостей фаз + +``` +Фаза 0 (env) + │ + ▼ +Фаза 1 (config) ─────────────────────────┐ + │ │ + ▼ │ +Фаза 2 (models) ──► Фаза 3 (repos) ──► Фаза 4 (schemas) + │ │ │ + ▼ │ ▼ +Alembic migrate ◄────────────────────────┘ Фаза 5 (AI services) + │ + ▼ + Фаза 6 (Pipeline + API) + │ + ▼ + Фаза 7 (Tests) + │ + ▼ + Фаза 8 (Docs + Deploy) +``` + +**Можно параллельно**: Фаза 4 (schemas) и Фаза 5 (AI services) не зависят друг от друга. Оба зависят от Фазы 1. + +--- + +## Чеклист перед началом кодирования + +- [ ] Docker Compose для PostgreSQL поднят и работает +- [ ] `.env` скопирован из `.env.example` и настроен +- [ ] `pip install -e ".[dev]"` прошёл успешно +- [ ] `alembic` инициализирован, `alembic.ini` настроен на async +- [ ] Все 8 таблиц спроектированы и задокументированы в SPECIFICATION.md +- [ ] AI-схемы ответов (Normalizer, ImageAnalyzer, Enricher) согласованы +- [ ] Тестовая стратегия определена diff --git a/docs/SPECIFICATION.md b/docs/SPECIFICATION.md new file mode 100644 index 0000000..eefe9fc --- /dev/null +++ b/docs/SPECIFICATION.md @@ -0,0 +1,531 @@ +# VMK 360 Data Collector — Техническое задание + +## 1. Общее описание + +Сервис приёма, нормализации и ИИ-обогащения данных об объектах недвижимости. + +Парсеры отправляют полусырые данные через REST API. Сервис валидирует их с помощью локальной LLM (Ollama), приводит к единому формату, анализирует изображения, обогащает текст ИИ и сохраняет в PostgreSQL. При повторном приёме объявления (по `source_id` + `external_id`) создаётся снапшот старой версии и обновляется текущая. + +## 2. Домен: объекты недвижимости + +### 2.1 Типы объектов (property_types) +- Квартира +- Дом +- Таунхаус +- Коммерческая +- Земельный участок +- Гараж +- Офис +- Склад +- Торговая площадь +- Коттедж +- Комната +- Новостройка + +### 2.2 Типы сделок (deal_types) +- `sale` — продажа +- `rent_long` — долгосрочная аренда +- `rent_short` — посуточная аренда + +## 3. Модель данных + +### 3.1 Справочники + +| Таблица | Поля | Описание | +|---------|------|----------| +| **data_sources** | `id`, `slug` (UNIQUE), `name`, `url_pattern`, `description`, `created_at` | Источники парсеров | +| **property_types** | `id`, `slug` (UNIQUE), `name`, `description` | Типы объектов недвижимости | +| **deal_types** | `id`, `slug` (UNIQUE), `name` | Типы сделок | + +### 3.2 Основные сущности + +#### raw_parsing_data +Приёмник всех данных от парсеров. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `source_id` | FK → data_sources | | +| `external_id` | VARCHAR | ID от парсера | +| `payload` | JSONB | Абсолютно любая структура от парсера | +| `status` | ENUM | `pending`, `processing`, `completed`, `failed`, `invalid` | +| `validation_result` | VARCHAR | `valid`, `invalid`, `uncertain` | +| `error_message` | TEXT | nullable | +| `received_at` | TIMESTAMPTZ | | +| `processed_at` | TIMESTAMPTZ | nullable | + +Уникальность: `UNIQUE(source_id, external_id)` + +#### property_listings +Нормализованное объявление. Максимум типичных полей для недвижимости. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `raw_data_id` | FK → raw_parsing_data (UNIQUE) | | +| `source_id` | FK → data_sources | | +| `external_id` | VARCHAR | ID от парсера | +| `title` | VARCHAR | Оригинальное название | +| `description` | TEXT | Оригинальное описание | +| `generated_description` | TEXT | AI-описание | +| `deal_type` | ENUM | `sale`, `rent_long`, `rent_short` | +| `property_type_id` | FK → property_types | | +| `price` | NUMERIC(18,2) | | +| `currency` | VARCHAR(3) | UAH / USD / EUR | +| `original_price` | NUMERIC(18,2) | Цена в валюте источника | +| `original_currency` | VARCHAR(3) | | +| `price_per_sqm` | NUMERIC(18,2) | | +| `total_area` | NUMERIC(8,2) | Общая площадь, м² | +| `living_area` | NUMERIC(8,2) | Жилая площадь | +| `kitchen_area` | NUMERIC(8,2) | Площадь кухни | +| `land_area` | NUMERIC(10,2) | Площадь участка, соток/га | +| `rooms_count` | SMALLINT | | +| `bedrooms_count` | SMALLINT | | +| `bathrooms_count` | SMALLINT | | +| `layout` | VARCHAR | `studio`, `separate`, `adjacent` | +| `floor` | SMALLINT | | +| `floors_total` | SMALLINT | | +| `building_year` | SMALLINT | | +| `building_type` | VARCHAR | `brick`, `panel`, `monolith`, `gas_block`, `wood` | +| `renovation_status` | VARCHAR | `cosmetic`, `euro`, `designer`, `none`, `construction` | +| `ceiling_height` | NUMERIC(4,2) | В метрах | +| `material` | VARCHAR | Материал стен | +| `has_balcony` | BOOLEAN | | +| `has_loggia` | BOOLEAN | | +| `balcony_count` | SMALLINT | | +| `loggia_count` | SMALLINT | | +| `bathroom_type` | VARCHAR | `combined`, `separate`, `multiple` | +| `elevator_count` | SMALLINT | | +| `has_freight_elevator` | BOOLEAN | | +| `parking_type` | VARCHAR | `ground`, `underground`, `none`, `garage` | +| `heating_type` | VARCHAR | `central`, `autonomous`, `floor`, `none` | +| `internet` | BOOLEAN | | +| `security` | BOOLEAN | | +| `windows_direction` | VARCHAR | Направление окон | +| `window_view` | VARCHAR | `yard`, `street`, `park`, `water`, `forest` | +| `address_raw` | TEXT | Адрес как пришёл | +| `city` | VARCHAR | | +| `district` | VARCHAR | Район | +| `micro_district` | VARCHAR | Микрорайон | +| `street` | VARCHAR | Улица | +| `house_number` | VARCHAR | Номер дома | +| `metro_station` | VARCHAR | Станция метро | +| `metro_distance_min` | SMALLINT | Минут до метро | +| `metro_distance_type` | VARCHAR | `walk`, `transport` | +| `latitude` | NUMERIC(10,7) | | +| `longitude` | NUMERIC(10,7) | | +| `contact_phone` | VARCHAR | | +| `contact_name` | VARCHAR | | +| `contact_email` | VARCHAR | | +| `is_agent` | BOOLEAN | | +| `agency_name` | VARCHAR | | +| `publish_date` | TIMESTAMPTZ | | +| `url_source` | TEXT | URL объявления | +| `listing_status` | ENUM | `active`, `sold`, `rented`, `removed`, `archived` | +| `images_count` | SMALLINT | | +| `listing_quality_score` | SMALLINT | AI-оценка качества объявления, 1–10 | +| `reliability_rating` | SMALLINT | AI-оценка надёжности, 1–5 | +| `sentiment_score` | NUMERIC(3,2) | -1..1 | +| `created_at` | TIMESTAMPTZ | | +| `updated_at` | TIMESTAMPTZ | | + +Уникальность: `UNIQUE(source_id, external_id)` + +#### property_images +Картинки объявления. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `property_id` | FK → property_listings | | +| `url` | TEXT | Исходный URL | +| `local_path` | VARCHAR | Путь к файлу на диске | +| `hash` | VARCHAR(64) | SHA-256 для дедупликации | +| `file_size` | INTEGER | Байты | +| `width` | SMALLINT | | +| `height` | SMALLINT | | +| `download_status` | ENUM | `pending`, `downloaded`, `failed` | +| `ai_description` | TEXT | AI-описание фото | +| `analysis_status` | ENUM | `pending`, `completed`, `failed` | +| `order_index` | SMALLINT | Порядок в галерее | + +#### property_custom_fields +Универсальные кастомные атрибуты. Всё, что не вошло в типичные поля `property_listings`. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `property_id` | FK → property_listings | | +| `field_name` | VARCHAR | Ключ | +| `field_value` | TEXT | Значение | +| `field_type` | VARCHAR | `str`, `int`, `float`, `bool`, `date`, `json` | + +Уникальность: `UNIQUE(property_id, field_name)` + +#### property_snapshots +История обновлений объявлений. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `property_id` | FK → property_listings | | +| `snapshot_data` | JSONB | Полная копия listing + custom_fields | +| `changed_fields` | JSONB | Diff (только изменённые поля) | +| `created_at` | TIMESTAMPTZ | | + +#### ai_enrichments +Результат ИИ-анализа. + +| Поле | Тип | Описание | +|------|-----|----------| +| `id` | SERIAL PK | | +| `property_id` | FK → property_listings (UNIQUE) | | +| `extracted_features` | JSONB | Доп. параметры из текста | +| `price_assessment` | JSONB | `{ market_estimate, deviation_percent, confidence, comment }` | +| `listing_quality_score` | SMALLINT | 1–10 | +| `reliability_rating` | SMALLINT | 1–5 | +| `sentiment_score` | NUMERIC(3,2) | -1..1 | +| `classification` | VARCHAR | AI-классификация объявления | +| `image_analysis_results` | JSONB | Агрегированный анализ фото | +| `generated_description` | TEXT | AI-сгенерированное описание | +| `summary` | TEXT | Краткое резюме | +| `model_version` | VARCHAR | Версия/модель LLM | +| `processing_time_ms` | INTEGER | | +| `created_at` | TIMESTAMPTZ | | + +### 3.3 Связи + +``` +raw_parsing_data 1:1 property_listings +property_listings 1:N property_images +property_listings 1:N property_custom_fields +property_listings 1:N property_snapshots +property_listings 1:1 ai_enrichments +property_listings N:1 property_types +property_listings N:1 data_sources +``` + +## 4. AI-слой (Ollama) + +### 4.1 Конфигурация + +```bash +OLLAMA_BASE_URL=http://localhost:11434 +OLLAMA_TEXT_MODEL=llama3.2 +OLLAMA_VISION_MODEL=llava +OLLAMA_TIMEOUT=120 +OLLAMA_MOCK=false +IMAGE_STORAGE_PATH=/var/lib/vmk/images +``` + +### 4.2 AiNormalizer (валидация + структуризация) + +**Задача**: определить, является ли payload объявлением о недвижимости. Если нет — вернуть `is_real_estate: false`. Если да — вернуть нормализованную структуру. + +**Границы валидации**: +- **Reject**: автомобили, телефоны, вакансии, мебель, бытовая техника, животные. +- **Accept**: квартиры, дома, гаражи, земля, офисы, склады, торговые площади. +- **Uncertain**: стройматериалы, услуги ремонта — требуют внимания, но пока accept. + +**Поведение при reject**: +- `raw_parsing_data.status = invalid` +- `validation_result = invalid` +- Ответ API: `202 { job_id, status: "invalid", reason: "..." }` +- НИЧЕГО не создаётся в `property_listings`. + +**Структурированный выход** (JSON schema в prompt): +```json +{ + "is_real_estate": true, + "property_type": "apartment", + "deal_type": "sale", + "title": "...", + "description": "...", + "price": 12200000, + "currency": "RUB", + "total_area": 78.5, + "rooms_count": 3, + "floor": 12, + "floors_total": 16, + "building_year": 2019, + "address": "г. Москва, ул. Ленина, 15", + "city": "Москва", + "metro_station": "Таганская", + "metro_distance_min": 8, + "contact_phone": "+7...", + "contact_name": "...", + "is_agent": false, + "images": ["https://..."], + "custom_fields": { + "balcony_type": "лоджия", + "ceiling_height": 2.85 + } +} +``` + +### 4.3 AiImageAnalyzer + +**Задача**: анализ каждого изображения объекта. + +**Вход**: base64 картинка. +**Выход**: +```json +{ + "overall_condition": "хороший ремонт, современный стиль", + "rooms_observed": 3, + "issues_found": ["пятно на потолке"], + "positive_highlights": ["панорамные окна", "встроенная кухня"], + "view_from_window": "двор, детская площадка", + "furniture_included": true, + "appliances_included": ["холодильник", "стиральная машина"] +} +``` + +**Процесс**: +1. Скачивание картинок по URL (asyncio.gather). +2. Сохранение в `IMAGE_STORAGE_PATH / {property_id} / {hash}.ext`. +3. Дедупликация по SHA-256. +4. Извлечение width/height (Pillow). +5. Vision-анализ через Ollama (`llava` / `llama3.2-vision`). + +### 4.4 AiEnricher (финальное обогащение) + +**Задача**: анализ текста + результатов image_analysis. + +**Вход**: +- Нормализованный текст (title, description, custom_fields) +- Агрегированные `image_analysis_results` + +**Выход**: +```json +{ + "extracted_features": { + "property_complex": "ЖК Солнечный", + "developer": "Группа Самолет", + "material": "монолит", + "additional_params": { "кондиционер": true } + }, + "price_assessment": { + "market_estimate": 12500000, + "deviation_percent": -3.5, + "confidence": 0.82, + "comment": "Цена соответствует рынку" + }, + "listing_quality_score": 8, + "reliability_rating": 4, + "sentiment_score": 0.6, + "classification": "secondary_sale_apartment", + "generated_description": "Продаётся уютная 3-комнатная квартира...", + "summary": "3-комнатная квартира, 78 м², ЖК Солнечный, цена 12.2 млн", + "language": "ru" +} +``` + +### 4.5 Ollama клиент + +- **Текст**: POST `/api/chat` с JSON mode (system prompt: «Ответь ТОЛЬКО JSON»). +- **Vision**: `/api/chat` с `images: [base64...]`. +- **Retries**: exponential backoff (2^n сек), max 5 попыток. +- **Timeout**: 60–120 секунд. +- **Circuit breaker**: при 5 ошибок подряд — fallback на mock или queue. +- **Mock режим**: при `OLLAMA_MOCK=true` возвращается фиксированный JSON без реального вызова. + +## 5. Пайплайн обработки (PropertyPipeline) + +``` +Parser JSON + │ + ▼ +POST /api/v1/properties/ingest + │ + ▼ +Save raw_parsing_data (status = pending) + │ + ▼ +AiNormalizer (Ollama text model) + │ + ├──► "Это не недвижимость" + │ └──► raw_data.status = invalid + │ Response: 202 { job_id, status: invalid } + │ + └──► "Это недвижимость" + │ + ▼ + Structured JSON от Ollama + │ + ▼ + Check: source_id + external_id exists? + │ + ├──► Нет → CREATE property_listings + │ + └──► Да → UPDATE + │ + ├──► Копируем текущее → property_snapshots + ├──► Обновляем property_listings + └──► Удаляем старые custom_fields, пересоздаём + │ + ▼ + Download images (asyncio.gather) + URL → local dir (IMAGE_STORAGE_PATH) → SHA-256 → property_images + │ + ▼ + AiImageAnalyzer (Ollama vision model) + Каждое фото → описание, ремонт, комнаты, проблемы + │ + ▼ + AiEnricher (Ollama text model) + Текст + image_analysis → summary, price_assessment, + quality_score, reliability_rating, generated_description + │ + ▼ + Commit всё в одной транзакции + raw_data.status = completed + │ + ▼ + Response 202 { job_id, property_id, status: completed } +``` + +### 5.1 Повторные объявления (Upsert) + +При совпадении `source_id + external_id`: +1. Создаём снапшот текущей версии в `property_snapshots`. +2. Обновляем `property_listings`. +3. Удаляем старые `property_custom_fields`, создаём новые. +4. Обрабатываем изображения: новые добавляем, существующие по hash пропускаем. +5. Пересоздаём `ai_enrichments`. +6. Обновляем `raw_parsing_data.status = completed`. + +## 6. API + +### POST /api/v1/properties/ingest + +**Гибкий формат входа** (парсеры гарантированно шлют): +```json +{ + "source_slug": "olx_parser", + "external_id": "olx_987654", + "payload": { + "title": "Продам 2-комнатную квартиру", + "url": "https://olx.ua/...", + "images": ["https://..."], + "published_at": "2024-06-01T10:00:00Z" + } +} +``` + +**Опционально могут шлют**: +- `payload.description` — текст с кашей параметров +- `payload.price` + `payload.currency` +- `payload.contacts` — phone, name +- `payload.address` — строка +- `payload.params` — разнобой ключей от парсера + +**Response (invalid / rejected)**: +```json +{ + "job_id": 1, + "property_id": null, + "status": "invalid", + "reason": "Payload appears to be a car listing, not real estate", + "message": "Data rejected: not a real estate listing" +} +``` + +**Response (new listing created)**: +```json +{ + "job_id": 1, + "property_id": 42, + "status": "completed", + "message": "New property listing created and enriched" +} +``` + +**Response (existing listing updated)**: +```json +{ + "job_id": 1, + "property_id": 42, + "status": "completed", + "message": "Existing property listing updated, snapshot saved", + "snapshot_id": 15 +} +``` + +## 7. Тесты + +### Unit-тесты +- `test_ai_normalizer.py` — валидный reject (авто), валидный accept (квартира), распознавание из каши. +- `test_ai_image_analyzer.py` — мок Ollama vision. +- `test_ai_enricher.py` — мок Ollama text. +- `test_upsert_pipeline.py` — snapshot создан при обновлении. +- `test_image_downloader.py` — скачивание, hash, дедупликация. + +### Integration-тесты +- `test_api_ingest.py` — FastAPI TestClient. +- `test_end_to_end.py` — минимальный payload → проверка создания raw + listing + images + custom_fields + enrichment. +- `test_invalid_reject.py` — payload про авто → только raw со статусом invalid. +- `test_upsert_integration.py` — duplicate external_id → snapshot + update. + +### Фикстуры +- `async_engine` — test DB engine. +- `db_session` — async session с rollback. +- `client` — FastAPI TestClient. +- `mock_ollama_client` — фиксированные JSON-ответы. +- `sample_payloads` — примеры от разных парсеров (olx, avito, domria). + +## 8. Переменные окружения (.env) + +```bash +# Database +DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data +DATABASE_POOL_SIZE=20 +DATABASE_MAX_OVERFLOW=10 +DATABASE_ECHO=false + +# Application +APP_HOST=0.0.0.0 +APP_PORT=8000 +LOG_LEVEL=info +DEBUG=false + +# Ollama +OLLAMA_BASE_URL=http://localhost:11434 +OLLAMA_TEXT_MODEL=llama3.2 +OLLAMA_VISION_MODEL=llava +OLLAMA_TIMEOUT=120 +OLLAMA_MOCK=false + +# Storage +IMAGE_STORAGE_PATH=/var/lib/vmk/images + +# Feature flags +ENABLE_IMAGE_ANALYSIS=true +ENABLE_PRICE_ESTIMATION=true +``` + +## 9. Технологический стек + +- **Python 3.12+** +- **FastAPI** — REST API +- **PostgreSQL** + **asyncpg** — БД +- **SQLAlchemy 2.x** (async) + **Alembic** — ORM и миграции +- **Pydantic** — валидация и Settings +- **httpx** — HTTP клиент (Ollama API + скачивание картинок) +- **Pillow** — обработка изображений (width/height) +- **tenacity** — retries +- **structlog** — логирование +- **pytest + pytest-asyncio** — тестирование +- **Docker Compose** — PostgreSQL и Ollama + +## 10. Порядок реализации + +| Фаза | Что делаем | +|------|-----------| +| **1. Фундамент** | `pyproject.toml`, `.env.example`, `docker-compose.yml`, директории | +| **2. Модели + миграции** | SQLAlchemy модели (8 таблиц), Alembic initial migration, seed property_types | +| **3. Инфраструктура БД** | async engine, session, репозитории | +| **4. AI слой** | Ollama HTTP client, AiNormalizer, AiImageAnalyzer, AiEnricher | +| **5. Pipeline** | Image downloader, PropertyPipeline (upsert + snapshots), FastAPI router | +| **6. Тесты** | unit + integration | +| **7. Документация** | `SPECIFICATION.md`, `ARCHITECTURE.md`, `README.md` | diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d7fd2f3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,65 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "vmk-data-collector" +version = "0.1.0" +description = "VMK 360 Real Estate Data Collector" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.111.0", + "uvicorn[standard]>=0.30.0", + "sqlalchemy[asyncio]>=2.0.0", + "asyncpg>=0.29.0", + "alembic>=1.13.0", + "pydantic>=2.7.0", + "pydantic-settings>=2.2.0", + "httpx>=0.27.0", + "structlog>=24.1.0", + "python-dotenv>=1.0.0", + "tenacity>=8.3.0", + "pillow>=10.3.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.2.0", + "pytest-asyncio>=0.23.0", + "httpx>=0.27.0", + "factory-boy>=3.3.0", + "faker>=25.0.0", + "ruff>=0.4.0", + "black>=24.4.0", + "mypy>=1.10.0", + "coverage>=7.5.0", +] + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Slow tests", +] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.ruff] +target-version = "py312" +lint.select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"] +lint.ignore = ["B008"] + +[tool.mypy] +python_version = "3.12" +disallow_untyped_defs = true +warn_return_any = true +warn_unused_configs = true +ignore_missing_imports = true diff --git a/src/vmk_data_collector/__init__.py b/src/vmk_data_collector/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/__init__.py diff --git a/src/vmk_data_collector/api/__init__.py b/src/vmk_data_collector/api/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/api/__init__.py diff --git a/src/vmk_data_collector/api/deps.py b/src/vmk_data_collector/api/deps.py new file mode 100644 index 0000000..117b9ba --- /dev/null +++ b/src/vmk_data_collector/api/deps.py @@ -0,0 +1,68 @@ +from collections.abc import AsyncGenerator + +from fastapi import Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.core.config import settings +from vmk_data_collector.db.repositories.ai_enrichment import ( + AiEnrichmentRepository, +) +from vmk_data_collector.db.repositories.custom_field import ( + CustomFieldRepository, +) +from vmk_data_collector.db.repositories.data_source import ( + DataSourceRepository, +) +from vmk_data_collector.db.repositories.image import ImageRepository +from vmk_data_collector.db.repositories.property import PropertyRepository +from vmk_data_collector.db.repositories.property_type import ( + PropertyTypeRepository, +) +from vmk_data_collector.db.repositories.raw_data import RawDataRepository +from vmk_data_collector.db.repositories.snapshot import SnapshotRepository +from vmk_data_collector.db.session import AsyncSessionLocal +from vmk_data_collector.services.ai_enricher import AiEnricher +from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer +from vmk_data_collector.services.ai_normalizer import AiNormalizer +from vmk_data_collector.services.image_downloader import ImageDownloader +from vmk_data_collector.services.ollama_client import OllamaClient +from vmk_data_collector.services.property_pipeline import PropertyPipeline + + +async def get_db() -> AsyncGenerator[AsyncSession, None]: + async with AsyncSessionLocal() as session: + yield session + + +def get_ollama_client() -> OllamaClient: + return OllamaClient( + base_url=settings.ollama_base_url, + timeout=settings.ollama_timeout, + ) + + +async def get_property_pipeline( + db: AsyncSession = Depends(get_db), +) -> PropertyPipeline: + client = get_ollama_client() + normalizer = AiNormalizer(client=client) + image_analyzer = AiImageAnalyzer(client=client) + enricher = AiEnricher(client=client) + downloader = ImageDownloader( + storage_path=settings.image_storage_path_abs + ) + + return PropertyPipeline( + raw_repo=RawDataRepository(db), + property_repo=PropertyRepository(db), + image_repo=ImageRepository(db), + custom_field_repo=CustomFieldRepository(db), + snapshot_repo=SnapshotRepository(db), + enrichment_repo=AiEnrichmentRepository(db), + data_source_repo=DataSourceRepository(db), + property_type_repo=PropertyTypeRepository(db), + normalizer=normalizer, + image_downloader=downloader, + image_analyzer=image_analyzer, + enricher=enricher, + ) diff --git a/src/vmk_data_collector/api/v1/__init__.py b/src/vmk_data_collector/api/v1/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/api/v1/__init__.py diff --git a/src/vmk_data_collector/api/v1/router_properties.py b/src/vmk_data_collector/api/v1/router_properties.py new file mode 100644 index 0000000..9cdbe59 --- /dev/null +++ b/src/vmk_data_collector/api/v1/router_properties.py @@ -0,0 +1,29 @@ +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.api.deps import get_db, get_property_pipeline +from vmk_data_collector.db.repositories.raw_data import RawDataRepository +from vmk_data_collector.schemas.raw_data import ( + IngestResponse, + RawDataIngestRequest, +) +from vmk_data_collector.services.property_pipeline import PropertyPipeline + +router = APIRouter() + + +@router.post("/ingest", response_model=IngestResponse, status_code=202) +async def ingest_property( + request: RawDataIngestRequest, + db: AsyncSession = Depends(get_db), + pipeline: PropertyPipeline = Depends(get_property_pipeline), +) -> IngestResponse: + raw_repo = RawDataRepository(db) + raw = await raw_repo.create( + source_id=None, + external_id=request.external_id, + payload={**request.payload, "source_slug": request.source_slug}, + ) + response = await pipeline.process(raw.id) + await db.commit() + return response diff --git a/src/vmk_data_collector/core/__init__.py b/src/vmk_data_collector/core/__init__.py new file mode 100644 index 0000000..04840d0 --- /dev/null +++ b/src/vmk_data_collector/core/__init__.py @@ -0,0 +1,20 @@ +from .config import Settings, settings +from .exceptions import ( + AIProcessingError, + AppError, + DatabaseError, + NotRealEstateError, + ValidationError, +) +from .logging import configure_logging + +__all__ = [ + "Settings", + "settings", + "AppError", + "AIProcessingError", + "DatabaseError", + "NotRealEstateError", + "ValidationError", + "configure_logging", +] diff --git a/src/vmk_data_collector/core/config.py b/src/vmk_data_collector/core/config.py new file mode 100644 index 0000000..b73f374 --- /dev/null +++ b/src/vmk_data_collector/core/config.py @@ -0,0 +1,50 @@ +from pathlib import Path + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", + ) + + # Database + database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data" + database_pool_size: int = 20 + database_max_overflow: int = 10 + database_echo: bool = False + + # App + app_host: str = "0.0.0.0" + app_port: int = 8000 + log_level: str = "info" + debug: bool = False + + # Ollama + ollama_base_url: str = "http://localhost:11434" + ollama_text_model: str = "llama3.2" + ollama_vision_model: str = "llava" + ollama_timeout: int = 120 + ollama_mock: bool = False + + # Images + image_storage_path: str = "/var/lib/vmk/images" + enable_image_analysis: bool = True + enable_price_estimation: bool = True + + @property + def database_url_async(self) -> str: + if "+asyncpg" not in self.database_url: + return self.database_url.replace( + "postgresql://", "postgresql+asyncpg://", 1 + ) + return self.database_url + + @property + def image_storage_path_abs(self) -> Path: + return Path(self.image_storage_path).resolve() + + +settings = Settings() diff --git a/src/vmk_data_collector/core/exceptions.py b/src/vmk_data_collector/core/exceptions.py new file mode 100644 index 0000000..25d8245 --- /dev/null +++ b/src/vmk_data_collector/core/exceptions.py @@ -0,0 +1,34 @@ +class AppError(Exception): + """Base application exception.""" + + def __init__(self, message: str = "An application error occurred") -> None: + self.message = message + super().__init__(self.message) + + +class ValidationError(AppError): + """Raised when input data fails validation.""" + + def __init__(self, message: str = "Validation error") -> None: + super().__init__(message) + + +class AIProcessingError(AppError): + """Raised when AI processing fails.""" + + def __init__(self, message: str = "AI processing error") -> None: + super().__init__(message) + + +class NotRealEstateError(ValidationError): + """Raised when parsed data is not real estate.""" + + def __init__(self, message: str = "Provided data is not real estate") -> None: + super().__init__(message) + + +class DatabaseError(AppError): + """Raised on database operation failure.""" + + def __init__(self, message: str = "Database error") -> None: + super().__init__(message) diff --git a/src/vmk_data_collector/core/logging.py b/src/vmk_data_collector/core/logging.py new file mode 100644 index 0000000..54658e2 --- /dev/null +++ b/src/vmk_data_collector/core/logging.py @@ -0,0 +1,24 @@ +import structlog + + +def configure_logging(log_level: str, debug: bool = False) -> None: + shared_processors = [ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.processors.format_exc_info, + structlog.processors.TimeStamper(fmt="iso"), + ] + + if debug: + renderer = structlog.dev.ConsoleRenderer(colors=True) + else: + renderer = structlog.processors.JSONRenderer() + + structlog.configure( + processors=shared_processors + [renderer], + wrapper_class=structlog.stdlib.BoundLogger, + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) diff --git a/src/vmk_data_collector/db/__init__.py b/src/vmk_data_collector/db/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/db/__init__.py diff --git a/src/vmk_data_collector/db/base.py b/src/vmk_data_collector/db/base.py new file mode 100644 index 0000000..fa2b68a --- /dev/null +++ b/src/vmk_data_collector/db/base.py @@ -0,0 +1,5 @@ +from sqlalchemy.orm import DeclarativeBase + + +class Base(DeclarativeBase): + pass diff --git a/src/vmk_data_collector/db/engine.py b/src/vmk_data_collector/db/engine.py new file mode 100644 index 0000000..eb37bde --- /dev/null +++ b/src/vmk_data_collector/db/engine.py @@ -0,0 +1,10 @@ +from sqlalchemy.ext.asyncio import create_async_engine + +from vmk_data_collector.core.config import settings + +engine = create_async_engine( + settings.database_url_async, + pool_size=settings.database_pool_size, + max_overflow=settings.database_max_overflow, + echo=settings.database_echo, +) diff --git a/src/vmk_data_collector/db/repositories/__init__.py b/src/vmk_data_collector/db/repositories/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/__init__.py diff --git a/src/vmk_data_collector/db/repositories/ai_enrichment.py b/src/vmk_data_collector/db/repositories/ai_enrichment.py new file mode 100644 index 0000000..5680c53 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/ai_enrichment.py @@ -0,0 +1,19 @@ +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.models.ai_enrichment import AiEnrichment + + +class AiEnrichmentRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create(self, property_id: int, **kwargs) -> AiEnrichment: + obj = AiEnrichment(property_id=property_id, **kwargs) + return await self.add(obj) + + async def delete_by_property(self, property_id: int) -> None: + await self._session.execute( + delete(AiEnrichment).where(AiEnrichment.property_id == property_id) + ) diff --git a/src/vmk_data_collector/db/repositories/base.py b/src/vmk_data_collector/db/repositories/base.py new file mode 100644 index 0000000..af59399 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/base.py @@ -0,0 +1,21 @@ +from typing import TypeVar + +from sqlalchemy.ext.asyncio import AsyncSession + +T = TypeVar("T") + + +class BaseRepository: + def __init__(self, session: AsyncSession) -> None: + self._session = session + + async def add(self, obj: T) -> T: + self._session.add(obj) + await self._session.flush() + return obj + + async def delete(self, obj: T) -> None: + await self._session.delete(obj) + + async def commit(self) -> None: + await self._session.commit() diff --git a/src/vmk_data_collector/db/repositories/custom_field.py b/src/vmk_data_collector/db/repositories/custom_field.py new file mode 100644 index 0000000..e6bc185 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/custom_field.py @@ -0,0 +1,43 @@ +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.domain.enums import CustomFieldType +from vmk_data_collector.models.property_custom_field import PropertyCustomField + + +class CustomFieldRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create( + self, + property_id: int, + field_name: str, + field_value: str, + field_type: CustomFieldType = CustomFieldType.str, + ) -> PropertyCustomField: + obj = PropertyCustomField( + property_id=property_id, + field_name=field_name, + field_value=field_value, + field_type=field_type, + ) + return await self.add(obj) + + async def bulk_create(self, property_id: int, fields: list[dict]) -> None: + for field in fields: + obj = PropertyCustomField( + property_id=property_id, + field_name=field["field_name"], + field_value=field["field_value"], + field_type=CustomFieldType(field.get("field_type", "str")), + ) + self._session.add(obj) + + async def delete_by_property(self, property_id: int) -> None: + await self._session.execute( + delete(PropertyCustomField).where( + PropertyCustomField.property_id == property_id + ) + ) diff --git a/src/vmk_data_collector/db/repositories/data_source.py b/src/vmk_data_collector/db/repositories/data_source.py new file mode 100644 index 0000000..0c69200 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/data_source.py @@ -0,0 +1,22 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.models.data_source import DataSource + + +class DataSourceRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def get_or_create_by_slug( + self, slug: str, name: str | None = None + ) -> DataSource: + result = await self._session.execute( + select(DataSource).where(DataSource.slug == slug) + ) + existing = result.scalar_one_or_none() + if existing: + return existing + obj = DataSource(slug=slug, name=name or slug) + return await self.add(obj) diff --git a/src/vmk_data_collector/db/repositories/image.py b/src/vmk_data_collector/db/repositories/image.py new file mode 100644 index 0000000..c9179c4 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/image.py @@ -0,0 +1,70 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.domain.enums import ImageAnalysisStatus, ImageDownloadStatus +from vmk_data_collector.models.property_image import PropertyImage + + +class ImageRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create( + self, property_id: int, url: str, order_index: int = 0 + ) -> PropertyImage: + obj = PropertyImage( + property_id=property_id, + url=url, + order_index=order_index, + download_status=ImageDownloadStatus.pending, + analysis_status=ImageAnalysisStatus.pending, + ) + return await self.add(obj) + + async def get_by_property(self, property_id: int) -> list[PropertyImage]: + result = await self._session.execute( + select(PropertyImage).where(PropertyImage.property_id == property_id) + ) + return list(result.scalars().all()) + + async def get_by_hash( + self, property_id: int, image_hash: str + ) -> PropertyImage | None: + result = await self._session.execute( + select(PropertyImage).where( + PropertyImage.property_id == property_id, + PropertyImage.hash == image_hash, + ) + ) + return result.scalar_one_or_none() + + async def update_downloaded( + self, + image_id: int, + local_path: str, + file_size: int, + width: int, + height: int, + image_hash: str, + ) -> None: + result = await self._session.execute( + select(PropertyImage).where(PropertyImage.id == image_id) + ) + obj = result.scalar_one_or_none() + if obj: + obj.local_path = local_path + obj.file_size = file_size + obj.width = width + obj.height = height + obj.hash = image_hash + obj.download_status = ImageDownloadStatus.downloaded + + async def update_analysis(self, image_id: int, ai_description: str) -> None: + result = await self._session.execute( + select(PropertyImage).where(PropertyImage.id == image_id) + ) + obj = result.scalar_one_or_none() + if obj: + obj.ai_description = ai_description + obj.analysis_status = ImageAnalysisStatus.completed diff --git a/src/vmk_data_collector/db/repositories/property.py b/src/vmk_data_collector/db/repositories/property.py new file mode 100644 index 0000000..17abee1 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/property.py @@ -0,0 +1,51 @@ +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.models.property_custom_field import PropertyCustomField +from vmk_data_collector.models.property_listing import PropertyListing + + +class PropertyRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create(self, **kwargs) -> PropertyListing: + obj = PropertyListing(**kwargs) + return await self.add(obj) + + async def get_by_id(self, property_id: int) -> PropertyListing | None: + result = await self._session.execute( + select(PropertyListing).where(PropertyListing.id == property_id) + ) + return result.scalar_one_or_none() + + async def get_by_source_and_external( + self, source_id: int, external_id: str + ) -> PropertyListing | None: + result = await self._session.execute( + select(PropertyListing).where( + PropertyListing.source_id == source_id, + PropertyListing.external_id == external_id, + ) + ) + return result.scalar_one_or_none() + + async def update(self, property_id: int, **kwargs) -> PropertyListing: + result = await self._session.execute( + select(PropertyListing).where(PropertyListing.id == property_id) + ) + obj = result.scalar_one_or_none() + if obj is None: + raise ValueError(f"PropertyListing {property_id} not found") + for key, value in kwargs.items(): + if hasattr(obj, key): + setattr(obj, key, value) + return obj + + async def delete_custom_fields(self, property_id: int) -> None: + await self._session.execute( + delete(PropertyCustomField).where( + PropertyCustomField.property_id == property_id + ) + ) diff --git a/src/vmk_data_collector/db/repositories/property_type.py b/src/vmk_data_collector/db/repositories/property_type.py new file mode 100644 index 0000000..e130219 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/property_type.py @@ -0,0 +1,22 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.models.property_type import PropertyType + + +class PropertyTypeRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def get_or_create_by_slug( + self, slug: str, name: str | None = None + ) -> PropertyType: + result = await self._session.execute( + select(PropertyType).where(PropertyType.slug == slug) + ) + existing = result.scalar_one_or_none() + if existing: + return existing + obj = PropertyType(slug=slug, name=name or slug) + return await self.add(obj) diff --git a/src/vmk_data_collector/db/repositories/raw_data.py b/src/vmk_data_collector/db/repositories/raw_data.py new file mode 100644 index 0000000..57cf48e --- /dev/null +++ b/src/vmk_data_collector/db/repositories/raw_data.py @@ -0,0 +1,70 @@ +from datetime import UTC + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.domain.enums import RawDataStatus +from vmk_data_collector.models.raw_parsing_data import RawParsingData + + +class RawDataRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create( + self, + source_id: int | None, + external_id: str | None, + payload: dict, + ) -> RawParsingData: + obj = RawParsingData( + source_id=source_id, + external_id=external_id, + payload=payload, + status=RawDataStatus.pending, + ) + return await self.add(obj) + + async def get_by_id(self, raw_data_id: int) -> RawParsingData | None: + result = await self._session.execute( + select(RawParsingData).where(RawParsingData.id == raw_data_id) + ) + return result.scalar_one_or_none() + + async def get_by_source_and_external( + self, source_id: int, external_id: str + ) -> RawParsingData | None: + result = await self._session.execute( + select(RawParsingData).where( + RawParsingData.source_id == source_id, + RawParsingData.external_id == external_id, + ) + ) + return result.scalar_one_or_none() + + async def update_status( + self, + raw_data_id: int, + status: RawDataStatus, + error_message: str | None = None, + ) -> None: + result = await self._session.execute( + select(RawParsingData).where(RawParsingData.id == raw_data_id) + ) + obj = result.scalar_one_or_none() + if obj: + obj.status = status + if error_message is not None: + obj.error_message = error_message + + async def set_processed(self, raw_data_id: int) -> None: + from datetime import datetime + + result = await self._session.execute( + select(RawParsingData).where(RawParsingData.id == raw_data_id) + ) + obj = result.scalar_one_or_none() + if obj: + obj.status = RawDataStatus.completed + obj.processed_at = datetime.now(UTC) diff --git a/src/vmk_data_collector/db/repositories/snapshot.py b/src/vmk_data_collector/db/repositories/snapshot.py new file mode 100644 index 0000000..3707384 --- /dev/null +++ b/src/vmk_data_collector/db/repositories/snapshot.py @@ -0,0 +1,19 @@ +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.db.repositories.base import BaseRepository +from vmk_data_collector.models.property_snapshot import PropertySnapshot + + +class SnapshotRepository(BaseRepository): + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def create( + self, property_id: int, snapshot_data: dict, changed_fields: dict + ) -> PropertySnapshot: + obj = PropertySnapshot( + property_id=property_id, + snapshot_data=snapshot_data, + changed_fields=changed_fields, + ) + return await self.add(obj) diff --git a/src/vmk_data_collector/db/seed.py b/src/vmk_data_collector/db/seed.py new file mode 100644 index 0000000..a7029f1 --- /dev/null +++ b/src/vmk_data_collector/db/seed.py @@ -0,0 +1,64 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from vmk_data_collector.models.property_type import PropertyType + +PROPERTY_TYPES = [ + { + "slug": "apartment", + "name": "Квартира", + "description": "Квартира в многоквартирном доме", + }, + {"slug": "house", "name": "Дом", "description": "Частный дом"}, + {"slug": "townhouse", "name": "Таунхаус", "description": "Таунхаус"}, + { + "slug": "commercial", + "name": "Коммерческая недвижимость", + "description": "Коммерческая недвижимость", + }, + { + "slug": "land", + "name": "Земельный участок", + "description": "Земельный участок", + }, + { + "slug": "garage", + "name": "Гараж", + "description": "Гараж или парковочное место", + }, + {"slug": "office", "name": "Офис", "description": "Офисное помещение"}, + {"slug": "warehouse", "name": "Склад", "description": "Складское помещение"}, + { + "slug": "retail", + "name": "Торговая площадь", + "description": "Торговое помещение", + }, + {"slug": "cottage", "name": "Коттедж", "description": "Коттедж"}, + {"slug": "room", "name": "Комната", "description": "Отдельная комната"}, + { + "slug": "new_building", + "name": "Новострой", + "description": "Квартира в новострое", + }, +] + +DEAL_TYPES = [ + {"slug": "sale", "name": "Продажа"}, + {"slug": "rent_long", "name": "Долгосрочная аренда"}, + {"slug": "rent_short", "name": "Краткосрочная аренда"}, +] + + +async def seed_property_types(session: AsyncSession) -> None: + for pt in PROPERTY_TYPES: + result = await session.execute( + select(PropertyType).where(PropertyType.slug == pt["slug"]) + ) + existing = result.scalar_one_or_none() + if not existing: + session.add(PropertyType(**pt)) + await session.commit() + + +async def seed_all(session: AsyncSession) -> None: + await seed_property_types(session) diff --git a/src/vmk_data_collector/db/session.py b/src/vmk_data_collector/db/session.py new file mode 100644 index 0000000..1c1e6f6 --- /dev/null +++ b/src/vmk_data_collector/db/session.py @@ -0,0 +1,12 @@ +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from .engine import engine + +AsyncSessionLocal = async_sessionmaker( + engine, class_=AsyncSession, expire_on_commit=False +) + + +async def get_db_session() -> AsyncSession: + async with AsyncSessionLocal() as session: + yield session diff --git a/src/vmk_data_collector/domain/__init__.py b/src/vmk_data_collector/domain/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/domain/__init__.py diff --git a/src/vmk_data_collector/domain/entities.py b/src/vmk_data_collector/domain/entities.py new file mode 100644 index 0000000..c3477d7 --- /dev/null +++ b/src/vmk_data_collector/domain/entities.py @@ -0,0 +1,89 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class NormalizedProperty: + property_type: str | None = None + deal_type: str | None = None + title: str | None = None + description: str | None = None + price: float | None = None + currency: str | None = None + original_price: float | None = None + original_currency: str | None = None + price_per_sqm: float | None = None + total_area: float | None = None + living_area: float | None = None + kitchen_area: float | None = None + land_area: float | None = None + rooms_count: int | None = None + bedrooms_count: int | None = None + bathrooms_count: int | None = None + layout: str | None = None + floor: int | None = None + floors_total: int | None = None + building_year: int | None = None + building_type: str | None = None + renovation_status: str | None = None + ceiling_height: float | None = None + material: str | None = None + has_balcony: bool | None = None + has_loggia: bool | None = None + balcony_count: int | None = None + loggia_count: int | None = None + bathroom_type: str | None = None + elevator_count: int | None = None + has_freight_elevator: bool | None = None + parking_type: str | None = None + heating_type: str | None = None + internet: bool | None = None + security: bool | None = None + windows_direction: str | None = None + window_view: str | None = None + address_raw: str | None = None + city: str | None = None + district: str | None = None + micro_district: str | None = None + street: str | None = None + house_number: str | None = None + metro_station: str | None = None + metro_distance_min: int | None = None + metro_distance_type: str | None = None + latitude: float | None = None + longitude: float | None = None + contact_phone: str | None = None + contact_name: str | None = None + contact_email: str | None = None + is_agent: bool | None = None + agency_name: str | None = None + publish_date: str | None = None + url_source: str | None = None + images: list[str] = field(default_factory=list) + custom_fields: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class AiImageAnalysis: + overall_condition: str | None = None + rooms_observed: int | None = None + issues_found: list[str] = field(default_factory=list) + positive_highlights: list[str] = field(default_factory=list) + view_from_window: str | None = None + furniture_included: bool | None = None + appliances_included: list[str] = field(default_factory=list) + + +@dataclass +class AiEnrichmentResult: + extracted_features: dict[str, Any] = field(default_factory=dict) + price_assessment: dict[str, Any] = field(default_factory=dict) + listing_quality_score: int | None = None + reliability_rating: int | None = None + sentiment_score: float | None = None + classification: str | None = None + image_analysis_results: dict[str, Any] = field(default_factory=dict) + generated_description: str | None = None + summary: str | None = None + model_version: str | None = None + processing_time_ms: int | None = None diff --git a/src/vmk_data_collector/domain/enums.py b/src/vmk_data_collector/domain/enums.py new file mode 100644 index 0000000..f6722cf --- /dev/null +++ b/src/vmk_data_collector/domain/enums.py @@ -0,0 +1,105 @@ +from enum import StrEnum + + +class RawDataStatus(StrEnum): + pending = "pending" + processing = "processing" + completed = "completed" + failed = "failed" + invalid = "invalid" + + +class ValidationResult(StrEnum): + valid = "valid" + invalid = "invalid" + uncertain = "uncertain" + + +class DealType(StrEnum): + sale = "sale" + rent_long = "rent_long" + rent_short = "rent_short" + + +class ListingStatus(StrEnum): + active = "active" + sold = "sold" + rented = "rented" + removed = "removed" + archived = "archived" + + +class BuildingType(StrEnum): + brick = "brick" + panel = "panel" + monolith = "monolith" + gas_block = "gas_block" + wood = "wood" + + +class RenovationStatus(StrEnum): + cosmetic = "cosmetic" + euro = "euro" + designer = "designer" + none = "none" + construction = "construction" + + +class BathroomType(StrEnum): + combined = "combined" + separate = "separate" + multiple = "multiple" + + +class ParkingType(StrEnum): + ground = "ground" + underground = "underground" + none = "none" + garage = "garage" + + +class HeatingType(StrEnum): + central = "central" + autonomous = "autonomous" + floor = "floor" + none = "none" + + +class LayoutType(StrEnum): + studio = "studio" + separate = "separate" + adjacent = "adjacent" + + +class WindowView(StrEnum): + yard = "yard" + street = "street" + park = "park" + water = "water" + forest = "forest" + + +class MetroDistanceType(StrEnum): + walk = "walk" + transport = "transport" + + +class ImageDownloadStatus(StrEnum): + pending = "pending" + downloaded = "downloaded" + failed = "failed" + + +class ImageAnalysisStatus(StrEnum): + pending = "pending" + completed = "completed" + failed = "failed" + + +class CustomFieldType(StrEnum): + str = "str" + int = "int" + float = "float" + bool = "bool" + date = "date" + json = "json" diff --git a/src/vmk_data_collector/main.py b/src/vmk_data_collector/main.py new file mode 100644 index 0000000..be0da84 --- /dev/null +++ b/src/vmk_data_collector/main.py @@ -0,0 +1,65 @@ +from contextlib import asynccontextmanager +from pathlib import Path + +from fastapi import FastAPI +from fastapi.responses import JSONResponse + +from vmk_data_collector.api.v1.router_properties import ( + router as properties_router, +) +from vmk_data_collector.core.config import settings +from vmk_data_collector.core.exceptions import ( + AIProcessingError, + AppError, + NotRealEstateError, + ValidationError, +) +from vmk_data_collector.core.logging import configure_logging + + +@asynccontextmanager +async def lifespan(app: FastAPI): + configure_logging(settings.log_level, debug=settings.debug) + Path(settings.image_storage_path).mkdir(parents=True, exist_ok=True) + yield + + +app = FastAPI( + title="VMK Data Collector", + version="0.1.0", + lifespan=lifespan, +) + +app.include_router(properties_router, prefix="/api/v1") + + +@app.exception_handler(AppError) +async def app_error_handler(request, exc: AppError): + return JSONResponse( + status_code=500, + content={"detail": exc.message}, + ) + + +@app.exception_handler(ValidationError) +async def validation_error_handler(request, exc: ValidationError): + return JSONResponse( + status_code=422, + content={"detail": exc.message}, + ) + + +@app.exception_handler(NotRealEstateError) +async def not_real_estate_handler(request, exc: NotRealEstateError): + return JSONResponse( + status_code=202, + content={"status": "invalid", "reason": exc.message}, + ) + + +@app.exception_handler(AIProcessingError) +async def ai_processing_error_handler(request, exc: AIProcessingError): + return JSONResponse( + status_code=202, + content={"status": "failed", "reason": exc.message}, + ) diff --git a/src/vmk_data_collector/models/__init__.py b/src/vmk_data_collector/models/__init__.py new file mode 100644 index 0000000..4dab385 --- /dev/null +++ b/src/vmk_data_collector/models/__init__.py @@ -0,0 +1,19 @@ +from .ai_enrichment import AiEnrichment +from .data_source import DataSource +from .property_custom_field import PropertyCustomField +from .property_image import PropertyImage +from .property_listing import PropertyListing +from .property_snapshot import PropertySnapshot +from .property_type import PropertyType +from .raw_parsing_data import RawParsingData + +__all__ = [ + "AiEnrichment", + "DataSource", + "PropertyCustomField", + "PropertyImage", + "PropertyListing", + "PropertySnapshot", + "PropertyType", + "RawParsingData", +] diff --git a/src/vmk_data_collector/models/ai_enrichment.py b/src/vmk_data_collector/models/ai_enrichment.py new file mode 100644 index 0000000..51e742a --- /dev/null +++ b/src/vmk_data_collector/models/ai_enrichment.py @@ -0,0 +1,41 @@ +from datetime import datetime + +from sqlalchemy import ( + JSON, + TIMESTAMP, + ForeignKey, + Integer, + Numeric, + SmallInteger, + String, + Text, + func, +) +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base + + +class AiEnrichment(Base): + __tablename__ = "ai_enrichments" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column( + ForeignKey("property_listings.id", ondelete="CASCADE"), + nullable=False, + unique=True, + ) + extracted_features: Mapped[dict] = mapped_column(JSON, default=dict) + price_assessment: Mapped[dict] = mapped_column(JSON, default=dict) + listing_quality_score: Mapped[int | None] = mapped_column(SmallInteger) + reliability_rating: Mapped[int | None] = mapped_column(SmallInteger) + sentiment_score: Mapped[float | None] = mapped_column(Numeric(3, 2)) + classification: Mapped[str | None] = mapped_column(String(64)) + image_analysis_results: Mapped[dict] = mapped_column(JSON, default=dict) + generated_description: Mapped[str | None] = mapped_column(Text) + summary: Mapped[str | None] = mapped_column(Text) + model_version: Mapped[str | None] = mapped_column(String(64)) + processing_time_ms: Mapped[int | None] = mapped_column(Integer) + created_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), server_default=func.now() + ) diff --git a/src/vmk_data_collector/models/data_source.py b/src/vmk_data_collector/models/data_source.py new file mode 100644 index 0000000..a350771 --- /dev/null +++ b/src/vmk_data_collector/models/data_source.py @@ -0,0 +1,19 @@ +from datetime import datetime + +from sqlalchemy import TIMESTAMP, Integer, String, Text, func +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base + + +class DataSource(Base): + __tablename__ = "data_sources" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + url_pattern: Mapped[str | None] = mapped_column(String(512)) + description: Mapped[str | None] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), server_default=func.now() + ) diff --git a/src/vmk_data_collector/models/property_custom_field.py b/src/vmk_data_collector/models/property_custom_field.py new file mode 100644 index 0000000..6b327e6 --- /dev/null +++ b/src/vmk_data_collector/models/property_custom_field.py @@ -0,0 +1,19 @@ +from sqlalchemy import ForeignKey, Integer, String, Text, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base +from vmk_data_collector.domain.enums import CustomFieldType + + +class PropertyCustomField(Base): + __tablename__ = "property_custom_fields" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column( + ForeignKey("property_listings.id", ondelete="CASCADE"), nullable=False + ) + field_name: Mapped[str] = mapped_column(String(128), nullable=False) + field_value: Mapped[str] = mapped_column(Text) + field_type: Mapped[CustomFieldType] = mapped_column(default=CustomFieldType.str) + + __table_args__ = (UniqueConstraint("property_id", "field_name"),) diff --git a/src/vmk_data_collector/models/property_image.py b/src/vmk_data_collector/models/property_image.py new file mode 100644 index 0000000..22d8fa6 --- /dev/null +++ b/src/vmk_data_collector/models/property_image.py @@ -0,0 +1,28 @@ +from sqlalchemy import ForeignKey, Integer, SmallInteger, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base +from vmk_data_collector.domain.enums import ImageAnalysisStatus, ImageDownloadStatus + + +class PropertyImage(Base): + __tablename__ = "property_images" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column( + ForeignKey("property_listings.id", ondelete="CASCADE"), nullable=False + ) + url: Mapped[str] = mapped_column(Text, nullable=False) + local_path: Mapped[str | None] = mapped_column(String(512)) + hash: Mapped[str | None] = mapped_column(String(64)) + file_size: Mapped[int | None] = mapped_column(Integer) + width: Mapped[int | None] = mapped_column(SmallInteger) + height: Mapped[int | None] = mapped_column(SmallInteger) + download_status: Mapped[ImageDownloadStatus] = mapped_column( + default=ImageDownloadStatus.pending + ) + ai_description: Mapped[str | None] = mapped_column(Text) + analysis_status: Mapped[ImageAnalysisStatus] = mapped_column( + default=ImageAnalysisStatus.pending + ) + order_index: Mapped[int] = mapped_column(SmallInteger, default=0) diff --git a/src/vmk_data_collector/models/property_listing.py b/src/vmk_data_collector/models/property_listing.py new file mode 100644 index 0000000..60da4d4 --- /dev/null +++ b/src/vmk_data_collector/models/property_listing.py @@ -0,0 +1,133 @@ +from datetime import datetime + +from sqlalchemy import ( + TIMESTAMP, + Boolean, + ForeignKey, + Integer, + Numeric, + SmallInteger, + String, + Text, + UniqueConstraint, + func, +) +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base +from vmk_data_collector.domain.enums import ( + BathroomType, + BuildingType, + DealType, + HeatingType, + LayoutType, + ListingStatus, + MetroDistanceType, + ParkingType, + RenovationStatus, + WindowView, +) + + +class PropertyListing(Base): + __tablename__ = "property_listings" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + raw_data_id: Mapped[int | None] = mapped_column( + ForeignKey("raw_parsing_data.id"), unique=True + ) + source_id: Mapped[int | None] = mapped_column(ForeignKey("data_sources.id")) + external_id: Mapped[str | None] = mapped_column(String(255)) + + title: Mapped[str | None] = mapped_column(String(512)) + description: Mapped[str | None] = mapped_column(Text) + generated_description: Mapped[str | None] = mapped_column(Text) + + deal_type: Mapped[DealType | None] = mapped_column() + property_type_id: Mapped[int | None] = mapped_column( + ForeignKey("property_types.id") + ) + + price: Mapped[float | None] = mapped_column(Numeric(15, 2)) + currency: Mapped[str | None] = mapped_column(String(3)) + original_price: Mapped[float | None] = mapped_column(Numeric(15, 2)) + original_currency: Mapped[str | None] = mapped_column(String(3)) + price_per_sqm: Mapped[float | None] = mapped_column(Numeric(10, 2)) + + total_area: Mapped[float | None] = mapped_column(Numeric(8, 2)) + living_area: Mapped[float | None] = mapped_column(Numeric(8, 2)) + kitchen_area: Mapped[float | None] = mapped_column(Numeric(8, 2)) + land_area: Mapped[float | None] = mapped_column(Numeric(10, 2)) + + rooms_count: Mapped[int | None] = mapped_column(SmallInteger) + bedrooms_count: Mapped[int | None] = mapped_column(SmallInteger) + bathrooms_count: Mapped[int | None] = mapped_column(SmallInteger) + layout: Mapped[LayoutType | None] = mapped_column() + + floor: Mapped[int | None] = mapped_column(SmallInteger) + floors_total: Mapped[int | None] = mapped_column(SmallInteger) + building_year: Mapped[int | None] = mapped_column(SmallInteger) + + building_type: Mapped[BuildingType | None] = mapped_column() + renovation_status: Mapped[RenovationStatus | None] = mapped_column() + ceiling_height: Mapped[float | None] = mapped_column(Numeric(4, 2)) + material: Mapped[str | None] = mapped_column(String(128)) + + has_balcony: Mapped[bool | None] = mapped_column(Boolean) + has_loggia: Mapped[bool | None] = mapped_column(Boolean) + balcony_count: Mapped[int | None] = mapped_column(SmallInteger) + loggia_count: Mapped[int | None] = mapped_column(SmallInteger) + + bathroom_type: Mapped[BathroomType | None] = mapped_column() + elevator_count: Mapped[int | None] = mapped_column(SmallInteger) + has_freight_elevator: Mapped[bool | None] = mapped_column(Boolean) + + parking_type: Mapped[ParkingType | None] = mapped_column() + heating_type: Mapped[HeatingType | None] = mapped_column() + internet: Mapped[bool | None] = mapped_column(Boolean) + security: Mapped[bool | None] = mapped_column(Boolean) + + windows_direction: Mapped[str | None] = mapped_column(String(128)) + window_view: Mapped[WindowView | None] = mapped_column() + + address_raw: Mapped[str | None] = mapped_column(Text) + city: Mapped[str | None] = mapped_column(String(128)) + district: Mapped[str | None] = mapped_column(String(128)) + micro_district: Mapped[str | None] = mapped_column(String(128)) + street: Mapped[str | None] = mapped_column(String(128)) + house_number: Mapped[str | None] = mapped_column(String(32)) + + metro_station: Mapped[str | None] = mapped_column(String(128)) + metro_distance_min: Mapped[int | None] = mapped_column(SmallInteger) + metro_distance_type: Mapped[MetroDistanceType | None] = mapped_column() + + latitude: Mapped[float | None] = mapped_column(Numeric(10, 8)) + longitude: Mapped[float | None] = mapped_column(Numeric(11, 8)) + + contact_phone: Mapped[str | None] = mapped_column(String(64)) + contact_name: Mapped[str | None] = mapped_column(String(255)) + contact_email: Mapped[str | None] = mapped_column(String(255)) + is_agent: Mapped[bool | None] = mapped_column(Boolean) + agency_name: Mapped[str | None] = mapped_column(String(255)) + + publish_date: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True)) + url_source: Mapped[str | None] = mapped_column(Text) + listing_status: Mapped[ListingStatus | None] = mapped_column( + default=ListingStatus.active + ) + images_count: Mapped[int | None] = mapped_column(Integer) + + listing_quality_score: Mapped[int | None] = mapped_column(SmallInteger) + reliability_rating: Mapped[int | None] = mapped_column(SmallInteger) + sentiment_score: Mapped[float | None] = mapped_column(Numeric(3, 2)) + + created_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), + server_default=func.now(), + onupdate=func.now(), + ) + + __table_args__ = (UniqueConstraint("source_id", "external_id"),) diff --git a/src/vmk_data_collector/models/property_snapshot.py b/src/vmk_data_collector/models/property_snapshot.py new file mode 100644 index 0000000..d6a86c1 --- /dev/null +++ b/src/vmk_data_collector/models/property_snapshot.py @@ -0,0 +1,20 @@ +from datetime import datetime + +from sqlalchemy import JSON, TIMESTAMP, ForeignKey, Integer, func +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base + + +class PropertySnapshot(Base): + __tablename__ = "property_snapshots" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + property_id: Mapped[int] = mapped_column( + ForeignKey("property_listings.id", ondelete="CASCADE"), nullable=False + ) + snapshot_data: Mapped[dict] = mapped_column(JSON, default=dict) + changed_fields: Mapped[dict] = mapped_column(JSON, default=dict) + created_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), server_default=func.now() + ) diff --git a/src/vmk_data_collector/models/property_type.py b/src/vmk_data_collector/models/property_type.py new file mode 100644 index 0000000..0a9760f --- /dev/null +++ b/src/vmk_data_collector/models/property_type.py @@ -0,0 +1,13 @@ +from sqlalchemy import Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base + + +class PropertyType(Base): + __tablename__ = "property_types" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + name: Mapped[str] = mapped_column(String(128), nullable=False) + description: Mapped[str | None] = mapped_column(Text) diff --git a/src/vmk_data_collector/models/raw_parsing_data.py b/src/vmk_data_collector/models/raw_parsing_data.py new file mode 100644 index 0000000..36cd1d0 --- /dev/null +++ b/src/vmk_data_collector/models/raw_parsing_data.py @@ -0,0 +1,36 @@ +from datetime import datetime + +from sqlalchemy import ( + JSON, + TIMESTAMP, + ForeignKey, + Integer, + String, + Text, + UniqueConstraint, + func, +) +from sqlalchemy.orm import Mapped, mapped_column + +from vmk_data_collector.db.base import Base +from vmk_data_collector.domain.enums import RawDataStatus, ValidationResult + + +class RawParsingData(Base): + __tablename__ = "raw_parsing_data" + + id: Mapped[int] = mapped_column(Integer, primary_key=True) + source_id: Mapped[int | None] = mapped_column(ForeignKey("data_sources.id")) + external_id: Mapped[str | None] = mapped_column(String(255)) + payload: Mapped[dict] = mapped_column(JSON, default=dict) + status: Mapped[RawDataStatus] = mapped_column( + default=RawDataStatus.pending + ) + validation_result: Mapped[ValidationResult | None] = mapped_column() + error_message: Mapped[str | None] = mapped_column(Text) + received_at: Mapped[datetime] = mapped_column( + TIMESTAMP(timezone=True), server_default=func.now() + ) + processed_at: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True)) + + __table_args__ = (UniqueConstraint("source_id", "external_id"),) diff --git a/src/vmk_data_collector/schemas/__init__.py b/src/vmk_data_collector/schemas/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/schemas/__init__.py diff --git a/src/vmk_data_collector/schemas/ai_response.py b/src/vmk_data_collector/schemas/ai_response.py new file mode 100644 index 0000000..48d3eb6 --- /dev/null +++ b/src/vmk_data_collector/schemas/ai_response.py @@ -0,0 +1,33 @@ +from typing import Any + +from pydantic import BaseModel + + +class AiNormalizerResponse(BaseModel): + is_real_estate: bool + reason: str | None = None + normalized: dict[str, Any] | None = None + + +class AiImageAnalysisResponse(BaseModel): + overall_condition: str | None = None + rooms_observed: int | None = None + issues_found: list[str] = [] + positive_highlights: list[str] = [] + view_from_window: str | None = None + furniture_included: bool | None = None + appliances_included: list[str] = [] + + +class AiEnrichmentResponse(BaseModel): + extracted_features: dict[str, Any] = {} + price_assessment: dict[str, Any] = {} + listing_quality_score: int | None = None + reliability_rating: int | None = None + sentiment_score: float | None = None + classification: str | None = None + image_analysis_results: dict[str, Any] = {} + generated_description: str | None = None + summary: str | None = None + model_version: str | None = None + processing_time_ms: int | None = None diff --git a/src/vmk_data_collector/schemas/normalized.py b/src/vmk_data_collector/schemas/normalized.py new file mode 100644 index 0000000..91a74eb --- /dev/null +++ b/src/vmk_data_collector/schemas/normalized.py @@ -0,0 +1,63 @@ +from typing import Any + +from pydantic import BaseModel + + +class NormalizedPropertySchema(BaseModel): + property_type: str | None = None + deal_type: str | None = None + title: str | None = None + description: str | None = None + price: float | None = None + currency: str | None = None + original_price: float | None = None + original_currency: str | None = None + price_per_sqm: float | None = None + total_area: float | None = None + living_area: float | None = None + kitchen_area: float | None = None + land_area: float | None = None + rooms_count: int | None = None + bedrooms_count: int | None = None + bathrooms_count: int | None = None + layout: str | None = None + floor: int | None = None + floors_total: int | None = None + building_year: int | None = None + building_type: str | None = None + renovation_status: str | None = None + ceiling_height: float | None = None + material: str | None = None + has_balcony: bool | None = None + has_loggia: bool | None = None + balcony_count: int | None = None + loggia_count: int | None = None + bathroom_type: str | None = None + elevator_count: int | None = None + has_freight_elevator: bool | None = None + parking_type: str | None = None + heating_type: str | None = None + internet: bool | None = None + security: bool | None = None + windows_direction: str | None = None + window_view: str | None = None + address_raw: str | None = None + city: str | None = None + district: str | None = None + micro_district: str | None = None + street: str | None = None + house_number: str | None = None + metro_station: str | None = None + metro_distance_min: int | None = None + metro_distance_type: str | None = None + latitude: float | None = None + longitude: float | None = None + contact_phone: str | None = None + contact_name: str | None = None + contact_email: str | None = None + is_agent: bool | None = None + agency_name: str | None = None + publish_date: str | None = None + url_source: str | None = None + images: list[str] = [] + custom_fields: dict[str, Any] = {} diff --git a/src/vmk_data_collector/schemas/raw_data.py b/src/vmk_data_collector/schemas/raw_data.py new file mode 100644 index 0000000..42877bb --- /dev/null +++ b/src/vmk_data_collector/schemas/raw_data.py @@ -0,0 +1,18 @@ +from typing import Any + +from pydantic import BaseModel + + +class RawDataIngestRequest(BaseModel): + source_slug: str + external_id: str + payload: dict[str, Any] + + +class IngestResponse(BaseModel): + job_id: int + property_id: int | None = None + status: str + reason: str | None = None + message: str + snapshot_id: int | None = None diff --git a/src/vmk_data_collector/services/__init__.py b/src/vmk_data_collector/services/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/vmk_data_collector/services/__init__.py diff --git a/src/vmk_data_collector/services/ai_enricher.py b/src/vmk_data_collector/services/ai_enricher.py new file mode 100644 index 0000000..e059315 --- /dev/null +++ b/src/vmk_data_collector/services/ai_enricher.py @@ -0,0 +1,107 @@ +import json +from typing import Any + +import structlog + +from vmk_data_collector.core.config import settings +from vmk_data_collector.domain.entities import AiEnrichmentResult, NormalizedProperty +from vmk_data_collector.services.ollama_client import OllamaClient + +logger = structlog.get_logger() + +_SYSTEM_PROMPT = """Ты — эксперт по оценке объявлений о недвижимости. +Проанализируй объявление и верни ТОЛЬКО JSON: +{ + "extracted_features": {"ключевая_особенность": "значение"}, + "price_assessment": { + "estimated_market_price": 150000, + "price_reasonableness": "ниже рынка/на уровне рынка/выше рынка", + "currency": "UAH" + }, + "listing_quality_score": 7, + "reliability_rating": 4, + "sentiment_score": 0.5, + "classification": "жилая_недвижимость", + "image_analysis_results": {"общее_впечатление": "хорошее"}, + "generated_description": "Краткое привлекательное описание для покупателя...", + "summary": "Краткая сводка: что за объект, цена, состояние, плюсы/минусы.", + "model_version": "llama3.2", + "processing_time_ms": 1200 +} +Оценка качества объявления (listing_quality_score): 1–10. +Надёжность (reliability_rating): 1–5. +Sentiment (-1 до 1).""" + +_MOCK_RESPONSE: dict[str, Any] = { + "extracted_features": {"area": "50 м²", "rooms": "2"}, + "price_assessment": { + "estimated_market_price": 120000, + "price_reasonableness": "на уровне рынка", + "currency": "UAH", + }, + "listing_quality_score": 6, + "reliability_rating": 3, + "sentiment_score": 0.2, + "classification": "жилая_недвижимость", + "image_analysis_results": {}, + "generated_description": "Уютная двухкомнатная квартира в центре города.", + "summary": "Квартира 50 м², 2 комнаты, цена адекватна.", + "model_version": "llama3.2-mock", + "processing_time_ms": 0, +} + + +class AiEnricher: + def __init__(self, client: OllamaClient) -> None: + self._client = client + + async def enrich( + self, + normalized: NormalizedProperty, + image_analysis_results: dict[str, Any], + ) -> AiEnrichmentResult: + if settings.ollama_mock: + logger.info("ai_enricher_mock_mode") + return AiEnrichmentResult(**_MOCK_RESPONSE) + + text = self._build_prompt(normalized, image_analysis_results) + messages = [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": text}, + ] + + try: + response = await self._client.chat( + model=settings.ollama_text_model, + messages=messages, + json_mode=True, + ) + content = response["message"]["content"] + data = json.loads(content) + return AiEnrichmentResult(**data) + except Exception as exc: + logger.error("ai_enricher_error", error=str(exc)) + return AiEnrichmentResult() + + @staticmethod + def _build_prompt( + normalized: NormalizedProperty, + image_analysis_results: dict[str, Any], + ) -> str: + lines = [ + f"Заголовок: {normalized.title or '—'}", + f"Описание: {normalized.description or '—'}", + f"Тип: {normalized.property_type or '—'}", + f"Сделка: {normalized.deal_type or '—'}", + f"Цена: {normalized.price or '—'} {normalized.currency or ''}", + f"Площадь: {normalized.total_area or '—'} м²", + f"Комнат: {normalized.rooms_count or '—'}", + f"Этаж: {normalized.floor or '—'} / {normalized.floors_total or '—'}", + f"Адрес: {normalized.address_raw or '—'}", + f"Город: {normalized.city or '—'}", + ] + if image_analysis_results: + lines.append( + f"Анализ фото: {json.dumps(image_analysis_results, ensure_ascii=False)}" + ) + return "\n".join(lines) diff --git a/src/vmk_data_collector/services/ai_image_analyzer.py b/src/vmk_data_collector/services/ai_image_analyzer.py new file mode 100644 index 0000000..e7f7c3b --- /dev/null +++ b/src/vmk_data_collector/services/ai_image_analyzer.py @@ -0,0 +1,60 @@ +import json + +import structlog + +from vmk_data_collector.core.config import settings +from vmk_data_collector.schemas.ai_response import AiImageAnalysisResponse +from vmk_data_collector.services.ollama_client import OllamaClient + +logger = structlog.get_logger() + +_SYSTEM_PROMPT = """Ты — анализатор фотографий объектов недвижимости. +Опиши состояние и содержимое фото. Ответь ТОЛЬКО JSON с полями: +{ + "overall_condition": "отличное/хорошее/среднее/требует ремонта/плохое", + "rooms_observed": 2, + "issues_found": ["трещина на стене", "плесень"], + "positive_highlights": ["новая мебель", "хороший вид"], + "view_from_window": "двор/улица/парк/вода/лес", + "furniture_included": true, + "appliances_included": ["холодильник", "стиральная машина"] +} +Если фото некорректное или не относится к недвижимости, верни пустые значения.""" + +_MOCK_RESPONSE = { + "overall_condition": "хорошее", + "rooms_observed": 2, + "issues_found": [], + "positive_highlights": ["светлая комната"], + "view_from_window": "двор", + "furniture_included": True, + "appliances_included": [], +} + + +class AiImageAnalyzer: + def __init__(self, client: OllamaClient) -> None: + self._client = client + + async def analyze(self, image_base64: str) -> AiImageAnalysisResponse: + if settings.ollama_mock: + logger.info("ai_image_analyzer_mock_mode") + return AiImageAnalysisResponse(**_MOCK_RESPONSE) + + messages = [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": "Опиши объект на фото."}, + ] + + try: + response = await self._client.chat_with_images( + model=settings.ollama_vision_model, + messages=messages, + images_base64=[image_base64], + ) + content = response["message"]["content"] + data = json.loads(content) + return AiImageAnalysisResponse(**data) + except Exception as exc: + logger.error("ai_image_analyzer_error", error=str(exc)) + return AiImageAnalysisResponse() diff --git a/src/vmk_data_collector/services/ai_normalizer.py b/src/vmk_data_collector/services/ai_normalizer.py new file mode 100644 index 0000000..5004edb --- /dev/null +++ b/src/vmk_data_collector/services/ai_normalizer.py @@ -0,0 +1,157 @@ +import json +from typing import Any + +import structlog + +from vmk_data_collector.core.config import settings +from vmk_data_collector.schemas.ai_response import AiNormalizerResponse +from vmk_data_collector.services.ollama_client import OllamaClient + +logger = structlog.get_logger() + +_SYSTEM_PROMPT = """Ты — анализатор объявлений о недвижимости. +Определи, является ли текст объявлением о недвижимости +(квартира, дом, земля, гараж, офис, склад и т.д.). +Если НЕТ — верни ТОЛЬКО JSON: {"is_real_estate": false, "reason": "Краткая причина"}. +Если ДА — верни ТОЛЬКО JSON вида: +{ + "is_real_estate": true, + "reason": null, + "normalized": { + "property_type": ( + "apartment/house/land/garage/office/" + "warehouse/retail/cottage/room/" + "new_building/commercial/townhouse" + ), + "deal_type": "sale/rent_long/rent_short", + "title": "...", + "description": "...", + "price": 12345.67, + "currency": "UAH", + "original_price": null, + "original_currency": null, + "price_per_sqm": null, + "total_area": 45.5, + "living_area": null, + "kitchen_area": null, + "land_area": null, + "rooms_count": 2, + "bedrooms_count": null, + "bathrooms_count": null, + "layout": null, + "floor": 3, + "floors_total": 9, + "building_year": null, + "building_type": null, + "renovation_status": null, + "ceiling_height": null, + "material": null, + "has_balcony": null, + "has_loggia": null, + "balcony_count": null, + "loggia_count": null, + "bathroom_type": null, + "elevator_count": null, + "has_freight_elevator": null, + "parking_type": null, + "heating_type": null, + "internet": null, + "security": null, + "windows_direction": null, + "window_view": null, + "address_raw": "...", + "city": "...", + "district": null, + "micro_district": null, + "street": null, + "house_number": null, + "metro_station": null, + "metro_distance_min": null, + "metro_distance_type": null, + "latitude": null, + "longitude": null, + "contact_phone": null, + "contact_name": null, + "contact_email": null, + "is_agent": null, + "agency_name": null, + "publish_date": null, + "url_source": null, + "images": ["url1", "url2"], + "custom_fields": {} + } +} +Не добавляй ничего кроме JSON.""" + +_MOCK_RESPONSE = { + "is_real_estate": True, + "reason": None, + "normalized": { + "property_type": "apartment", + "deal_type": "sale", + "title": "Mock Title", + "description": "Mock description.", + "price": 100000, + "currency": "UAH", + "total_area": 50.0, + "rooms_count": 2, + "floor": 3, + "floors_total": 9, + "city": "Kyiv", + "address_raw": "Kyiv, Khreshchatyk", + "images": [], + "custom_fields": {}, + }, +} + + +class AiNormalizer: + def __init__(self, client: OllamaClient) -> None: + self._client = client + + async def normalize(self, payload: dict[str, Any]) -> AiNormalizerResponse: + if settings.ollama_mock: + logger.info("ai_normalizer_mock_mode") + return AiNormalizerResponse(**_MOCK_RESPONSE) + + text = self._build_text(payload) + messages = [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": text}, + ] + + try: + response = await self._client.chat( + model=settings.ollama_text_model, + messages=messages, + json_mode=True, + ) + content = response["message"]["content"] + data = json.loads(content) + return AiNormalizerResponse(**data) + except Exception as exc: + logger.error("ai_normalizer_error", error=str(exc)) + return AiNormalizerResponse( + is_real_estate=False, + reason=f"AI error: {exc}", + ) + + @staticmethod + def _build_text(payload: dict[str, Any]) -> str: + parts: list[str] = [] + title = payload.get("title") + if title: + parts.append(f"Заголовок: {title}") + description = payload.get("description") + if description: + parts.append(f"Описание: {description}") + price = payload.get("price") + if price: + parts.append(f"Цена: {price}") + url = payload.get("url") + if url: + parts.append(f"URL: {url}") + for key, value in payload.items(): + if key not in ("title", "description", "price", "url", "images"): + parts.append(f"{key}: {value}") + return "\n".join(parts) diff --git a/src/vmk_data_collector/services/image_downloader.py b/src/vmk_data_collector/services/image_downloader.py new file mode 100644 index 0000000..ebb642f --- /dev/null +++ b/src/vmk_data_collector/services/image_downloader.py @@ -0,0 +1,92 @@ +import hashlib +from dataclasses import dataclass +from pathlib import Path + +import httpx +import structlog +from PIL import Image + +logger = structlog.get_logger() + + +@dataclass +class PropertyImageDownloadResult: + local_path: str + image_hash: str + width: int + height: int + file_size: int + + +class ImageDownloader: + def __init__(self, storage_path: Path) -> None: + self._storage_path = storage_path + + async def download( + self, + property_id: int, + image_url: str, + order_index: int, + ) -> PropertyImageDownloadResult: + logger.info( + "image_download_start", + property_id=property_id, + url=image_url, + order=order_index, + ) + + async with httpx.AsyncClient(timeout=30) as client: + response = await client.get(image_url) + response.raise_for_status() + content = response.content + + image_hash = hashlib.sha256(content).hexdigest() + ext = self._detect_extension( + response.headers.get("content-type", ""), image_url + ) + + property_dir = self._storage_path / str(property_id) + property_dir.mkdir(parents=True, exist_ok=True) + + local_path = property_dir / f"{image_hash}.{ext}" + local_path.write_bytes(content) + + with Image.open(local_path) as img: + width, height = img.size + + file_size = len(content) + + logger.info( + "image_download_complete", + property_id=property_id, + hash=image_hash, + width=width, + height=height, + size=file_size, + ) + + return PropertyImageDownloadResult( + local_path=str(local_path), + image_hash=image_hash, + width=width, + height=height, + file_size=file_size, + ) + + @staticmethod + def _detect_extension(content_type: str, url: str) -> str: + ct = content_type.lower() + if "jpeg" in ct or "jpg" in ct: + return "jpg" + if "png" in ct: + return "png" + if "webp" in ct: + return "webp" + if "gif" in ct: + return "gif" + + url_lower = url.lower() + for ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"): + if url_lower.endswith(ext): + return ext.lstrip(".") + return "jpg" diff --git a/src/vmk_data_collector/services/ollama_client.py b/src/vmk_data_collector/services/ollama_client.py new file mode 100644 index 0000000..a86fcd6 --- /dev/null +++ b/src/vmk_data_collector/services/ollama_client.py @@ -0,0 +1,74 @@ +import base64 +from pathlib import Path +from typing import Any + +import httpx +import structlog + +logger = structlog.get_logger() + + +class OllamaClient: + def __init__(self, base_url: str, timeout: int = 120) -> None: + self._client = httpx.AsyncClient( + base_url=base_url, + timeout=timeout, + ) + + async def chat( + self, + model: str, + messages: list[dict[str, Any]], + json_mode: bool = False, + ) -> dict[str, Any]: + payload: dict[str, Any] = { + "model": model, + "messages": messages, + "stream": False, + } + if json_mode: + payload["format"] = "json" + + logger.info( + "ollama_chat_request", + model=model, + json_mode=json_mode, + message_count=len(messages), + ) + response = await self._client.post("/api/chat", json=payload) + response.raise_for_status() + data = response.json() + logger.info("ollama_chat_response", model=model) + return data + + async def chat_with_images( + self, + model: str, + messages: list[dict[str, Any]], + images_base64: list[str], + ) -> dict[str, Any]: + if messages and images_base64: + messages[-1]["images"] = images_base64 + + logger.info( + "ollama_vision_request", + model=model, + image_count=len(images_base64), + ) + response = await self._client.post("/api/chat", json={ + "model": model, + "messages": messages, + "stream": False, + }) + response.raise_for_status() + data = response.json() + logger.info("ollama_vision_response", model=model) + return data + + async def close(self) -> None: + await self._client.aclose() + + @staticmethod + def image_to_base64(image_path: str) -> str: + with Path(image_path).open("rb") as f: + return base64.b64encode(f.read()).decode("utf-8") diff --git a/src/vmk_data_collector/services/property_pipeline.py b/src/vmk_data_collector/services/property_pipeline.py new file mode 100644 index 0000000..30c20f6 --- /dev/null +++ b/src/vmk_data_collector/services/property_pipeline.py @@ -0,0 +1,338 @@ +from typing import Any + +import structlog +from sqlalchemy import inspect + +from vmk_data_collector.db.repositories.ai_enrichment import ( + AiEnrichmentRepository, +) +from vmk_data_collector.db.repositories.custom_field import ( + CustomFieldRepository, +) +from vmk_data_collector.db.repositories.data_source import ( + DataSourceRepository, +) +from vmk_data_collector.db.repositories.image import ImageRepository +from vmk_data_collector.db.repositories.property import PropertyRepository +from vmk_data_collector.db.repositories.property_type import ( + PropertyTypeRepository, +) +from vmk_data_collector.db.repositories.raw_data import RawDataRepository +from vmk_data_collector.db.repositories.snapshot import SnapshotRepository +from vmk_data_collector.domain.entities import NormalizedProperty +from vmk_data_collector.domain.enums import RawDataStatus +from vmk_data_collector.schemas.raw_data import IngestResponse +from vmk_data_collector.services.ai_enricher import AiEnricher +from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer +from vmk_data_collector.services.ai_normalizer import AiNormalizer +from vmk_data_collector.services.image_downloader import ImageDownloader +from vmk_data_collector.services.ollama_client import OllamaClient + +logger = structlog.get_logger() + + +class PropertyPipeline: + def __init__( + self, + raw_repo: RawDataRepository, + property_repo: PropertyRepository, + image_repo: ImageRepository, + custom_field_repo: CustomFieldRepository, + snapshot_repo: SnapshotRepository, + enrichment_repo: AiEnrichmentRepository, + data_source_repo: DataSourceRepository, + property_type_repo: PropertyTypeRepository, + normalizer: AiNormalizer, + image_downloader: ImageDownloader, + image_analyzer: AiImageAnalyzer, + enricher: AiEnricher, + ) -> None: + self._raw_repo = raw_repo + self._property_repo = property_repo + self._image_repo = image_repo + self._custom_field_repo = custom_field_repo + self._snapshot_repo = snapshot_repo + self._enrichment_repo = enrichment_repo + self._data_source_repo = data_source_repo + self._property_type_repo = property_type_repo + self._normalizer = normalizer + self._image_downloader = image_downloader + self._image_analyzer = image_analyzer + self._enricher = enricher + + async def process(self, raw_data_id: int) -> IngestResponse: + raw = await self._raw_repo.get_by_id(raw_data_id) + if raw is None: + raise ValueError(f"Raw data {raw_data_id} not found") + + await self._raw_repo.update_status( + raw_data_id, RawDataStatus.processing + ) + + try: + norm_response = await self._normalizer.normalize(raw.payload) + except Exception as exc: + logger.error( + "pipeline_normalizer_error", + raw_id=raw_data_id, + error=str(exc), + ) + await self._raw_repo.update_status( + raw_data_id, + RawDataStatus.failed, + error_message=str(exc), + ) + return IngestResponse( + job_id=raw_data_id, + status="failed", + reason="normalizer_error", + message=f"AI normalizer failed: {exc}", + ) + + if not norm_response.is_real_estate: + await self._raw_repo.update_status( + raw_data_id, + RawDataStatus.invalid, + error_message=norm_response.reason, + ) + return IngestResponse( + job_id=raw_data_id, + status="invalid", + reason=norm_response.reason, + message="Payload is not real estate", + ) + + normalized = NormalizedProperty( + **(norm_response.normalized or {}) + ) + source_slug = raw.payload.get("source_slug", "unknown") + data_source = await self._data_source_repo.get_or_create_by_slug( + source_slug, name=source_slug + ) + + existing = await self._property_repo.get_by_source_and_external( + data_source.id, raw.external_id + ) + + listing_kwargs = self._normalized_to_kwargs(normalized) + if normalized.property_type: + property_type = await self._property_type_repo.get_or_create_by_slug( + normalized.property_type, + name=normalized.property_type, + ) + listing_kwargs["property_type_id"] = property_type.id + listing_kwargs.pop("property_type", None) + listing_kwargs["source_id"] = data_source.id + listing_kwargs["external_id"] = raw.external_id + listing_kwargs["raw_data_id"] = raw_data_id + + snapshot_id = None + if existing: + snapshot_data = self._listing_to_dict(existing) + changed_fields = self._compute_changed_fields( + snapshot_data, listing_kwargs + ) + snapshot = await self._snapshot_repo.create( + existing.id, snapshot_data, changed_fields + ) + snapshot_id = snapshot.id + await self._property_repo.update( + existing.id, **listing_kwargs + ) + property_id = existing.id + await self._custom_field_repo.delete_by_property( + property_id + ) + else: + property_obj = await self._property_repo.create( + **listing_kwargs + ) + property_id = property_obj.id + + if normalized.custom_fields: + fields = [ + { + "field_name": k, + "field_value": str(v), + "field_type": self._infer_field_type(v), + } + for k, v in normalized.custom_fields.items() + ] + await self._custom_field_repo.bulk_create(property_id, fields) + + aggregated_analysis: dict[str, Any] = {} + for order, url in enumerate(normalized.images): + try: + result = await self._image_downloader.download( + property_id, url, order + ) + except Exception as exc: + logger.error( + "image_download_failed", + property_id=property_id, + url=url, + error=str(exc), + ) + continue + + duplicate = await self._image_repo.get_by_hash( + property_id, result.image_hash + ) + if duplicate: + logger.info( + "image_duplicate_skipped", + property_id=property_id, + hash=result.image_hash, + ) + continue + + image = await self._image_repo.create( + property_id, url, order + ) + await self._image_repo.update_downloaded( + image.id, + result.local_path, + result.file_size, + result.width, + result.height, + result.image_hash, + ) + + if result.local_path: + try: + b64 = OllamaClient.image_to_base64( + result.local_path + ) + analysis = await self._image_analyzer.analyze( + b64 + ) + await self._image_repo.update_analysis( + image.id, + analysis.overall_condition or "", + ) + aggregated_analysis[url] = ( + analysis.model_dump() + ) + except Exception as exc: + logger.error( + "image_analysis_failed", + property_id=property_id, + image_id=image.id, + error=str(exc), + ) + + try: + enrichment = await self._enricher.enrich( + normalized, aggregated_analysis + ) + except Exception as exc: + logger.error( + "pipeline_enricher_error", + property_id=property_id, + error=str(exc), + ) + enrichment = None + + if enrichment: + await self._enrichment_repo.delete_by_property( + property_id + ) + await self._enrichment_repo.create( + property_id, + extracted_features=enrichment.extracted_features, + price_assessment=enrichment.price_assessment, + listing_quality_score=( + enrichment.listing_quality_score + ), + reliability_rating=enrichment.reliability_rating, + sentiment_score=enrichment.sentiment_score, + classification=enrichment.classification, + image_analysis_results=aggregated_analysis, + generated_description=( + enrichment.generated_description + ), + summary=enrichment.summary, + model_version=enrichment.model_version, + processing_time_ms=( + enrichment.processing_time_ms + ), + ) + await self._property_repo.update( + property_id, + listing_quality_score=( + enrichment.listing_quality_score + ), + reliability_rating=enrichment.reliability_rating, + sentiment_score=enrichment.sentiment_score, + generated_description=( + enrichment.generated_description + ), + ) + + await self._raw_repo.set_processed(raw_data_id) + + return IngestResponse( + job_id=raw_data_id, + property_id=property_id, + status="completed", + message="Property ingested successfully", + snapshot_id=snapshot_id, + ) + + @staticmethod + def _normalized_to_kwargs( + normalized: NormalizedProperty, + ) -> dict[str, Any]: + return { + k: v + for k, v in normalized.__dict__.items() + if k not in ("images", "custom_fields") + } + + @staticmethod + def _listing_to_dict(listing: Any) -> dict[str, Any]: + mapper = inspect(listing).mapper + return { + col.key: PropertyPipeline._serialize_value( + getattr(listing, col.key) + ) + for col in mapper.column_attrs + } + + @staticmethod + def _serialize_value(value: Any) -> Any: + from datetime import datetime + from decimal import Decimal + from enum import Enum + + if value is None: + return None + if isinstance(value, Decimal): + return float(value) + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, Enum): + return value.value + return value + + @staticmethod + def _compute_changed_fields( + old: dict[str, Any], + new: dict[str, Any], + ) -> dict[str, Any]: + changed: dict[str, Any] = {} + for key, new_val in new.items(): + old_val = old.get(key) + if old_val != new_val: + changed[key] = {"old": old_val, "new": new_val} + return changed + + @staticmethod + def _infer_field_type(value: Any) -> str: + if isinstance(value, bool): + return "bool" + if isinstance(value, int): + return "int" + if isinstance(value, float): + return "float" + return "str"