"""Unit tests for ImageDownloader."""
import asyncio
from contextlib import asynccontextmanager
from io import BytesIO
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from PIL import Image
from vmk_data_collector.core.exceptions import ImageDownloadError
from vmk_data_collector.services.image_downloader import (
ImageDownloader,
PropertyImageDownloadResult,
)
@pytest.fixture
def storage(tmp_path: Path) -> Path:
    """Return the ``images`` path located under pytest's ``tmp_path``.

    Only the path is computed here; the directory is not created by this
    fixture.
    """
    return tmp_path.joinpath("images")
@pytest.fixture
def downloader(storage: Path) -> ImageDownloader:
    """Build an ``ImageDownloader`` whose ``storage_path`` is the ``storage`` fixture."""
    instance = ImageDownloader(storage_path=storage)
    return instance
@pytest.fixture(autouse=True)
def _patch_retry(monkeypatch: pytest.MonkeyPatch) -> None:
    """Re-wrap ``ImageDownloader.download`` with a test-friendly retry policy.

    The replacement retries up to three attempts on ``httpx.ConnectError`` and
    ``httpx.TimeoutException``, re-raises the last error, uses a no-op
    ``before_sleep`` hook, and does not wait between attempts so the retry
    tests never perform real back-off sleeps.

    The patch is applied through ``monkeypatch`` and therefore undone
    automatically when each test finishes.
    """
    from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_none

    # NOTE(review): ``__wrapped__`` is presumably the undecorated coroutine
    # that the production retry decorator wraps -- confirm against the service.
    original = ImageDownloader.download.__wrapped__
    wrapped = retry(
        stop=stop_after_attempt(3),
        # No delay between attempts: only the attempt count matters to the
        # tests, and an exponential wait would add seconds to every retry test.
        wait=wait_none(),
        retry=retry_if_exception_type((httpx.ConnectError, httpx.TimeoutException)),
        before_sleep=lambda retry_state: None,
        reraise=True,
    )(original)
    monkeypatch.setattr(ImageDownloader, "download", wrapped)
