Newer
Older
vmk-360-data_collector / src / vmk_data_collector / services / image_downloader.py
import hashlib
import time
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path

import httpx
import structlog
from PIL import Image
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

from vmk_data_collector.core.exceptions import ImageDownloadError
from vmk_data_collector.core.metrics import image_download_duration_seconds
from vmk_data_collector.core.security import validate_url

_MAX_IMAGE_BYTES = 50 * 1024 * 1024

logger = structlog.get_logger()


@dataclass
class PropertyImageDownloadResult:
    local_path: str
    image_hash: str
    width: int
    height: int
    file_size: int


_IMAGE_RETRY = {
    "stop": stop_after_attempt(3),
    "wait": wait_exponential(min=1, max=10),
    "retry": retry_if_exception_type((httpx.ConnectError, httpx.TimeoutException)),
    "before_sleep": lambda retry_state: logger.warning(
        "image_download_retry", attempt=retry_state.attempt_number
    ),
    "reraise": True,
}


class ImageDownloader:
    def __init__(self, storage_path: Path) -> None:
        self._storage_path = storage_path

    @retry(**_IMAGE_RETRY)
    async def download(
        self,
        property_id: int,
        image_url: str,
        order_index: int,
    ) -> PropertyImageDownloadResult:
        start = time.perf_counter()
        logger.info(
            "image_download_start",
            property_id=property_id,
            url=image_url,
            order=order_index,
        )

        try:
            validate_url(image_url)
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/125.0.0.0 Safari/537.36"
                ),
                "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
                "Referer": "https://dom.ria.com/",
            }
            async with httpx.AsyncClient(timeout=30, headers=headers) as client, client.stream(
                "GET", image_url
            ) as response:
                response.raise_for_status()
                content_length = response.headers.get("content-length")
                if content_length and int(content_length) > _MAX_IMAGE_BYTES:
                    raise ImageDownloadError(
                        f"Image too large: {content_length} bytes"
                    )
                content = bytearray()
                async for chunk in response.iter_bytes():
                    content.extend(chunk)
                    if len(content) > _MAX_IMAGE_BYTES:
                        raise ImageDownloadError(
                            f"Image exceeds max size of {_MAX_IMAGE_BYTES} bytes"
                        )
            content = bytes(content)

            image_hash = hashlib.sha256(content).hexdigest()
            ext = self._detect_extension(
                response.headers.get("content-type", ""), image_url
            )

            with Image.open(BytesIO(content)) as img:
                width, height = img.size

            property_dir = self._storage_path / str(property_id)
            property_dir.mkdir(parents=True, exist_ok=True)

            local_path = property_dir / f"{image_hash}.{ext}"
            local_path.write_bytes(content)

            file_size = len(content)

            logger.info(
                "image_download_complete",
                property_id=property_id,
                hash=image_hash,
                width=width,
                height=height,
                size=file_size,
            )

            return PropertyImageDownloadResult(
                local_path=str(local_path),
                image_hash=image_hash,
                width=width,
                height=height,
                file_size=file_size,
            )
        finally:
            image_download_duration_seconds.observe(time.perf_counter() - start)

    async def process_local_file(
        self,
        property_id: int,
        temp_path: str,
        order_index: int,
        content_type: str = "",
    ) -> PropertyImageDownloadResult:
        start = time.perf_counter()
        logger.info(
            "image_process_local_start",
            property_id=property_id,
            temp_path=temp_path,
            order=order_index,
        )

        try:
            source = Path(temp_path)
            content = source.read_bytes()
            if len(content) > _MAX_IMAGE_BYTES:
                raise ImageDownloadError(
                    f"Image exceeds max size of {_MAX_IMAGE_BYTES} bytes"
                )

            image_hash = hashlib.sha256(content).hexdigest()
            ext = self._detect_extension(content_type, temp_path)

            with Image.open(BytesIO(content)) as img:
                width, height = img.size

            property_dir = self._storage_path / str(property_id)
            property_dir.mkdir(parents=True, exist_ok=True)

            local_path = property_dir / f"{image_hash}.{ext}"
            local_path.write_bytes(content)

            file_size = len(content)

            logger.info(
                "image_process_local_complete",
                property_id=property_id,
                hash=image_hash,
                width=width,
                height=height,
                size=file_size,
            )

            return PropertyImageDownloadResult(
                local_path=str(local_path),
                image_hash=image_hash,
                width=width,
                height=height,
                file_size=file_size,
            )
        finally:
            image_download_duration_seconds.observe(time.perf_counter() - start)

    @staticmethod
    def _detect_extension(content_type: str, url_or_path: str) -> str:
        ct = content_type.lower()
        if "jpeg" in ct or "jpg" in ct:
            return "jpg"
        if "png" in ct:
            return "png"
        if "webp" in ct:
            return "webp"
        if "gif" in ct:
            return "gif"

        from urllib.parse import urlparse

        path = urlparse(url_or_path).path.lower()
        for ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
            if path.endswith(ext):
                return ext.lstrip(".")
        return "jpg"