Newer
Older
vmk-360_data_mcp / src / vmk_data_mcp / main.py
"""Точка входа MCP-сервера с HTTP-транспортом (streamable-http).

VMK Data MCP Server предоставляет AI-агентам безопасный read-only доступ
к базе данных недвижимости vmk_data (PostgreSQL + pgvector).

Данные: ~35 полей объявлений (цена, площадь, район, метро, тип сделки, ...).
Поиск: семантический (vector 768d через Ollama) + полнотекстовый (FTS).
Язык: украинский (запросы переводятся агентом перед вызовом).
Зачем: AI-агент может искать квартиры/дома по описанию пользователя,
фильтровать по бюджету и району, показывать детали объявлений.
"""

import json
import logging
from contextlib import asynccontextmanager

import asyncpg
import httpx
from mcp.server import FastMCP

from vmk_data_mcp.config import settings
from vmk_data_mcp.db import close_pool, init_pool
from vmk_data_mcp.embedder import close_client
from vmk_data_mcp.models import (
    GetListingInput,
    SearchMetadataInput,
    SearchSimilarInput,
)
from vmk_data_mcp.tools import (
    describe_schema,
    get_listing_by_id,
    search_by_metadata,
    search_similar_listings,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _error_json(message: str) -> str:
    """Возвращает стандартизированный JSON с ошибкой."""
    return json.dumps({"error": message}, indent=2, ensure_ascii=False)


@asynccontextmanager
async def app_lifespan(server):
    """Жизненный цикл приложения — инициализация и закрытие ресурсов."""
    logger.info("Initializing DB pool ...")
    await init_pool()
    logger.info("DB pool ready.")
    yield
    logger.info("Shutting down ...")
    await close_pool()
    await close_client()
    logger.info("Shutdown complete.")


mcp = FastMCP(
    settings.mcp_server_name,
    host="0.0.0.0",
    port=settings.mcp_port,
    lifespan=app_lifespan,
)

# ── Prompts (гайды для AI-агента) ───────────────────────────────────


@mcp.prompt()
def search_guide() -> str:
    """📘 Гайд: как эффективно искать объявления через VMK Data MCP."""
    return """\
# Гайд по поиску объявлений (VMK Data MCP)

## Два инструмента поиска — выбирай правильно

### 1. search_similar_listings — векторный (семантический) поиск
Используй, когда пользователь описывает **желания, атмосферу, качества**:
- ✅ "уютная квартира с ремонтом у метро"
- ✅ "светлая студия в центрі міста"
- ✅ "простора 3-кімнатна з балконом"

**Как формулировать query (украинский):**
- Конкретно, без разговорных стоп-слов.
- ✓ Хорошо: "2-кімнатна квартира біля метро з ремонтом"
- ✗ Плохо:  "дай квартиру недорого" (стоп-слова: дай, недорого)

**Фильтры:**
- Обязательно указывай `currency` при `min_price`/`max_price`.
- `city` и `district` — поиск по подстроке, регистр не важен.
- `rooms_count`: 0 = студия.

### 2. search_by_metadata — полнотекстовый поиск (FTS)
Используй, когда пользователь называет **конкретные ключевые слова**:
- ✅ "Печерський район метро Арсенальна"
- ✅ "вул. Шевченка Львів оренда"
- ✅ "новобудова моноліт Солом'янський"

**Как формулировать query (украинский):**
- Используй имена собственные и термины.
- ✓ Хорошо: "Печерський район Арсенальна метро"
- ✗ Плохо:  "хочу квартиру" (слишком общее — используй vector)

## Fallback-стратегия при 0 результатов

1. Попробуй другой инструмент (vector → FTS или FTS → vector).
2. Убери 1–2 жёстких фильтра (обычно `district` или `metro_station`).
3. Упрости query — убери разговорные слова.
4. Проверь, что query на украинском.

## Пагинация

- `total` > `limit` → есть следующая страница.
- Следующая страница: `offset += limit`.
- Предыдущая страница: `offset -= limit` (но не ниже 0).
- `limit` от 1 до 100.

## Фильтры — лучшие комбинации

- `city` + `deal_type` + `rooms_count` + `currency`
- `city` + `district` + `deal_type` + `min_price` + `max_price` + `currency`
- `city` + `metro_station` + `rooms_count` + `listing_status=active`

**Избегай:**
- `min_price`/`max_price` без `currency` — фильтрует по всем валютам.
- Более 5 фильтров одновременно — часто даёт 0 результатов.
"""


# ── Регистрация инструментов ────────────────────────────────────────


@mcp.tool()
async def search_similar_listings_tool(
    query: str,
    deal_type: str | None = None,
    city: str | None = None,
    district: str | None = None,
    rooms_count: int | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    currency: str | None = None,
    min_total_area: float | None = None,
    max_total_area: float | None = None,
    building_type: str | None = None,
    floor: int | None = None,
    listing_status: str | None = None,
    metro_station: str | None = None,
    limit: int = 20,
    offset: int = 0,
) -> str:
    """🔍 Векторный (семантический) поиск объявлений по смыслу.

    Используй, когда пользователь описывает **желания, атмосферу, качества**
    («уютная квартира с ремонтом», «светлая студия у метро»).
    Не используй для точных ключевых слов (район, метро) — для этого
    используй `search_by_metadata`.

    **Важно:** запрос `query` должен быть на **украинском** языке.
    Формулируй конкретно, без разговорных стоп-слов.

    **Фильтры:** обязательно указывай `currency` при `min_price`/`max_price`.
    `city` и `district` — поиск по подстроке (ILIKE), регистр не важен.
    `rooms_count`: 0 = студия.

    **Fallback при 0 результатов:** сервер вернёт подсказку с рекомендациями
    (попробовать FTS, убрать фильтры, упростить query).
    """
    try:
        args = SearchSimilarInput(
            query=query,
            filters={
                "deal_type": deal_type,
                "city": city,
                "district": district,
                "rooms_count": rooms_count,
                "min_price": min_price,
                "max_price": max_price,
                "currency": currency,
                "min_total_area": min_total_area,
                "max_total_area": max_total_area,
                "building_type": building_type,
                "floor": floor,
                "listing_status": listing_status,
                "metro_station": metro_station,
            },
            pagination={"limit": limit, "offset": offset},
        )
        return await search_similar_listings(args)
    except httpx.HTTPError as e:
        logger.warning("Ollama error: %s", e)
        return _error_json(f"Сервис эмбеддингов недоступен: {e}")
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при поиске: {e}")
    except Exception as e:
        logger.exception("Unexpected error in search_similar_listings")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def search_by_metadata_tool(
    query: str,
    deal_type: str | None = None,
    city: str | None = None,
    district: str | None = None,
    rooms_count: int | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    currency: str | None = None,
    min_total_area: float | None = None,
    max_total_area: float | None = None,
    building_type: str | None = None,
    floor: int | None = None,
    listing_status: str | None = None,
    metro_station: str | None = None,
    limit: int = 20,
    offset: int = 0,
) -> str:
    """📋 Полнотекстовый поиск + фильтры по метаданным.

    Используй, когда пользователь называет **конкретные ключевые слова**
    («Печерський район», «Арсенальна метро», «вул. Шевченка»).
    Работает через готовую FTS-колонку `search_vector` (украинский конфиг) + GIN-индекс.

    **Важно:** запрос `query` должен быть на **украинском** языке.
    Используй имена собственные и термины, а не разговорные описания.

    **Фильтры:** те же, что и для `search_similar_listings`.
    Обязательно указывай `currency` при `min_price`/`max_price`.

    **Fallback при 0 результатов:** сервер вернёт подсказку с рекомендациями
    (попробовать vector search, убрать фильтры).
    """
    try:
        args = SearchMetadataInput(
            query=query,
            filters={
                "deal_type": deal_type,
                "city": city,
                "district": district,
                "rooms_count": rooms_count,
                "min_price": min_price,
                "max_price": max_price,
                "currency": currency,
                "min_total_area": min_total_area,
                "max_total_area": max_total_area,
                "building_type": building_type,
                "floor": floor,
                "listing_status": listing_status,
                "metro_station": metro_station,
            },
            pagination={"limit": limit, "offset": offset},
        )
        return await search_by_metadata(args)
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при поиске: {e}")
    except Exception as e:
        logger.exception("Unexpected error in search_by_metadata")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def get_listing_by_id_tool(listing_id: int) -> str:
    """📄 Получить полную карточку объявления по ID.

    Возвращает все пользовательские поля объявления.
    Если объявление не найдено — вернёт ошибку с подсказкой
    (проверить ID, возможен статус archived/removed).
    """
    try:
        args = GetListingInput(listing_id=listing_id)
        return await get_listing_by_id(args)
    except (asyncpg.PostgresError, ValueError) as e:
        logger.warning("DB/validation error: %s", e)
        return _error_json(f"Ошибка при получении объявления: {e}")
    except Exception as e:
        logger.exception("Unexpected error in get_listing_by_id")
        return _error_json(f"Неожиданная ошибка: {e}")


@mcp.tool()
async def describe_schema_tool() -> str:
    """ℹ️ Описание схемы данных.

    Возвращает описание таблицы `property_listings` с гайдами по использованию:
    когда использовать vector/FTS, лучшие комбинации фильтров,
    примеры запросов, стратегия fallback и пагинации.
    """
    try:
        return await describe_schema()
    except Exception as e:
        logger.exception("Unexpected error in describe_schema")
        return _error_json(f"Неожиданная ошибка: {e}")


if __name__ == "__main__":
    mcp.run(transport="streamable-http")