Newer
Older
vmk-360_data_mcp / src / vmk_data_mcp / main.py
"""Точка входа MCP-сервера с HTTP-транспортом (streamable-http)."""

import json
import logging
from contextlib import asynccontextmanager

import asyncpg
import httpx
from mcp.server import FastMCP

from vmk_data_mcp.config import settings
from vmk_data_mcp.db import close_pool, init_pool
from vmk_data_mcp.embedder import close_client
from vmk_data_mcp.models import (
    GetListingInput,
    SearchMetadataInput,
    SearchSimilarInput,
)
from vmk_data_mcp.tools import (
    describe_schema,
    get_listing_by_id,
    search_by_metadata,
    search_similar_listings,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _error_json(message: str) -> str:
    """Возвращает стандартизированный JSON с ошибкой."""
    return json.dumps({"error": message}, indent=2, ensure_ascii=False)


@asynccontextmanager
async def app_lifespan(server):
    """Жизненный цикл приложения — инициализация и закрытие ресурсов."""
    logger.info("Initializing DB pool ...")
    await init_pool()
    logger.info("DB pool ready.")
    yield
    logger.info("Shutting down ...")
    await close_pool()
    await close_client()
    logger.info("Shutdown complete.")


mcp = FastMCP(
    settings.mcp_server_name,
    host="0.0.0.0",
    port=settings.mcp_port,
    lifespan=app_lifespan,
)

# ── Регистрация инструментов ──────────────────────────────────────────


@mcp.tool()
async def search_similar_listings_tool(
    query: str,
    deal_type: str | None = None,
    city: str | None = None,
    district: str | None = None,
    rooms_count: int | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    currency: str | None = None,
    min_total_area: float | None = None,
    max_total_area: float | None = None,
    building_type: str | None = None,
    floor: int | None = None,
    listing_status: str | None = None,
    metro_station: str | None = None,
    limit: int = 20,
    offset: int = 0,
    min_similarity: float = 0.7,
) -> str:
    """🔍 Векторный (семантический) поиск объявлений.

    Находит объявления по **смыслу** запроса, используя pgvector + HNSW.
    Поддерживает фильтры по метаданным (цена, район, комнаты и т.д.).

    **Важно:** запрос `query` должен быть на **украинском** языке.
    """
    try:
        args = SearchSimilarInput(
            query=query,
            filters={
                "deal_type": deal_type,
                "city": city,
                "district": district,
                "rooms_count": rooms_count,
                "min_price": min_price,
                "max_price": max_price,
                "currency": currency,
                "min_total_area": min_total_area,
                "max_total_area": max_total_area,
                "building_type": building_type,
                "floor": floor,
                "listing_status": listing_status,
                "metro_station": metro_station,
            },
            pagination={"limit": limit, "offset": offset},
            min_similarity=min_similarity,
        )
        return await search_similar_listings(args)
    except httpx.HTTPError as e:
        logger.warning("Ollama error: %s", e)
        return _error_json(f"Сервис эмбеддингов недоступен: {e}")
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при поиске: {e}")
    except Exception as e:
        logger.exception("Unexpected error in search_similar_listings")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def search_by_metadata_tool(
    query: str,
    deal_type: str | None = None,
    city: str | None = None,
    district: str | None = None,
    rooms_count: int | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    currency: str | None = None,
    min_total_area: float | None = None,
    max_total_area: float | None = None,
    building_type: str | None = None,
    floor: int | None = None,
    listing_status: str | None = None,
    metro_station: str | None = None,
    limit: int = 20,
    offset: int = 0,
) -> str:
    """📋 Полнотекстовый поиск + фильтры по метаданным.

    Ищет по словам в заголовке, описании, городе, районе, метро
    через готовую FTS-колонку `search_vector` (украинский конфиг) + GIN-индекс.
    Поддерживает те же фильтры, что и `search_similar_listings`.

    **Важно:** запрос `query` должен быть на **украинском** языке.
    """
    try:
        args = SearchMetadataInput(
            query=query,
            filters={
                "deal_type": deal_type,
                "city": city,
                "district": district,
                "rooms_count": rooms_count,
                "min_price": min_price,
                "max_price": max_price,
                "currency": currency,
                "min_total_area": min_total_area,
                "max_total_area": max_total_area,
                "building_type": building_type,
                "floor": floor,
                "listing_status": listing_status,
                "metro_station": metro_station,
            },
            pagination={"limit": limit, "offset": offset},
        )
        return await search_by_metadata(args)
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при поиске: {e}")
    except Exception as e:
        logger.exception("Unexpected error in search_by_metadata")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def get_listing_by_id_tool(listing_id: int) -> str:
    """📄 Получить полную карточку объявления по ID.

    Возвращает все пользовательские поля объявления.
    """
    try:
        args = GetListingInput(listing_id=listing_id)
        return await get_listing_by_id(args)
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при получении объявления: {e}")
    except Exception as e:
        logger.exception("Unexpected error in get_listing_by_id")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def describe_schema_tool() -> str:
    """ℹ️ Описание схемы данных.

    Возвращает описание таблицы `property_listings`:
    поля, типы, enum-значения, особенности поиска (vector + FTS).
    """
    try:
        return await describe_schema()
    except Exception as e:
        logger.exception("Unexpected error in describe_schema")
        return _error_json(f"Неожиданная ошибка: {e}")


if __name__ == "__main__":
    mcp.run(transport="streamable-http")