@Eugene Sukhodolskiy Eugene Sukhodolskiy 1 day ago 40 KB feat: core pipeline + FastAPI API (Phases 0-6)

VMK 360 Data Collector: Detailed Implementation Plan

A playbook for phased development. Each phase lists the concrete files, classes, and functions.


Phase 0: Environment setup (before any code)

Step 0.1: Initialize the git repository

  • git init (already done)
  • Create .gitignore:
    .env
    __pycache__/
    *.pyc
    .pytest_cache/
    .mypy_cache/
    *.egg-info/
    dist/
    build/
    alembic/versions/*.pyc
    .coverage
    htmlcov/
    /var/lib/vmk/images/

Step 0.2: Start PostgreSQL

  • docker compose up -d postgres
  • Verify: docker logs vmk_postgres → database system is ready

Step 0.3: Create the directories

mkdir -p src/vmk_data_collector/{core,api/v1,domain,schemas,services,db/repositories,models}
mkdir -p tests/{unit,integration}
mkdir -p alembic/versions
mkdir -p docs

Phase 1: Foundation (dependencies, config, logging)

Step 1.1: pyproject.toml

Create /home/gmikcon/Projects/vmk/data_collector/pyproject.toml:

  • build-system: setuptools
  • dependencies: fastapi, uvicorn, sqlalchemy[asyncio], asyncpg, alembic, pydantic, pydantic-settings, httpx, structlog, python-dotenv, tenacity, pillow
  • dev: pytest, pytest-asyncio, httpx, factory-boy, faker, ruff, black, mypy, coverage
  • tool.pytest.ini_options: asyncio_mode=auto, markers: unit/integration/slow
  • tool.black: line-length 88, target py312
  • tool.ruff: target py312, select E,F,I,N,W,UP,B,C4,SIM
  • tool.mypy: python_version=3.12, disallow_untyped_defs=true

Step 1.2: .env.example

Create /home/gmikcon/Projects/vmk/data_collector/.env.example:

DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10
DATABASE_ECHO=false

APP_HOST=0.0.0.0
APP_PORT=8000
LOG_LEVEL=info
DEBUG=false

OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_TEXT_MODEL=llama3.2
OLLAMA_VISION_MODEL=llava
OLLAMA_TIMEOUT=120
OLLAMA_MOCK=false

IMAGE_STORAGE_PATH=/var/lib/vmk/images

ENABLE_IMAGE_ANALYSIS=true
ENABLE_PRICE_ESTIMATION=true

Step 1.3: docker-compose.yml

Create /home/gmikcon/Projects/vmk/data_collector/docker-compose.yml:

  • postgres service: image postgres:16-alpine, env POSTGRES_USER/POSTGRES_PASSWORD/POSTGRES_DB, port 5432, volume vmk_postgres_data
  • healthcheck pg_isready
  • top-level named volume vmk_postgres_data

Step 1.4: Install dependencies

python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Step 1.5: Configuration (src/vmk_data_collector/core/config.py)

Create Settings(BaseSettings):

  • model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")
  • Fields: database_url: str, database_pool_size: int=20, database_max_overflow: int=10, database_echo: bool=False
  • Fields: app_host: str, app_port: int, log_level: str, debug: bool=False
  • Fields: ollama_base_url: str, ollama_text_model: str, ollama_vision_model: str, ollama_timeout: int=120, ollama_mock: bool=False
  • Fields: image_storage_path: str, enable_image_analysis: bool=True, enable_price_estimation: bool=True
  • @property def database_url_async(self) -> str: returns the URL with the +asyncpg driver
  • @property def image_storage_path_abs(self) -> Path: returns Path(self.image_storage_path).resolve()
  • Module-level instance settings = Settings(), which db/engine.py imports in Step 3.1
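
The database_url_async property can be sketched with the standard library alone. The helper name to_async_url is hypothetical and not part of the plan; the sketch assumes the only rewrite needed is swapping the driver part of a PostgreSQL URL for asyncpg, and that other schemes pass through unchanged.

```python
def to_async_url(url: str) -> str:
    """Return url with the asyncpg driver for PostgreSQL schemes (hypothetical helper)."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a database URL: {url!r}")
    dialect = scheme.split("+", 1)[0]  # "postgresql+psycopg2" -> "postgresql"
    if dialect not in ("postgresql", "postgres"):
        return url  # assumption: non-PostgreSQL URLs are returned untouched
    return f"postgresql+asyncpg://{rest}"


if __name__ == "__main__":
    print(to_async_url("postgresql://postgres:postgres@localhost:5432/vmk_data"))
```

Inside Settings the property body could simply return to_async_url(self.database_url); the function is idempotent, so a DATABASE_URL that already carries +asyncpg (as in .env.example) is left as is.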

Step 1.6: Logging (src/vmk_data_collector/core/logging.py)

Create the function configure_logging(log_level: str, debug: bool = False) -> None:

  • structlog.configure with the processors filter_by_level, add_logger_name, add_log_level, format_exc_info, TimeStamper, JSONRenderer
  • If debug=True, use ConsoleRenderer(colors=True) in place of JSONRenderer

Step 1.7: Exceptions (src/vmk_data_collector/core/exceptions.py)

Create the hierarchy:

  • class AppException(Exception): ...
  • class ValidationError(AppException): ...
  • class AIProcessingError(AppException): ...
  • class NotRealEstateError(ValidationError): ... (the specific reject case)
  • class DatabaseError(AppException): ...

Phase 1 Definition of Done

  • pip install -e ".[dev]" completes without errors
  • python -c "from vmk_data_collector.core.config import Settings; print(Settings())" reads .env
  • docker compose up -d starts PostgreSQL
  • ruff check src/ passes without errors

Phase 2: Database models and Alembic migrations

Step 2.1: Initialize Alembic

alembic init alembic
  • Configure alembic.ini: sqlalchemy.url = postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data
  • Configure alembic/env.py:
    • Import Base from vmk_data_collector.db.base
    • Import vmk_data_collector.models so that every table is registered on Base.metadata before autogenerate runs
    • target_metadata = Base.metadata
    • run_migrations_online() with an AsyncEngine from create_async_engine

Step 2.2: Base (src/vmk_data_collector/db/base.py)

Create:

from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    pass

Step 2.3: Enum classes (src/vmk_data_collector/domain/enums.py)

Create:

  • class RawDataStatus(str, Enum): pending, processing, completed, failed, invalid
  • class ValidationResult(str, Enum): valid, invalid, uncertain
  • class DealType(str, Enum): sale, rent_long, rent_short
  • class ListingStatus(str, Enum): active, sold, rented, removed, archived
  • class BuildingType(str, Enum): brick, panel, monolith, gas_block, wood
  • class RenovationStatus(str, Enum): cosmetic, euro, designer, none, construction
  • class BathroomType(str, Enum): combined, separate, multiple
  • class ParkingType(str, Enum): ground, underground, none, garage
  • class HeatingType(str, Enum): central, autonomous, floor, none
  • class LayoutType(str, Enum): studio, separate, adjacent
  • class WindowView(str, Enum): yard, street, park, water, forest
  • class MetroDistanceType(str, Enum): walk, transport
  • class ImageDownloadStatus(str, Enum): pending, downloaded, failed
  • class ImageAnalysisStatus(str, Enum): pending, completed, failed
  • class CustomFieldType(str, Enum): str, int, float, bool, date, json
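
The list above gives only member names. A sketch of two of the enums written out, assuming each member's value equals its name (the plan does not state the values):

```python
import json
from enum import Enum


class RawDataStatus(str, Enum):
    pending = "pending"
    processing = "processing"
    completed = "completed"
    failed = "failed"
    invalid = "invalid"


class DealType(str, Enum):
    sale = "sale"
    rent_long = "rent_long"
    rent_short = "rent_short"


# Lookup by value returns the singleton member.
status = RawDataStatus("pending")

# The str mixin lets members compare equal to plain strings and
# serialize to JSON without a custom encoder.
encoded = json.dumps({"status": status, "deal_type": DealType.sale})
```

The str mixin is what makes the members usable directly in JSON payloads and in comparisons with raw strings coming from the API.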

Step 2.4: ORM models

src/vmk_data_collector/models/data_source.py

class DataSource(Base):
    __tablename__ = "data_sources"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    url_pattern: Mapped[str | None] = mapped_column(String(512))
    description: Mapped[str | None] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())

src/vmk_data_collector/models/property_type.py

class PropertyType(Base):
    __tablename__ = "property_types"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(128), nullable=False)
    description: Mapped[str | None] = mapped_column(Text)

src/vmk_data_collector/models/raw_parsing_data.py

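from datetime import datetime

from sqlalchemy import TIMESTAMP, Enum, ForeignKey, Integer, String, Text, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from vmk_data_collector.db.base import Base
from vmk_data_collector.domain.enums import RawDataStatus, ValidationResult


class RawParsingData(Base):
    __tablename__ = "raw_parsing_data"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    source_id: Mapped[int | None] = mapped_column(ForeignKey("data_sources.id"))
    external_id: Mapped[str | None] = mapped_column(String(255))
    # default=dict builds a fresh dict per row; a literal {} would be one shared object
    payload: Mapped[dict] = mapped_column(JSONB, default=dict)
    status: Mapped[RawDataStatus] = mapped_column(Enum(RawDataStatus, name="raw_data_status"), default=RawDataStatus.pending)
    validation_result: Mapped[ValidationResult | None] = mapped_column(Enum(ValidationResult, name="validation_result"))
    error_message: Mapped[str | None] = mapped_column(Text)
    received_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
    processed_at: Mapped[datetime | None] = mapped_column(TIMESTAMP(timezone=True))
    __table_args__ = (UniqueConstraint("source_id", "external_id"),)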

src/vmk_data_collector/models/property_listing.py

Create with all fields from SPECIFICATION.md:

  • id (PK), raw_data_id (FK, UNIQUE), source_id (FK), external_id
  • title, description, generated_description
  • deal_type (ENUM), property_type_id (FK)
  • price, currency, original_price, original_currency, price_per_sqm
  • total_area, living_area, kitchen_area, land_area
  • rooms_count, bedrooms_count, bathrooms_count, layout
  • floor, floors_total, building_year
  • building_type (ENUM), renovation_status (ENUM), ceiling_height, material
  • has_balcony, has_loggia, balcony_count, loggia_count
  • bathroom_type (ENUM), elevator_count, has_freight_elevator
  • parking_type (ENUM), heating_type (ENUM), internet, security
  • windows_direction, window_view (ENUM)
  • address_raw, city, district, micro_district, street, house_number
  • metro_station, metro_distance_min, metro_distance_type (ENUM)
  • latitude, longitude
  • contact_phone, contact_name, contact_email, is_agent, agency_name
  • publish_date, url_source, listing_status (ENUM), images_count
  • listing_quality_score, reliability_rating, sentiment_score
  • created_at, updated_at
  • __table_args__: UniqueConstraint("source_id", "external_id")

src/vmk_data_collector/models/property_image.py

class PropertyImage(Base):
    __tablename__ = "property_images"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE"))
    url: Mapped[str] = mapped_column(Text, nullable=False)
    local_path: Mapped[str | None] = mapped_column(String(512))
    hash: Mapped[str | None] = mapped_column(String(64))
    file_size: Mapped[int | None] = mapped_column(Integer)
    width: Mapped[int | None] = mapped_column(SmallInteger)
    height: Mapped[int | None] = mapped_column(SmallInteger)
    download_status: Mapped[ImageDownloadStatus] = mapped_column(Enum(ImageDownloadStatus, name="image_download_status"), default=ImageDownloadStatus.pending)
    ai_description: Mapped[str | None] = mapped_column(Text)
    analysis_status: Mapped[ImageAnalysisStatus] = mapped_column(Enum(ImageAnalysisStatus, name="image_analysis_status"), default=ImageAnalysisStatus.pending)
    order_index: Mapped[int] = mapped_column(SmallInteger, default=0)

src/vmk_data_collector/models/property_custom_field.py

class PropertyCustomField(Base):
    __tablename__ = "property_custom_fields"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE"))
    field_name: Mapped[str] = mapped_column(String(128), nullable=False)
    field_value: Mapped[str] = mapped_column(Text)
    field_type: Mapped[CustomFieldType] = mapped_column(Enum(CustomFieldType, name="custom_field_type"), default=CustomFieldType.str)
    __table_args__ = (UniqueConstraint("property_id", "field_name"),)

src/vmk_data_collector/models/property_snapshot.py

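from datetime import datetime
from sqlalchemy import TIMESTAMP, ForeignKey, Integer, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from vmk_data_collector.db.base import Base

class PropertySnapshot(Base):
    __tablename__ = "property_snapshots"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE"))
    snapshot_data: Mapped[dict] = mapped_column(JSONB, default=dict)  # callable default: fresh dict per row
    changed_fields: Mapped[dict] = mapped_column(JSONB, default=dict)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())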

src/vmk_data_collector/models/ai_enrichment.py

from datetime import datetime

from sqlalchemy import TIMESTAMP, ForeignKey, Integer, Numeric, SmallInteger, String, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from vmk_data_collector.db.base import Base


class AiEnrichment(Base):
    __tablename__ = "ai_enrichments"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    property_id: Mapped[int] = mapped_column(ForeignKey("property_listings.id", ondelete="CASCADE"), unique=True)
    # default=dict builds a fresh dict per row; a literal {} would be one shared object
    extracted_features: Mapped[dict] = mapped_column(JSONB, default=dict)
    price_assessment: Mapped[dict] = mapped_column(JSONB, default=dict)
    listing_quality_score: Mapped[int | None] = mapped_column(SmallInteger)
    reliability_rating: Mapped[int | None] = mapped_column(SmallInteger)
    sentiment_score: Mapped[float | None] = mapped_column(Numeric(3, 2))
    classification: Mapped[str | None] = mapped_column(String(64))
    image_analysis_results: Mapped[dict] = mapped_column(JSONB, default=dict)
    generated_description: Mapped[str | None] = mapped_column(Text)
    summary: Mapped[str | None] = mapped_column(Text)
    model_version: Mapped[str | None] = mapped_column(String(64))
    processing_time_ms: Mapped[int | None] = mapped_column(Integer)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())

Step 2.5: src/vmk_data_collector/models/__init__.py

Import all models so they register on Base.metadata:

from .data_source import DataSource
from .property_type import PropertyType
from .raw_parsing_data import RawParsingData
from .property_listing import PropertyListing
from .property_image import PropertyImage
from .property_custom_field import PropertyCustomField
from .property_snapshot import PropertySnapshot
from .ai_enrichment import AiEnrichment

Step 2.6: Alembic initial migration

alembic revision --autogenerate -m "initial"
alembic upgrade head

Step 2.7: Seed reference data

Create src/vmk_data_collector/db/seed.py:

  • Function async def seed_property_types(session: AsyncSession) -> None
  • Populates property_types (apartment, house, townhouse, commercial, land, garage, office, warehouse, retail, cottage, room, new_building)
  • Populates deal_types (sale, rent_long, rent_short)

Phase 2 Definition of Done

  • alembic upgrade head completes without errors
  • All 8 tables are created in PostgreSQL
  • \dt in psql lists all tables
  • seed_property_types() populates the reference tables
  • Models import without errors

Phase 3: Database infrastructure and repositories

Step 3.1: Engine + Session (src/vmk_data_collector/db/engine.py)

from sqlalchemy.ext.asyncio import create_async_engine
from vmk_data_collector.core.config import settings

engine = create_async_engine(
    settings.database_url_async,
    pool_size=settings.database_pool_size,
    max_overflow=settings.database_max_overflow,
    echo=settings.database_echo,
)

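Step 3.2: Session maker (src/vmk_data_collector/db/session.py)

from collections.abc import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from .engine import engine

AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db_session() -> AsyncIterator[AsyncSession]:
    # Async generator: yields one session and closes it once the caller is done
    async with AsyncSessionLocal() as session:
        yield session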

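Step 3.3: Base repository (src/vmk_data_collector/db/repositories/base.py)

from typing import Generic, TypeVar

from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T")  # ORM model type handled by a concrete repository


class BaseRepository(Generic[T]):
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def add(self, obj: T) -> T:
        self._session.add(obj)
        # flush sends the INSERT so generated values such as id are available before commit
        await self._session.flush()
        return obj

    async def delete(self, obj: T) -> None:
        await self._session.delete(obj)

    async def commit(self) -> None:
        await self._session.commit()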

Step 3.4: RawDataRepository (src/vmk_data_collector/db/repositories/raw_data.py)

Methods:

  • async def create(self, source_id: int | None, external_id: str | None, payload: dict) -> RawParsingData
  • async def get_by_id(self, raw_data_id: int) -> RawParsingData | None
  • async def get_by_source_and_external(self, source_id: int, external_id: str) -> RawParsingData | None
  • async def update_status(self, raw_data_id: int, status: RawDataStatus, error_message: str | None = None) -> None
  • async def set_processed(self, raw_data_id: int) -> None

Step 3.5: PropertyRepository (src/vmk_data_collector/db/repositories/property.py)

Methods:

  • async def create(self, **kwargs) -> PropertyListing
  • async def get_by_id(self, property_id: int) -> PropertyListing | None
  • async def get_by_source_and_external(self, source_id: int, external_id: str) -> PropertyListing | None
  • async def update(self, property_id: int, **kwargs) -> PropertyListing
  • async def delete_custom_fields(self, property_id: int) -> None

Step 3.6: ImageRepository (src/vmk_data_collector/db/repositories/image.py)

Methods:

  • async def create(self, property_id: int, url: str, order_index: int = 0) -> PropertyImage
  • async def get_by_property(self, property_id: int) -> list[PropertyImage]
  • async def get_by_hash(self, property_id: int, image_hash: str) -> PropertyImage | None
  • async def update_downloaded(self, image_id: int, local_path: str, file_size: int, width: int, height: int, image_hash: str) -> None
  • async def update_analysis(self, image_id: int, ai_description: str) -> None

Step 3.7: CustomFieldRepository (src/vmk_data_collector/db/repositories/custom_field.py)

Methods:

  • async def create(self, property_id: int, field_name: str, field_value: str, field_type: CustomFieldType = CustomFieldType.str) -> PropertyCustomField
  • async def bulk_create(self, property_id: int, fields: list[dict]) -> None
  • async def delete_by_property(self, property_id: int) -> None

Step 3.8: SnapshotRepository (src/vmk_data_collector/db/repositories/snapshot.py)

Methods:

  • async def create(self, property_id: int, snapshot_data: dict, changed_fields: dict) -> PropertySnapshot

Step 3.9: AiEnrichmentRepository (src/vmk_data_collector/db/repositories/ai_enrichment.py)

Methods:

  • async def create(self, property_id: int, **kwargs) -> AiEnrichment
  • async def delete_by_property(self, property_id: int) -> None

Step 3.10: DataSourceRepository (src/vmk_data_collector/db/repositories/data_source.py)

Methods:

  • async def get_or_create_by_slug(self, slug: str, name: str | None = None) -> DataSource

Phase 3 Definition of Done

  • get_db_session() yields a valid async session
  • Every repository is covered by a unit test (mocked session)
  • CRUD operations work against the real engine (integration smoke test)

Phase 4: Pydantic schemas (data validation)

Step 4.1: Domain entities (src/vmk_data_collector/domain/entities.py)

@dataclass
class NormalizedProperty:
    property_type: str
    deal_type: str
    title: str | None
    description: str | None
    price: float | None
    currency: str | None
    total_area: float | None
    # ... (all remaining fields from property_listings)
    custom_fields: dict[str, Any]
    images: list[str]

@dataclass
class AiImageAnalysis:
    overall_condition: str | None
    rooms_observed: int | None
    issues_found: list[str]
    positive_highlights: list[str]
    view_from_window: str | None
    furniture_included: bool | None
    appliances_included: list[str]

@dataclass
class AiEnrichmentResult:
    extracted_features: dict[str, Any]
    price_assessment: dict[str, Any]
    listing_quality_score: int | None
    reliability_rating: int | None
    sentiment_score: float | None
    classification: str | None
    image_analysis_results: dict[str, Any]
    generated_description: str | None
    summary: str | None
    model_version: str | None
    processing_time_ms: int | None

Step 4.2: API schemas (src/vmk_data_collector/schemas/raw_data.py)

  • class RawDataIngestRequest(BaseModel):
    • source_slug: str
    • external_id: str
    • payload: dict[str, Any]
  • class IngestResponse(BaseModel):
    • job_id: int
    • property_id: int | None
    • status: str
    • reason: str | None = None
    • message: str
    • snapshot_id: int | None = None

Step 4.3: AI schemas (src/vmk_data_collector/schemas/ai_response.py)

  • class AiNormalizerResponse(BaseModel):
    • is_real_estate: bool
    • reason: str | None
    • normalized: NormalizedPropertySchema | None
  • class AiImageAnalysisResponse(BaseModel):
    • overall_condition: str | None
    • rooms_observed: int | None
    • issues_found: list[str]
    • positive_highlights: list[str]
    • view_from_window: str | None
    • furniture_included: bool | None
    • appliances_included: list[str]
  • class AiEnrichmentResponse(BaseModel):
    • extracted_features: dict[str, Any]
    • price_assessment: dict[str, Any]
    • listing_quality_score: int | None
    • reliability_rating: int | None
    • sentiment_score: float | None
    • classification: str | None
    • image_analysis_results: dict[str, Any]
    • generated_description: str | None
    • summary: str | None
    • model_version: str | None
    • processing_time_ms: int | None

Step 4.4: NormalizedPropertySchema (src/vmk_data_collector/schemas/normalized.py)

A Pydantic model with all property_listings fields plus custom_fields: dict[str, Any].

Phase 4 Definition of Done

  • Pydantic validation of RawDataIngestRequest works with both the minimal and the full payload
  • AiNormalizerResponse correctly parses JSON with is_real_estate: false
  • NormalizedPropertySchema maps from a dict without errors

Phase 5: AI layer (Ollama client + normalizer + analyzer)

Step 5.1: Ollama HTTP client (src/vmk_data_collector/services/ollama_client.py)

import httpx


class OllamaClient:
    def __init__(self, base_url: str, timeout: int) -> None:
        self._client = httpx.AsyncClient(base_url=base_url, timeout=timeout)

    async def chat(self, model: str, messages: list[dict], json_mode: bool = False) -> dict:
        payload = {"model": model, "messages": messages, "stream": False}
        if json_mode:
            payload["format"] = "json"
        response = await self._client.post("/api/chat", json=payload)
        response.raise_for_status()
        return response.json()

    async def chat_with_images(self, model: str, messages: list[dict], images_base64: list[str]) -> dict:
        # messages[0]["images"] = images_base64
        ...

    async def close(self) -> None:
        await self._client.aclose()

Step 5.2: AiNormalizer (src/vmk_data_collector/services/ai_normalizer.py)

class AiNormalizer:
    SYSTEM_PROMPT = """You are an analyzer of real-estate listings.
Determine whether the text is a real-estate listing.
If it is not, return {"is_real_estate": false, "reason": "..."}.
If it is, return {"is_real_estate": true, "normalized": {...}}.
Reply with JSON ONLY."""

    async def normalize(self, payload: dict) -> AiNormalizerResponse:
        # 1. If ollama_mock is enabled, return the fixed mock and skip the call
        # 2. Build the text from payload (title + description + params)
        # 3. Call ollama_client.chat(json_mode=True)
        # 4. Parse the reply into AiNormalizerResponse
        ...

Step 5.3: AiImageAnalyzer (src/vmk_data_collector/services/ai_image_analyzer.py)

class AiImageAnalyzer:
    SYSTEM_PROMPT = """Describe the condition of the property shown in the photo.
Reply with JSON ONLY, with the fields: overall_condition, rooms_observed, issues_found, positive_highlights, view_from_window, furniture_included, appliances_included."""

    async def analyze(self, image_base64: str) -> AiImageAnalysisResponse:
        # 1. Call ollama_client.chat_with_images()
        # 2. Parse the reply into AiImageAnalysisResponse
        ...

Step 5.4: AiEnricher (src/vmk_data_collector/services/ai_enricher.py)

class AiEnricher:
    SYSTEM_PROMPT = """Analyze the real-estate listing.
Return JSON ONLY, with the fields:
extracted_features, price_assessment, listing_quality_score (1-10),
reliability_rating (1-5), sentiment_score (-1..1), classification,
generated_description, summary, language."""

    async def enrich(self, normalized: NormalizedProperty, image_analysis_results: dict) -> AiEnrichmentResult:
        # 1. Build the prompt from the listing text + image_analysis
        # 2. Call ollama_client.chat(json_mode=True)
        # 3. Parse the reply into AiEnrichmentResponse
        # 4. Map it to AiEnrichmentResult
        ...

Step 5.5: Image Downloader (src/vmk_data_collector/services/image_downloader.py)

from pathlib import Path


class ImageDownloader:
    def __init__(self, storage_path: Path) -> None:
        self._storage_path = storage_path

    async def download(self, property_id: int, image_url: str, order_index: int) -> PropertyImageDownloadResult:
        # 1. httpx async GET image_url
        # 2. SHA-256 of the content
        # 3. ext from Content-Type or from the URL
        # 4. local_path = storage_path / str(property_id) / f"{hash}.{ext}"
        # 5. Save to disk
        # 6. Pillow: width, height
        # 7. file_size = len(content)
        # 8. Return a dataclass with local_path, hash, width, height, file_size
        ...

Phase 5 Definition of Done

  • OllamaClient.chat() is mocked in tests (respx/httpx_mock)
  • AiNormalizer correctly rejects a car listing
  • AiNormalizer correctly accepts an apartment from messy text
  • ImageDownloader downloads an image, computes its hash, and extracts its dimensions
  • AiImageAnalyzer returns structured JSON
  • AiEnricher returns an enrichment with quality_score and reliability_rating

Phase 6: Pipeline (PropertyPipeline + FastAPI)

Step 6.1: PropertyPipeline (src/vmk_data_collector/services/property_pipeline.py)

class PropertyPipeline:
    def __init__(
        self,
        raw_repo: RawDataRepository,
        property_repo: PropertyRepository,
        image_repo: ImageRepository,
        custom_field_repo: CustomFieldRepository,
        snapshot_repo: SnapshotRepository,
        enrichment_repo: AiEnrichmentRepository,
        data_source_repo: DataSourceRepository,
        normalizer: AiNormalizer,
        image_downloader: ImageDownloader,
        image_analyzer: AiImageAnalyzer,
        enricher: AiEnricher,
    ) -> None:
        ...

    async def process(self, raw_data_id: int) -> IngestResponse:
        # 1. raw_repo.get_by_id(raw_data_id)
        # 2. raw_repo.update_status → processing
        # 3. normalizer.normalize(raw.payload)
        # 4. If not is_real_estate:
        #      raw_repo.update_status → invalid
        #      return IngestResponse(status="invalid", reason=...)
        # 5. data_source = data_source_repo.get_or_create_by_slug(source_slug)
        # 6. existing = property_repo.get_by_source_and_external(...)
        # 7. If existing:
        #      snapshot_repo.create(existing.id, snapshot_data, changed_fields)
        #      property_repo.update(existing.id, **normalized_data)
        #      custom_field_repo.delete_by_property(existing.id)
        #      property_id = existing.id
        #    Otherwise:
        #      property = property_repo.create(**normalized_data)
        #      property_id = property.id
        # 8. custom_field_repo.bulk_create(property_id, normalized.custom_fields)
        # 9. images: for url in normalized.images:
        #      image = image_repo.create(property_id, url, order)
        #      result = await image_downloader.download(property_id, url, order)
        #      image_repo.update_downloaded(image.id, result.local_path, ...)
        #      analysis = await image_analyzer.analyze(base64)
        #      image_repo.update_analysis(image.id, analysis.overall_condition)
        # 10. enrichment = await enricher.enrich(normalized, aggregated_image_analysis)
        # 11. enrichment_repo.delete_by_property(property_id)
        # 12. enrichment_repo.create(property_id, **enrichment)
        # 13. raw_repo.set_processed(raw_data_id)
        # 14. return IngestResponse(status="completed", property_id=...)
        ...
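
For the update branch in step 7, the snapshot needs a changed_fields value. One possible shape is a per-field old/new mapping; this format and the helper name diff_fields are assumptions, since the plan only says that changed_fields is a dict.

```python
from typing import Any


def diff_fields(old: dict[str, Any], new: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return {field: {"old": ..., "new": ...}} for every field whose value differs.

    A key missing on one side is treated like None on that side.
    """
    changed: dict[str, dict[str, Any]] = {}
    for key in sorted(old.keys() | new.keys()):
        before, after = old.get(key), new.get(key)
        if before != after:
            changed[key] = {"old": before, "new": after}
    return changed


existing = {"price": 100000, "rooms_count": 2, "title": "Flat"}
incoming = {"price": 95000, "rooms_count": 2, "title": "Flat", "floor": 3}
changed_fields = diff_fields(existing, incoming)
```

With this shape, snapshot_data can hold the full pre-update state while changed_fields stays small enough to scan when reviewing a listing's history.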

Step 6.2: FastAPI main (src/vmk_data_collector/main.py)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: configure_logging, create image storage dir
    yield
    # shutdown: close ollama client

app = FastAPI(title="VMK Data Collector", version="0.1.0", lifespan=lifespan)
app.include_router(properties_router, prefix="/api/v1")

Step 6.3: API deps (src/vmk_data_collector/api/deps.py)

# AsyncIterator comes from collections.abc: a dependency that yields is an async generator
async def get_db() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        yield session

async def get_property_pipeline(db: AsyncSession = Depends(get_db)) -> PropertyPipeline:
    # Wire all dependencies by hand (or use a DI container)
    ...

Step 6.4: Router (src/vmk_data_collector/api/v1/router_properties.py)

router = APIRouter(prefix="/properties")

@router.post("/ingest", response_model=IngestResponse, status_code=202)
async def ingest_property(
    request: RawDataIngestRequest,
    pipeline: PropertyPipeline = Depends(get_property_pipeline),
) -> IngestResponse:
    # 1. Create the raw_parsing_data row via raw_repo
    # 2. Run pipeline.process(raw_data_id)
    # 3. Return the IngestResponse
    ...

Step 6.5: Exception handlers (src/vmk_data_collector/core/exceptions.py, registered in main.py)

  • AppException → 500 with details
  • ValidationError → 422
  • NotRealEstateError → 202 (but with status invalid)
  • AIProcessingError → 202 (but with status failed)
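
The mapping above depends on the most specific class winning: NotRealEstateError is a ValidationError, yet it must yield 202 rather than 422. A framework-free sketch of that lookup follows; the "error" status string for the 500 and 422 cases and the name resolve_response are assumptions.

```python
class AppException(Exception): ...
class ValidationError(AppException): ...
class AIProcessingError(AppException): ...
class NotRealEstateError(ValidationError): ...
class DatabaseError(AppException): ...


# (HTTP status code, IngestResponse.status) per exception class, as listed above.
# Assumption: "error" is used as the status string for the 500 and 422 cases.
RESPONSE_BY_EXCEPTION: dict[type[AppException], tuple[int, str]] = {
    AppException: (500, "error"),
    ValidationError: (422, "error"),
    NotRealEstateError: (202, "invalid"),
    AIProcessingError: (202, "failed"),
}


def resolve_response(exc: AppException) -> tuple[int, str]:
    """Walk the exception's MRO so the most specific registered class wins."""
    for cls in type(exc).__mro__:
        if cls in RESPONSE_BY_EXCEPTION:
            return RESPONSE_BY_EXCEPTION[cls]
    return 500, "error"


code, status = resolve_response(NotRealEstateError("car listing"))
```

DatabaseError has no entry of its own, so it falls back to the AppException row and returns 500.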

Phase 6 Definition of Done

  • POST /api/v1/properties/ingest returns 202
  • A minimal payload (title, url, images) creates a property_listings row
  • A full payload fills every field plus custom_fields
  • A repeated ingest with the same external_id → snapshot + update
  • A non-real-estate listing → reject, leaving only raw_data with status invalid

Phase 7: Tests

Step 7.1: conftest.py

Fixtures:

  • async_engine → create_async_engine("postgresql+asyncpg://postgres:postgres@localhost:5432/vmk_data_test")
  • init_db(async_engine) → Base.metadata.create_all
  • db_session(async_engine): async session with begin() + rollback
  • client(db_session): FastAPI TestClient with get_db overridden
  • mock_ollama_client(): fixed responses
  • sample_payload_minimal(): title, url, images only
  • sample_payload_full(): all fields
  • sample_payload_car(): a car listing (for the reject case)

Step 7.2: Unit tests (tests/unit/)

  • test_ai_normalizer.py:
    • test_reject_car_listing() → is_real_estate=False
    • test_accept_apartment_from_messy_text(): extracts rooms, area, floor from messy text
    • test_mock_mode_returns_fixed_json(): with OLLAMA_MOCK=true
  • test_ai_image_analyzer.py:
    • test_analyze_returns_structured_response()
    • test_analyze_with_empty_image()
  • test_ai_enricher.py:
    • test_enrich_returns_quality_score_and_reliability()
    • test_enrich_with_image_analysis()
  • test_image_downloader.py:
    • test_download_creates_file_and_returns_hash() (mock httpx + tempfile)
    • test_dedup_returns_existing_hash()
  • test_property_pipeline.py:
    • test_new_listing_creates_all_records() (all repositories mocked)
    • test_existing_listing_creates_snapshot() (mocks)
    • test_invalid_payload_returns_invalid_status()

Step 7.3: Integration tests (tests/integration/)

  • test_api_ingest.py:
    • test_ingest_minimal_payload_202(): checks that raw + listing are created
    • test_ingest_full_payload_populates_all_fields()
    • test_ingest_duplicate_external_id_updates_and_creates_snapshot()
    • test_ingest_car_payload_returns_invalid(): checks that no listing is created
  • test_db_pipeline.py:
    • test_upsert_creates_snapshot_with_correct_data()
    • test_custom_fields_deleted_on_update()
    • test_images_deduplicated_by_hash()

Step 7.4: Running the tests

pytest tests/unit -v -m unit
pytest tests/integration -v -m integration
pytest --cov=src --cov-report=term-missing

Phase 7 Definition of Done

  • ≥ 80% code coverage
  • All unit tests pass
  • All integration tests pass (they require PostgreSQL)
  • CI-ready: pytest passes in a clean environment

Phase 8: Documentation and finalization

Step 8.1: README.md

  • Description, stack, quick start, API endpoint, project structure.

Step 8.2: docs/SPECIFICATION.md

  • Full specification (already written; update as needed).

Step 8.3: docs/ARCHITECTURE.md

  • Architecture (already written; update as needed).

Step 8.4: Makefile (optional)

.PHONY: dev test lint migrate seed

dev:
    docker compose up -d
    uvicorn vmk_data_collector.main:app --reload

test:
    pytest -v

lint:
    ruff check src tests
    black --check src tests
    mypy src

migrate:
    alembic upgrade head

seed:
    python -c "import asyncio; from vmk_data_collector.db.seed import seed_all; asyncio.run(seed_all())"

Step 8.5: Dockerfile (optional)

FROM python:3.12-slim
WORKDIR /app
# The editable install needs the src/ tree, so copy the project before installing
COPY . .
RUN pip install -e ".[dev]"
CMD ["uvicorn", "vmk_data_collector.main:app", "--host", "0.0.0.0", "--port", "8000"]

Phase 8 Definition of Done

  • make dev starts the service
  • make test passes
  • make lint passes
  • The README contains everything a new developer needs

File summary table

| # | Path | Phase | Description |
|---|------|-------|-------------|
| 1 | pyproject.toml | 1 | Dependencies, tool configs |
| 2 | .env.example | 1 | Env template |
| 3 | docker-compose.yml | 1 | PostgreSQL |
| 4 | .gitignore | 0 | Git ignore rules |
| 5 | src/vmk_data_collector/core/config.py | 1 | Pydantic Settings |
| 6 | src/vmk_data_collector/core/logging.py | 1 | structlog |
| 7 | src/vmk_data_collector/core/exceptions.py | 1 | Exceptions |
| 8 | src/vmk_data_collector/db/base.py | 2 | DeclarativeBase |
| 9 | src/vmk_data_collector/domain/enums.py | 2 | All ENUMs |
| 10 | src/vmk_data_collector/models/data_source.py | 2 | ORM DataSource |
| 11 | src/vmk_data_collector/models/property_type.py | 2 | ORM PropertyType |
| 12 | src/vmk_data_collector/models/raw_parsing_data.py | 2 | ORM RawParsingData |
| 13 | src/vmk_data_collector/models/property_listing.py | 2 | ORM PropertyListing |
| 14 | src/vmk_data_collector/models/property_image.py | 2 | ORM PropertyImage |
| 15 | src/vmk_data_collector/models/property_custom_field.py | 2 | ORM PropertyCustomField |
| 16 | src/vmk_data_collector/models/property_snapshot.py | 2 | ORM PropertySnapshot |
| 17 | src/vmk_data_collector/models/ai_enrichment.py | 2 | ORM AiEnrichment |
| 18 | src/vmk_data_collector/models/__init__.py | 2 | Imports |
| 19 | alembic/env.py | 2 | Alembic async env |
| 20 | src/vmk_data_collector/db/seed.py | 2 | Reference data seed |
| 21 | src/vmk_data_collector/db/engine.py | 3 | AsyncEngine |
| 22 | src/vmk_data_collector/db/session.py | 3 | AsyncSessionLocal |
| 23 | src/vmk_data_collector/db/repositories/base.py | 3 | BaseRepository |
| 24 | src/vmk_data_collector/db/repositories/raw_data.py | 3 | RawDataRepository |
| 25 | src/vmk_data_collector/db/repositories/property.py | 3 | PropertyRepository |
| 26 | src/vmk_data_collector/db/repositories/image.py | 3 | ImageRepository |
| 27 | src/vmk_data_collector/db/repositories/custom_field.py | 3 | CustomFieldRepository |
| 28 | src/vmk_data_collector/db/repositories/snapshot.py | 3 | SnapshotRepository |
| 29 | src/vmk_data_collector/db/repositories/ai_enrichment.py | 3 | AiEnrichmentRepository |
| 30 | src/vmk_data_collector/db/repositories/data_source.py | 3 | DataSourceRepository |
| 31 | src/vmk_data_collector/domain/entities.py | 4 | Dataclass entities |
| 32 | src/vmk_data_collector/schemas/raw_data.py | 4 | API schemas |
| 33 | src/vmk_data_collector/schemas/ai_response.py | 4 | AI schemas |
| 34 | src/vmk_data_collector/schemas/normalized.py | 4 | NormalizedPropertySchema |
| 35 | src/vmk_data_collector/services/ollama_client.py | 5 | Ollama HTTP client |
| 36 | src/vmk_data_collector/services/ai_normalizer.py | 5 | AiNormalizer |
| 37 | src/vmk_data_collector/services/ai_image_analyzer.py | 5 | AiImageAnalyzer |
38 src/vmk_data_collector/services/ai_enricher.py 5 AiEnricher
39 src/vmk_data_collector/services/image_downloader.py 5 ImageDownloader
40 src/vmk_data_collector/services/property_pipeline.py 6 PropertyPipeline
41 src/vmk_data_collector/api/deps.py 6 DI-зависимости
42 src/vmk_data_collector/api/v1/router_properties.py 6 FastAPI router
43 src/vmk_data_collector/main.py 6 Точка входа
44 tests/conftest.py 7 Фикстуры
45 tests/unit/test_ai_normalizer.py 7 Unit тесты нормализатора
46 tests/unit/test_ai_image_analyzer.py 7 Unit тесты анализатора
47 tests/unit/test_ai_enricher.py 7 Unit тесты enricher
48 tests/unit/test_image_downloader.py 7 Unit тесты downloader
49 tests/unit/test_property_pipeline.py 7 Unit тесты pipeline
50 tests/integration/test_api_ingest.py 7 Integration API
51 tests/integration/test_db_pipeline.py 7 Integration DB
52 docs/SPECIFICATION.md 8 ТЗ
53 docs/ARCHITECTURE.md 8 Архитектура
54 README.md 8 README
55 Makefile 8 Команды
56 Dockerfile 8 Docker образ

Phase dependency graph

Phase 0 (env)
    │
    ▼
Phase 1 (config) ──────────────────────────┐
    │                                      │
    ▼                                      │
Phase 2 (models) ──► Phase 3 (repos) ──► Phase 4 (schemas)
    │                                      │           │
    ▼                                      │           ▼
Alembic migrate ◄──────────────────────────┘    Phase 5 (AI services)
                                                     │
                                                     ▼
                                            Phase 6 (Pipeline + API)
                                                     │
                                                     ▼
                                            Phase 7 (Tests)
                                                     │
                                                     ▼
                                            Phase 8 (Docs + Deploy)

Can run in parallel: Phase 4 (schemas) and Phase 5 (AI services) do not depend on each other. Both depend on Phase 1.
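
To sanity-check which phases can overlap, the dependencies can be fed to the standard library's `graphlib.TopologicalSorter`. The edge set in this sketch is an assumption: it follows the note that Phases 4 and 5 depend only on Phase 1, and it assumes that Phase 6 needs Phases 3, 4 and 5, which the diagram does not state explicitly. `PHASE_DEPS` and `parallel_batches` are illustrative names.

```python
from graphlib import TopologicalSorter

# Phase number -> phases that must be finished first (assumed edges, see the lead-in).
PHASE_DEPS: dict[int, set[int]] = {
    0: set(),
    1: {0},
    2: {1},
    3: {2},
    4: {1},
    5: {1},
    6: {3, 4, 5},
    7: {6},
    8: {7},
}


def parallel_batches(deps: dict[int, set[int]]) -> list[list[int]]:
    """Group phases into batches; phases within one batch can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches: list[list[int]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


if __name__ == "__main__":
    print(parallel_batches(PHASE_DEPS))
```

Under these assumed edges the result is `[[0], [1], [2, 4, 5], [3], [6], [7], [8]]`: Phases 2, 4 and 5 can start as soon as Phase 1 is done.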


Checklist before coding starts

  • Docker Compose for PostgreSQL is up and running
  • .env is copied from .env.example and configured
  • pip install -e ".[dev]" completed successfully
  • alembic is initialized and alembic.ini is configured for async
  • All 8 tables are designed and documented in SPECIFICATION.md
  • AI response schemas (Normalizer, ImageAnalyzer, Enricher) are agreed on
  • The testing strategy is defined
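
The `.env` item of the checklist can be verified mechanically. The sketch below is one hypothetical way to do it: it compares the keys of `.env.example` with those of `.env` and reports the ones that are missing. The function names are illustrative, and the parser assumes plain `KEY=value` lines with `#` comments, which matches the `.env.example` shown in Phase 1.

```python
from pathlib import Path


def env_keys(text: str) -> set[str]:
    """Collect variable names from KEY=value lines, skipping blanks and comments."""
    keys: set[str] = set()
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        keys.add(line.split("=", 1)[0].strip())
    return keys


def missing_env_keys(example_text: str, env_text: str) -> list[str]:
    """Return keys present in the example file but absent from the real one."""
    return sorted(env_keys(example_text) - env_keys(env_text))


if __name__ == "__main__":
    # Assumes the script is run from the project root, next to both files.
    missing = missing_env_keys(
        Path(".env.example").read_text(encoding="utf-8"),
        Path(".env").read_text(encoding="utf-8"),
    )
    if missing:
        raise SystemExit("Missing in .env: " + ", ".join(missing))
```

Such a check only confirms that every variable is present; whether the values are correct for the local environment still has to be verified by hand.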