"""Unit tests for PropertyPipeline."""
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from vmk_data_collector.core.circuit_breaker import CircuitBreakerOpenError
from vmk_data_collector.core.exceptions import OllamaRetryableError
from vmk_data_collector.domain.entities import (
AiEnrichmentResult,
NormalizedProperty,
)
from vmk_data_collector.schemas.ai_response import AiNormalizerResponse
from vmk_data_collector.services.ai_image_analyzer import AiImageAnalyzer
from vmk_data_collector.services.ai_normalizer import AiNormalizer
from vmk_data_collector.services.property_pipeline import PropertyPipeline
@pytest.fixture
def mocks() -> dict[str, Any]:
    """Provide a fresh async mock for every PropertyPipeline collaborator.

    Keys are the keyword names the ``pipeline`` fixture passes to
    ``PropertyPipeline``. The two AI services are spec'd against their real
    classes, so touching an attribute they do not define raises
    ``AttributeError``. Every other collaborator is an unconstrained
    ``AsyncMock``.
    """
    repository_names = (
        "raw_repo",
        "property_repo",
        "image_repo",
        "custom_field_repo",
        "snapshot_repo",
        "enrichment_repo",
        "data_source_repo",
        "property_type_repo",
    )
    deps: dict[str, Any] = {name: AsyncMock() for name in repository_names}
    deps["normalizer"] = AsyncMock(spec=AiNormalizer)
    deps["image_downloader"] = AsyncMock()
    deps["image_analyzer"] = AsyncMock(spec=AiImageAnalyzer)
    deps["enricher"] = AsyncMock()
    return deps
@pytest.fixture
def raw_data() -> MagicMock:
    """Fake raw-data record with id 1 and external id ``"ext-1"``.

    ``source_id`` is ``None`` while the payload carries ``source_slug``.
    Presumably this makes the pipeline resolve the data source by slug, which
    the ``pipeline`` fixture stubs. TODO confirm against PropertyPipeline.
    """
    payload = {"title": "Test", "price": "1000", "source_slug": "src"}
    return MagicMock(
        id=1,
        payload=payload,
        external_id="ext-1",
        source_id=None,
    )
@pytest.fixture
def pipeline(mocks: dict[str, Any], raw_data: MagicMock) -> PropertyPipeline:
    """Build a PropertyPipeline wired to ``mocks`` with default lookups stubbed.

    Installed stubs:
      * ``raw_repo.get_by_id`` returns the ``raw_data`` record;
      * ``data_source_repo.get_or_create_by_slug`` returns an object with id 10;
      * ``property_repo.create`` returns an object with id 100;
      * ``property_type_repo.get_or_create_by_slug`` returns an object with id 20.
    The pipeline starts with an empty ``active_jobs`` set.
    """
    mocks["raw_repo"].get_by_id.return_value = raw_data
    mocks["data_source_repo"].get_or_create_by_slug.return_value = MagicMock(id=10)
    mocks["property_repo"].create.return_value = MagicMock(id=100)
    mocks["property_type_repo"].get_or_create_by_slug.return_value = MagicMock(id=20)

    constructor_args = {
        name: mocks[name]
        for name in (
            "raw_repo",
            "property_repo",
            "image_repo",
            "custom_field_repo",
            "snapshot_repo",
            "enrichment_repo",
            "data_source_repo",
            "property_type_repo",
            "normalizer",
            "image_downloader",
            "image_analyzer",
            "enricher",
        )
    }
    return PropertyPipeline(**constructor_args, active_jobs=set())
@pytest.fixture
def norm_response() -> AiNormalizerResponse:
    """Normalizer verdict accepting the input as a real-estate listing.

    The normalized listing is an apartment for sale in Kyiv, priced 100000 UAH.
    It carries one image URL and one custom field (``key`` -> ``value``).
    """
    listing = {
        "property_type": "apartment",
        "deal_type": "sale",
        "title": "Test Title",
        "description": "Desc",
        "price": 100000,
        "currency": "UAH",
        "total_area": 50,
        "rooms_count": 2,
        "floor": 3,
        "floors_total": 9,
        "city": "Kyiv",
        "address_raw": "Kyiv",
        "images": ["http://img/1.jpg"],
        "custom_fields": {"key": "value"},
    }
    return AiNormalizerResponse(is_real_estate=True, reason=None, normalized=listing)
