# Newer / Older  (web-view navigation residue)
# vmk-360-data_collector / tests / unit / test_property_pipeline.py
# @Eugene Sukhodolskiy — 1 day ago — 12 KB — fix: code review critical and high issues
"""Unit tests for PropertyPipeline."""

from typing import Any
from unittest.mock import AsyncMock, MagicMock

import pytest

from vmk_data_collector.core.circuit_breaker import CircuitBreakerOpenError
from vmk_data_collector.core.exceptions import OllamaRetryableError
from vmk_data_collector.domain.entities import (
    AiEnrichmentResult,
    NormalizedProperty,
)
from vmk_data_collector.schemas.ai_response import AiNormalizerResponse
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.property_pipeline import PropertyPipeline


@pytest.fixture
def mocks() -> dict[str, Any]:
    """Return every collaborator PropertyPipeline needs, as awaitable mocks.

    Each entry is an ``AsyncMock``. The normalizer and the image analyzer are
    spec'd against their real classes (``AiNormalizer`` / ``AiImageAnalyzer``),
    so touching an attribute those classes lack fails loudly; all other
    dependencies are unspec'd.
    """
    # Insertion order mirrors the PropertyPipeline constructor keywords.
    spec_by_name: dict[str, Any] = {
        "raw_repo": None,
        "property_repo": None,
        "image_repo": None,
        "custom_field_repo": None,
        "snapshot_repo": None,
        "enrichment_repo": None,
        "data_source_repo": None,
        "property_type_repo": None,
        "normalizer": AiNormalizer,
        "image_downloader": None,
        "image_analyzer": AiImageAnalyzer,
        "enricher": None,
    }
    return {name: AsyncMock(spec=spec) for name, spec in spec_by_name.items()}


@pytest.fixture
def raw_data() -> MagicMock:
    """Fake raw-data record handed out by ``raw_repo.get_by_id``.

    It has id 1 and external id "ext-1", no ``source_id``, and a small payload
    that includes a ``source_slug`` of "src".
    """
    return MagicMock(
        id=1,
        payload={"title": "Test", "price": "1000", "source_slug": "src"},
        external_id="ext-1",
        source_id=None,
    )


@pytest.fixture
def pipeline(mocks: dict[str, Any], raw_data: MagicMock) -> PropertyPipeline:
    """Build a PropertyPipeline wired to ``mocks`` with canned repository answers.

    Pre-configured awaited return values:
      * ``raw_repo.get_by_id``                        -> the ``raw_data`` fixture
      * ``data_source_repo.get_or_create_by_slug``    -> object with ``id == 10``
      * ``property_repo.create``                      -> object with ``id == 100``
      * ``property_type_repo.get_or_create_by_slug``  -> object with ``id == 20``
    The pipeline starts with an empty ``active_jobs`` set.
    """
    mocks["raw_repo"].get_by_id.return_value = raw_data

    data_source_repo = mocks["data_source_repo"]
    data_source_repo.get_or_create_by_slug.return_value = MagicMock(id=10)

    property_repo = mocks["property_repo"]
    property_repo.create.return_value = MagicMock(id=100)

    property_type_repo = mocks["property_type_repo"]
    property_type_repo.get_or_create_by_slug.return_value = MagicMock(id=20)

    return PropertyPipeline(
        raw_repo=mocks["raw_repo"],
        property_repo=property_repo,
        image_repo=mocks["image_repo"],
        custom_field_repo=mocks["custom_field_repo"],
        snapshot_repo=mocks["snapshot_repo"],
        enrichment_repo=mocks["enrichment_repo"],
        data_source_repo=data_source_repo,
        property_type_repo=property_type_repo,
        normalizer=mocks["normalizer"],
        image_downloader=mocks["image_downloader"],
        image_analyzer=mocks["image_analyzer"],
        enricher=mocks["enricher"],
        active_jobs=set(),
    )


@pytest.fixture
def norm_response() -> AiNormalizerResponse:
    """Positive normalizer verdict: a 2-room Kyiv apartment for sale.

    The normalized payload carries one image URL and one custom field.
    """
    listing = {
        "property_type": "apartment",
        "deal_type": "sale",
        "title": "Test Title",
        "description": "Desc",
        "price": 100000,
        "currency": "UAH",
        "total_area": 50,
        "rooms_count": 2,
        "floor": 3,
        "floors_total": 9,
        "city": "Kyiv",
        "address_raw": "Kyiv",
        "images": ["http://img/1.jpg"],
        "custom_fields": {"key": "value"},
    }
    return AiNormalizerResponse(is_real_estate=True, reason=None, normalized=listing)