class TestHappyPath:
    """Successful downloads, plus the mock builders reused by other test classes."""

    # Dotted path of the client class replaced with a mock in every test.
    _ASYNC_CLIENT = "vmk_data_collector.services.image_downloader.httpx.AsyncClient"

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _encode_blank_image(width: int, height: int, image_format: str) -> bytes:
        """Encode a solid black RGB image of the given size as ``image_format``."""
        canvas = Image.new("RGB", (width, height), color=(0, 0, 0))
        sink = BytesIO()
        canvas.save(sink, format=image_format)
        return sink.getvalue()

    @staticmethod
    def _make_jpeg_bytes(width: int, height: int) -> bytes:
        """Return the bytes of a black ``width`` x ``height`` JPEG."""
        return TestHappyPath._encode_blank_image(width, height, "JPEG")

    @staticmethod
    def _make_webp_bytes(width: int, height: int) -> bytes:
        """Return the bytes of a black ``width`` x ``height`` WEBP image."""
        return TestHappyPath._encode_blank_image(width, height, "WEBP")

    @staticmethod
    def _make_response_mock(
        content: bytes,
        headers: dict[str, str] | None = None,
        status_code: int = 200,
    ) -> Any:
        """Build a fake streaming response.

        ``iter_bytes()`` yields ``content`` in 1024-byte slices and
        ``raise_for_status()`` is a synchronous no-op.
        """
        step = 1024

        async def stream_chunks():
            offset = 0
            while offset < len(content):
                yield content[offset : offset + step]
                offset += step

        def accept_status() -> None:
            return None

        fake = AsyncMock()
        fake.headers = headers if headers else {}
        fake.status_code = status_code
        fake.raise_for_status = accept_status
        fake.iter_bytes = stream_chunks
        return fake

    @staticmethod
    def _make_client_mock(response: Any) -> Any:
        """Build a fake async client whose ``stream(...)`` context yields ``response``.

        The mock is its own ``async with`` target: ``__aenter__`` returns the
        mock itself and ``__aexit__`` returns ``None``.
        """

        @asynccontextmanager
        async def open_stream(_method, _url, **_kwargs):
            yield response

        fake_client = AsyncMock()
        fake_client.__aenter__ = AsyncMock(return_value=fake_client)
        fake_client.__aexit__ = AsyncMock(return_value=None)
        fake_client.stream = open_stream
        return fake_client

    # -------------------------------------------------------------------- tests

    @pytest.mark.asyncio
    async def test_downloads_image_and_returns_metadata(
        self,
        downloader: ImageDownloader,
        storage: Path,
    ) -> None:
        """A 100x200 JPEG is saved and its size, dimensions and path are reported."""
        payload = self._make_jpeg_bytes(100, 200)
        client = self._make_client_mock(
            self._make_response_mock(
                content=payload,
                headers={"content-type": "image/jpeg"},
            )
        )

        with patch(self._ASYNC_CLIENT, return_value=client):
            outcome = await downloader.download(1, "http://example.com/img.jpg", 0)

        assert isinstance(outcome, PropertyImageDownloadResult)
        assert outcome.width == 100
        assert outcome.height == 200
        assert outcome.file_size == len(payload)
        saved_to = outcome.local_path
        assert saved_to.endswith(".jpg")
        assert Path(saved_to).exists()

    @pytest.mark.asyncio
    async def test_detects_extension_from_url(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """With a generic content type, the ``.png`` URL suffix decides the extension."""
        client = self._make_client_mock(
            self._make_response_mock(
                content=self._make_jpeg_bytes(50, 50),
                headers={"content-type": "application/octet-stream"},
            )
        )

        with patch(self._ASYNC_CLIENT, return_value=client):
            outcome = await downloader.download(1, "http://example.com/photo.png", 0)

        assert outcome.local_path.endswith(".png")

    @pytest.mark.asyncio
    async def test_detects_webp_from_content_type(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """With an extension-less URL, ``image/webp`` yields a ``.webp`` file."""
        client = self._make_client_mock(
            self._make_response_mock(
                content=self._make_webp_bytes(50, 50),
                headers={"content-type": "image/webp"},
            )
        )

        with patch(self._ASYNC_CLIENT, return_value=client):
            outcome = await downloader.download(1, "http://example.com/img", 0)

        assert outcome.local_path.endswith(".webp")
class TestErrorHandling:
    """Failure modes: HTTP error status and oversized payloads."""

    # Dotted path of the client class replaced with a mock in every test.
    _ASYNC_CLIENT = "vmk_data_collector.services.image_downloader.httpx.AsyncClient"

    @pytest.mark.asyncio
    async def test_raises_on_bad_status(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """An ``HTTPStatusError`` raised by ``raise_for_status`` reaches the caller."""
        not_found = MagicMock()
        not_found.headers = {}
        not_found.status_code = 404

        def fail_status_check():
            raise httpx.HTTPStatusError(
                "Not Found",
                request=MagicMock(),
                response=not_found,
            )

        not_found.raise_for_status = fail_status_check
        client = TestHappyPath._make_client_mock(not_found)

        with (
            patch(self._ASYNC_CLIENT, return_value=client),
            pytest.raises(httpx.HTTPStatusError),
        ):
            await downloader.download(1, "http://example.com/404.jpg", 0)

    @pytest.mark.asyncio
    async def test_raises_when_content_length_too_large(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """A declared ``content-length`` of 60 MiB raises a "too large" error."""
        declared_size = 60 * 1024 * 1024

        oversized = AsyncMock()
        oversized.headers = {"content-length": str(declared_size)}
        oversized.status_code = 200
        oversized.raise_for_status = lambda: None
        # NOTE(review): a bare AsyncMock is not iterable with ``async for``;
        # presumably the body is never read in this scenario -- confirm.
        oversized.iter_bytes = AsyncMock()
        client = TestHappyPath._make_client_mock(oversized)

        with (
            patch(self._ASYNC_CLIENT, return_value=client),
            pytest.raises(ImageDownloadError, match="too large"),
        ):
            await downloader.download(1, "http://example.com/huge.jpg", 0)

    @pytest.mark.asyncio
    async def test_raises_when_stream_exceeds_max_size(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """Streaming 60 MiB with no ``content-length`` raises "exceeds max size"."""
        one_mib = b"x" * (1024 * 1024)

        async def sixty_mib_stream():
            for _ in range(60):
                yield one_mib

        unbounded = AsyncMock()
        unbounded.headers = {}
        unbounded.status_code = 200
        unbounded.raise_for_status = lambda: None
        unbounded.iter_bytes = sixty_mib_stream
        client = TestHappyPath._make_client_mock(unbounded)

        with (
            patch(self._ASYNC_CLIENT, return_value=client),
            pytest.raises(ImageDownloadError, match="exceeds max size"),
        ):
            await downloader.download(1, "http://example.com/huge.jpg", 0)
class TestRetry:
    """Transient transport errors are retried until the stream opens."""

    # Dotted path of the client class replaced with a mock in every test.
    _ASYNC_CLIENT = "vmk_data_collector.services.image_downloader.httpx.AsyncClient"

    @staticmethod
    def _flaky_client(
        error_type: type[Exception],
        message: str,
        succeed_on: int,
        response: Any,
    ) -> tuple[Any, list[int]]:
        """Build a fake client whose ``stream`` fails until attempt ``succeed_on``.

        Every attempt before ``succeed_on`` raises a fresh
        ``error_type(message)`` on entering the stream context; later attempts
        yield ``response``. Returns the client together with a list holding one
        entry per ``stream`` call, so its length is the call count.
        """
        attempts: list[int] = []

        @asynccontextmanager
        async def flaky_stream(_method, _url, **_kwargs):
            attempts.append(len(attempts) + 1)
            if attempts[-1] < succeed_on:
                raise error_type(message)
            yield response

        fake_client = AsyncMock()
        fake_client.stream = flaky_stream
        fake_client.__aenter__ = AsyncMock(return_value=fake_client)
        fake_client.__aexit__ = AsyncMock(return_value=None)
        return fake_client, attempts

    @staticmethod
    def _small_jpeg_response() -> Any:
        """Return a fake response that streams a 10x10 JPEG."""
        return TestHappyPath._make_response_mock(
            content=TestHappyPath._make_jpeg_bytes(10, 10),
            headers={"content-type": "image/jpeg"},
        )

    @pytest.mark.asyncio
    async def test_retries_on_connect_error(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """Two ``ConnectError`` failures are followed by a successful third attempt."""
        client, attempts = self._flaky_client(
            httpx.ConnectError,
            "connection refused",
            succeed_on=3,
            response=self._small_jpeg_response(),
        )

        with patch(self._ASYNC_CLIENT, return_value=client):
            outcome = await downloader.download(1, "http://example.com/img.jpg", 0)

        assert len(attempts) == 3
        assert outcome.width == 10

    @pytest.mark.asyncio
    async def test_retries_on_timeout(
        self,
        downloader: ImageDownloader,
    ) -> None:
        """One ``TimeoutException`` is followed by a successful second attempt."""
        client, attempts = self._flaky_client(
            httpx.TimeoutException,
            "timed out",
            succeed_on=2,
            response=self._small_jpeg_response(),
        )

        with patch(self._ASYNC_CLIENT, return_value=client):
            outcome = await downloader.download(1, "http://example.com/img.jpg", 0)

        assert len(attempts) == 2
        assert outcome.width == 10
class TestExtensionDetection:
    """Table-driven checks of ``ImageDownloader._detect_extension``."""

    @pytest.mark.parametrize(
        ("content_type", "url", "expected"),
        [
            # Image content types paired with an extension-less URL.
            ("image/jpeg", "http://x/a", "jpg"),
            ("image/png", "http://x/a", "png"),
            ("image/webp", "http://x/a", "webp"),
            ("image/gif", "http://x/a", "gif"),
            # Generic content type paired with a URL that carries an extension.
            ("application/octet-stream", "http://x/photo.jpg", "jpg"),
            ("application/octet-stream", "http://x/photo.jpeg", "jpeg"),
            ("application/octet-stream", "http://x/photo.png", "png"),
            ("application/octet-stream", "http://x/photo.webp", "webp"),
            ("application/octet-stream", "http://x/photo.gif", "gif"),
            # Neither source names a format: "jpg" is expected.
            ("application/octet-stream", "http://x/photo", "jpg"),
        ],
    )
    def test_detect_extension(
        self,
        content_type: str,
        url: str,
        expected: str,
    ) -> None:
        """The detected extension equals ``expected`` for each table row."""
        detected = ImageDownloader._detect_extension(content_type, url)
        assert detected == expected