class TestCompletedPath:
    """Scenarios in which ``PropertyPipeline.process`` reports ``"completed"``."""

    @pytest.mark.asyncio
    async def test_happy_path_new_listing(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """A brand-new listing completes and its custom fields and enrichment are saved."""
        # Normalizer accepts the ad; nothing matching exists yet.
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_repo"].get_by_hash.return_value = None

        # The single image downloads fine and is stored as image id 50.
        mocks["image_downloader"].download.return_value = MagicMock(
            local_path="/tmp/img.jpg",
            image_hash="abc",
            file_size=1234,
            width=800,
            height=600,
        )
        stored_image = MagicMock(id=50)
        mocks["image_repo"].create.return_value = stored_image
        mocks["image_repo"].get_by_property.return_value = [stored_image]

        analysis = MagicMock(overall_condition="good")
        analysis.model_dump.return_value = {"overall_condition": "good"}
        mocks["image_analyzer"].analyze.return_value = analysis

        mocks["enricher"].enrich.return_value = AiEnrichmentResult(
            extracted_features={},
            price_assessment={},
            listing_quality_score=7,
            reliability_rating=4,
            sentiment_score=0.5,
            classification="жилая_недвижимость",
            generated_description="GD",
            summary="S",
            model_version="v1",
            processing_time_ms=100,
        )

        result = await pipeline.process(1)

        assert result.status == "completed"
        assert result.job_id == 1
        assert result.property_id == 100
        mocks["raw_repo"].set_processed.assert_awaited_once_with(1)
        mocks["custom_field_repo"].bulk_create.assert_awaited_once()
        mocks["enrichment_repo"].create.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_existing_listing_creates_snapshot(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """A known listing is snapshotted, updated in place, and its custom fields reset."""
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = MagicMock(id=99)

        # Stub the diff helpers so the stored listing differs only in price.
        # Lambda parameter names are kept as-is in case they are called by keyword.
        monkeypatch.setattr(
            PropertyPipeline,
            "_listing_to_dict",
            staticmethod(lambda x: {"price": 90000}),
        )
        monkeypatch.setattr(
            PropertyPipeline,
            "_compute_changed_fields",
            staticmethod(lambda old, new: {"price": {"old": 90000, "new": 100000}}),
        )

        mocks["snapshot_repo"].create.return_value = MagicMock(id=77)
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["snapshot_repo"].create.assert_awaited_once()
        update_calls = mocks["property_repo"].update.await_args_list
        assert any(awaited.args[0] == 99 for awaited in update_calls)
        mocks["custom_field_repo"].delete_by_property.assert_awaited_once_with(99)
class TestInvalidPath:
    """Scenarios in which the normalizer rejects the raw input."""

    @pytest.mark.asyncio
    async def test_not_real_estate(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """A non-real-estate verdict marks the raw record invalid with its reason."""
        rejection_reason = "Not a property ad"
        mocks["normalizer"].normalize.return_value = AiNormalizerResponse(
            is_real_estate=False,
            reason=rejection_reason,
        )

        result = await pipeline.process(1)

        assert result.status == "invalid"
        assert rejection_reason in result.reason
        # Exactly two status writes; the last one records the rejection.
        status_updates = mocks["raw_repo"].update_status
        assert status_updates.await_count == 2
        status_updates.assert_awaited_with(
            1, "invalid", error_message=rejection_reason
        )
class TestFailedPath:
    """Scenarios in which ``PropertyPipeline.process`` reports ``"failed"``."""

    @pytest.mark.asyncio
    async def test_raw_not_found(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """A missing raw record fails the job with a descriptive status message."""
        mocks["raw_repo"].get_by_id.return_value = None

        result = await pipeline.process(1)

        assert result.status == "failed"
        mocks["raw_repo"].update_status.assert_awaited_with(
            1, "failed", error_message="Raw data 1 not found"
        )

    @pytest.mark.asyncio
    async def test_circuit_breaker_open(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """An open circuit breaker fails the job with reason ``circuit_breaker_open``."""
        mocks["normalizer"].normalize.side_effect = CircuitBreakerOpenError()

        result = await pipeline.process(1)

        assert result.status == "failed"
        assert result.reason == "circuit_breaker_open"
        mocks["raw_repo"].update_status.assert_awaited_with(
            1, "failed", error_message="Circuit breaker open: Circuit breaker is OPEN"
        )

    @pytest.mark.asyncio
    async def test_stage_normalizer_unexpected_error(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
    ) -> None:
        """An unexpected normalizer exception fails the job and surfaces its text."""
        mocks["normalizer"].normalize.side_effect = RuntimeError("boom")

        result = await pipeline.process(1)

        assert result.status == "failed"
        assert "boom" in result.message
        # The previous ``await_count >= 1`` check contradicted its own
        # "called twice" comment and passed even if the record was never
        # marked failed. The number of intermediate status writes is an
        # implementation detail; what matters is that the final write marks
        # job 1 as failed. The positional (id, status) shape matches the
        # sibling tests above.
        last_status_write = mocks["raw_repo"].update_status.await_args
        assert last_status_write is not None, "raw status was never updated"
        assert last_status_write.args[:2] == (1, "failed")
class TestResilience:
    """Failures in optional stages (images, enrichment) must not fail the job."""

    @staticmethod
    def _download_result(image_hash: str) -> MagicMock:
        """Build a fake successful image-download result carrying ``image_hash``."""
        return MagicMock(
            local_path="/tmp/img.jpg",
            image_hash=image_hash,
            file_size=1234,
            width=800,
            height=600,
        )

    @pytest.mark.asyncio
    async def test_image_download_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """A raising image downloader still lets the job complete."""
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_downloader"].download.side_effect = RuntimeError("dl fail")
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["image_downloader"].download.assert_awaited()

    @pytest.mark.asyncio
    async def test_enricher_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An enricher that yields no result still completes, with nothing persisted."""
        # NOTE(review): "failure" is modelled here as a ``None`` return,
        # presumably the enricher's own failure signal (confirm). An enricher
        # that raises is not covered by this test.
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_repo"].get_by_property.return_value = []
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["enrichment_repo"].create.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_duplicate_image_skips_analysis(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """An image whose hash is already stored is neither re-created nor re-analysed."""
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_downloader"].download.return_value = self._download_result(
            "dup_hash"
        )

        already_stored = MagicMock(id=99)
        mocks["image_repo"].get_by_hash.return_value = already_stored
        mocks["image_repo"].get_by_property.return_value = [already_stored]
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["image_analyzer"].analyze.assert_not_awaited()
        mocks["image_repo"].create.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_image_analysis_failure_continues(
        self,
        pipeline: PropertyPipeline,
        mocks: dict[str, Any],
        norm_response: AiNormalizerResponse,
    ) -> None:
        """A raising image analyzer completes the job without storing any analysis."""
        mocks["normalizer"].normalize.return_value = norm_response
        mocks["property_repo"].get_by_source_and_external.return_value = None
        mocks["image_repo"].get_by_hash.return_value = None
        mocks["image_downloader"].download.return_value = self._download_result("abc")

        stored_image = MagicMock(id=50)
        mocks["image_repo"].create.return_value = stored_image
        mocks["image_repo"].get_by_property.return_value = [stored_image]
        mocks["image_analyzer"].analyze.side_effect = RuntimeError("analysis boom")
        mocks["enricher"].enrich.return_value = None

        result = await pipeline.process(1)

        assert result.status == "completed"
        mocks["image_repo"].update_analysis.assert_not_awaited()
class TestHelpers:
    """Direct tests of PropertyPipeline's static helper functions."""

    def test_compute_changed_fields(self) -> None:
        """Changed and newly added keys are reported; unchanged keys are omitted."""
        before = {"price": 100, "title": "Old"}
        after = {"price": 200, "title": "Old", "extra": "new"}
        expected = {
            "price": {"old": 100, "new": 200},
            "extra": {"old": None, "new": "new"},
        }

        assert PropertyPipeline._compute_changed_fields(before, after) == expected

    @pytest.mark.parametrize(
        ("value", "expected"),
        [
            (True, "bool"),
            (42, "int"),
            (3.14, "float"),
            ("text", "str"),
            (None, "str"),
            ([], "str"),
        ],
    )
    def test_infer_field_type(self, value: Any, expected: str) -> None:
        """bool/int/float/str map to their type names; None and lists fall back to "str"."""
        inferred = PropertyPipeline._infer_field_type(value)
        assert inferred == expected