class TestCompletedPath:
    """Scenarios in which ``PropertyPipeline.process`` finishes as "completed"."""

    @pytest.mark.asyncio
    async def test_happy_path_new_listing(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """A brand-new listing runs every stage and the raw record is marked processed.

        Checks the returned result plus the writes to the raw, custom-field and
        enrichment repositories.
        """
        mocks["normalizer"].normalize.return_value = norm_response
        # No listing for this source/external id and no image with the same
        # hash yet, so the listing and its image are both new.
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_repo"].get_by_hash.return_value = None

        dl_result = MagicMock()
        dl_result.local_path = "/tmp/img.jpg"
        dl_result.image_hash = "abc"
        dl_result.file_size = 1234
        dl_result.width = 800
        dl_result.height = 600
        mocks["image_downloader"].download.return_value = dl_result

        img = MagicMock()
        img.id = 50
        mocks["image_repo"].create.return_value = img

        analysis = MagicMock()
        analysis.overall_condition = "good"
        analysis.model_dump.return_value = {"overall_condition": "good"}
        mocks["image_analyzer"].analyze.return_value = analysis

        mocks["image_repo"].get_by_property.return_value = [img]

        enrichment = AiEnrichmentResult(
            extracted_features={},
            price_assessment={},
            listing_quality_score=7,
            reliability_rating=4,
            sentiment_score=0.5,
            classification="жилая_недвижимость",
            generated_description="GD",
            summary="S",
            model_version="v1",
            processing_time_ms=100,
        )
        mocks["enricher"].enrich.return_value = enrichment

        result = await pipeline.process(1)

        assert result.status == "completed"
        assert result.job_id == 1
        # 100 is the id the ``pipeline`` fixture gives to property_repo.create.
        assert result.property_id == 100
        mocks["raw_repo"].set_processed.assert_awaited_once_with(1)
        mocks["custom_field_repo"].bulk_create.assert_awaited_once()
        mocks["enrichment_repo"].create.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_existing_listing_creates_snapshot(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Re-processing a known listing snapshots it and updates it in place.

        Expects a snapshot to be created, the existing listing (id 99) to be
        updated, and its custom fields to be deleted.
        """
        mocks["normalizer"].normalize.return_value = norm_response

        existing = MagicMock()
        existing.id = 99
        mocks["property_repo"].get_by_source_and_external.return_value = existing

        # Stub the serialisation/diff helpers so this test does not depend on
        # how a listing is converted to a dict or how changes are computed.
        monkeypatch.setattr(
            PropertyPipeline,
            "_listing_to_dict",
            staticmethod(lambda x: {"price": 90000}),
        )
        monkeypatch.setattr(
            PropertyPipeline,
            "_compute_changed_fields",
            staticmethod(lambda old, new: {"price": {"old": 90000, "new": 100000}}),
        )

        snap = MagicMock()
        snap.id = 77
        mocks["snapshot_repo"].create.return_value = snap
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["snapshot_repo"].create.assert_awaited_once()
        update_calls = mocks["property_repo"].update.await_args_list
        # Guard the index: a keyword-only ``update(...)`` call has empty
        # ``args``, and indexing it would raise IndexError instead of
        # reporting a clean assertion failure.
        assert any(c.args and c.args[0] == 99 for c in update_calls), update_calls
        mocks["custom_field_repo"].delete_by_property.assert_awaited_once_with(99)


class TestInvalidPath:
    """Scenarios in which the normalizer rejects the record as non-real-estate."""

    @pytest.mark.asyncio
    async def test_not_real_estate(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """A negative verdict marks raw record 1 "invalid" with the normalizer's reason."""
        rejection = "Not a property ad"
        mocks["normalizer"].normalize.return_value = AiNormalizerResponse(
            is_real_estate=False,
            reason=rejection,
        )
        status_writes = mocks["raw_repo"].update_status

        outcome = await pipeline.process(1)

        assert outcome.status == "invalid"
        assert rejection in outcome.reason
        # Exactly two status writes; the most recent one records the rejection.
        assert status_writes.await_count == 2
        status_writes.assert_awaited_with(1, "invalid", error_message=rejection)


class TestFailedPath:
    """Scenarios in which ``PropertyPipeline.process`` ends as "failed"."""

    @pytest.mark.asyncio
    async def test_raw_not_found(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """A missing raw record fails job 1 with a descriptive message."""
        mocks["raw_repo"].get_by_id.return_value = None

        result = await pipeline.process(1)

        assert result.status == "failed"
        mocks["raw_repo"].update_status.assert_awaited_with(
            1, "failed", error_message="Raw data 1 not found"
        )

    @pytest.mark.asyncio
    async def test_circuit_breaker_open(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """An open circuit breaker in the normalizer fails the job with a dedicated reason."""
        mocks["normalizer"].normalize.side_effect = CircuitBreakerOpenError()

        result = await pipeline.process(1)

        assert result.status == "failed"
        assert result.reason == "circuit_breaker_open"
        mocks["raw_repo"].update_status.assert_awaited_with(
            1, "failed", error_message="Circuit breaker open: Circuit breaker is OPEN"
        )

    @pytest.mark.asyncio
    async def test_stage_normalizer_unexpected_error(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """An unexpected normalizer exception fails the job and surfaces its text."""
        mocks["normalizer"].normalize.side_effect = RuntimeError("boom")

        result = await pipeline.process(1)

        assert result.status == "failed"
        assert "boom" in result.message
        # How many times the failure is written is an implementation detail
        # (the normalize stage and process() may each record it). So only
        # require at least one write, and that the most recent one marks raw
        # record 1 as failed (same positional form the sibling tests pin).
        status_writes = mocks["raw_repo"].update_status
        assert status_writes.await_count >= 1
        assert status_writes.await_args.args[:2] == (1, "failed")


class TestResilience:
    """Failures in the image and enrichment stages must not abort processing."""

    @staticmethod
    def _download_result(image_hash: str) -> MagicMock:
        """Fake downloader result: an 800x600, 1234-byte file at /tmp/img.jpg."""
        return MagicMock(
            local_path="/tmp/img.jpg",
            image_hash=image_hash,
            file_size=1234,
            width=800,
            height=600,
        )

    @pytest.mark.asyncio
    async def test_image_download_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An exception from the image downloader still yields "completed"."""
        downloader = mocks["image_downloader"]
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        downloader.download.side_effect = RuntimeError("dl fail")
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        outcome = await pipeline.process(1)

        assert outcome.status == "completed"
        downloader.download.assert_awaited()

    @pytest.mark.asyncio
    async def test_enricher_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An enricher that yields no result still completes and persists nothing.

        NOTE(review): "failure" is modelled as ``enrich`` returning ``None``;
        this does not exercise ``enrich`` raising an exception.
        """
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        outcome = await pipeline.process(1)

        assert outcome.status == "completed"
        mocks["enrichment_repo"].create.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_duplicate_image_skips_analysis(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An image whose hash is already stored is neither re-saved nor re-analysed."""
        image_repo = mocks["image_repo"]
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_downloader"].download.return_value = self._download_result(
            "dup_hash"
        )

        # get_by_hash finds an already-stored image with the same hash.
        already_stored = MagicMock(id=99)
        image_repo.get_by_hash.return_value = already_stored
        image_repo.get_by_property.return_value = [already_stored]
        mocks["enricher"].enrich.return_value = None

        outcome = await pipeline.process(1)

        assert outcome.status == "completed"
        mocks["image_analyzer"].analyze.assert_not_awaited()
        image_repo.create.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_image_analysis_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An exception from the image analyzer completes without storing analysis."""
        image_repo = mocks["image_repo"]
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        image_repo.get_by_hash.return_value = None
        mocks["image_downloader"].download.return_value = self._download_result("abc")

        stored_image = MagicMock(id=50)
        image_repo.create.return_value = stored_image
        mocks["image_analyzer"].analyze.side_effect = RuntimeError("analysis boom")
        image_repo.get_by_property.return_value = [stored_image]
        mocks["enricher"].enrich.return_value = None

        outcome = await pipeline.process(1)

        assert outcome.status == "completed"
        image_repo.update_analysis.assert_not_awaited()


class TestHelpers:
    """Direct tests of PropertyPipeline's static helper functions."""

    def test_compute_changed_fields(self) -> None:
        """Only differing keys are reported; a key absent from ``old`` has old=None."""
        before = {"price": 100, "title": "Old"}
        after = {**before, "price": 200, "extra": "new"}

        diff = PropertyPipeline._compute_changed_fields(before, after)

        expected = {
            "price": {"old": 100, "new": 200},
            "extra": {"old": None, "new": "new"},
        }
        assert diff == expected

    @pytest.mark.parametrize(
        ("value", "expected"),
        [
            (True, "bool"),
            (42, "int"),
            (3.14, "float"),
            ("text", "str"),
            (None, "str"),
            ([], "str"),
        ],
    )
    def test_infer_field_type(self, value: Any, expected: str) -> None:
        """bool/int/float map to their own names; the other cases here give "str"."""
        inferred = PropertyPipeline._infer_field_type(value)
        assert inferred == expected