Newer
Older
navi-1 / mcp-servers / time_toolkit / app / mcp_server.py
"""MCP server for time_toolkit — A toolkit for advanced datetime manipulation and natural language parsing."""

from __future__ import annotations

import json
import os
import re
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any
from zoneinfo import ZoneInfo

from mcp.server.fastmcp import FastMCP
from pydantic import Field

INSTRUCTIONS = """
time_toolkit provides tools for advanced datetime manipulation and natural language parsing.

Use it when the task involves:
- Formatting ISO strings into various human-readable or standard formats.
- Calculating the duration between two timestamps in different units.
- Adding or subtracting time intervals from a specific datetime.
- Parsing natural language time expressions (e.g., "tomorrow", "in 2 hours") into ISO timestamps.

Workflow:
1. parse_natural — convert natural language to an ISO string.
2. format_datetime — format the resulting ISO string for display.
3. add_time — perform arithmetic on the datetime.
4. calculate_duration — find the difference between two points in time.

ABSOLUTE RULE — NEVER bypass MCP tools:
You MUST NOT use filesystem, terminal, code_exec, or any direct file access for operations covered by this server.
""".strip()

mcp = FastMCP("time_toolkit", instructions=INSTRUCTIONS)


def _json(data: Any) -> str:
    return json.dumps(data, ensure_ascii=False, indent=2)


class OutputFormat(str, Enum):
    HUMAN = "human"
    ISO = "iso"
    RFC2822 = "rfc2822"
    SHORT = "short"


class DurationUnit(str, Enum):
    SECONDS = "seconds"
    MINUTES = "minutes"
    HOURS = "hours"
    DAYS = "days"
    WEEKS = "weeks"
    MONTHS = "months"
    YEARS = "years"


# ── TOOL DEFINITIONS ──

@mcp.tool(name="format_datetime")
async def format_datetime(
    iso_string: Annotated[str, Field(description="Input ISO 8601 string.")],
    output_format: Annotated[OutputFormat, Field(description="The desired output format: 'human', 'iso', 'rfc2822', or 'short'.")],
    target_timezone: Annotated[str, Field(description="IANA timezone name (e.g., 'UTC', 'Europe/Moscow'). Default is 'UTC'.")] = "UTC",
) -> str:
    """Formats an ISO 8601 string into a specified format and timezone."""
    try:
        dt_str = iso_string.replace("Z", "+00:00")
        dt = datetime.fromisoformat(dt_str)
        
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
            
        tz = ZoneInfo(target_timezone)
        dt_localized = dt.astimezone(tz)

        if output_format == OutputFormat.ISO:
            formatted = dt_localized.isoformat()
        elif output_format == OutputFormat.RFC2822:
            formatted = dt_localized.strftime("%a, %d %b %Y %H:%M:%S %z")
        elif output_format == OutputFormat.SHORT:
            formatted = dt_localized.strftime("%d.%m.%Y %H:%M")
        else:  # human
            formatted = dt_localized.strftime("%d %B %Y, %H:%M")

        return _json({
            "formatted": formatted,
            "timezone": target_timezone,
            "original_iso": iso_string
        })
    except Exception as e:
        return _json({"error": str(e)})


@mcp.tool(name="calculate_duration")
async def calculate_duration(
    start_iso: Annotated[str, Field(description="Start ISO 8601 string.")],
    end_iso: Annotated[str, Field(description="End ISO 8601 string.")],
    unit: Annotated[DurationUnit, Field(description="The unit for the duration value.")]
) -> str:
    """Calculates the duration between two ISO 8601 timestamps in the specified unit."""
    try:
        start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
        end_dt = datetime.fromisoformat(end_iso.replace("Z", "+00:00"))
        
        diff = end_dt - start_dt
        seconds = diff.total_seconds()

        if unit == DurationUnit.SECONDS:
            value = seconds
        elif unit == DurationUnit.MINUTES:
            value = seconds / 60
        elif unit == DurationUnit.HOURS:
            value = seconds / 3600
        elif unit == DurationUnit.DAYS:
            value = seconds / 86400
        elif unit == DurationUnit.WEEKS:
            value = seconds / (86400 * 7)
        elif unit == DurationUnit.MONTHS:
            value = seconds / (86400 * 30)
        elif unit == DurationUnit.YEARS:
            value = seconds / (86400 * 365)
        else:
            raise ValueError(f"Unsupported unit: {unit}")

        return _json({
            "value": round(float(value), 4),
            "unit": unit.value,
            "start_iso": start_iso,
            "end_iso": end_iso
        })
    except Exception as e:
        return _json({"error": str(e)})


@mcp.tool(name="add_time")
async def add_time(
    iso_string: Annotated[str, Field(description="Base ISO 8601 string.")],
    value: Annotated[int, Field(description="The amount of time to add (can be negative).")],
    unit: Annotated[DurationUnit, Field(description="The unit of the value to add.")]
) -> str:
    """Adds or subtracts a specified amount of time from an ISO 8601 timestamp."""
    try:
        dt = datetime.fromisoformat(iso_string.replace("Z", "+00:00"))
        
        if unit == DurationUnit.SECONDS:
            delta = timedelta(seconds=value)
        elif unit == Tuple[DurationUnit, str] if False else DurationUnit.MINUTES:
            delta = timedelta(minutes=value)
        elif unit == DurationUnit.MINUTES:
            delta = timedelta(minutes=value)
        elif unit == DurationUnit.HOURS:
            delta = timedelta(hours=value)
        elif unit == DurationUnit.DAYS:
            delta = timedelta(days=value)
        elif unit == DurationUnit.WEEKS:
            delta = timedelta(weeks=value)
        elif unit == DurationUnit.MONTHS:
            delta = timedelta(days=value * 30)
        elif unit == DurationUnit.YEARS:
            delta = timedelta(days=value * 365)
        else:
            raise ValueError(f"Unsupported unit: {unit}")

        result_dt = dt + delta
        op_str = f"{'+' if value >= 0 else ''}{value} {unit.value}"
        
        return _json({
            "result_iso": result_dt.isoformat(),
            "original_iso": iso_string,
            "operation": op_str
        })
    except Exception as e:
        return _json({"error": str(e)})


@mcp.tool(name="parse_natural")
async def parse_natural(
    text: Annotated[str, Field(description="Natural language time expression (e.g., 'tomorrow', 'in 2 hours').")],
) -> str:
    """Parses natural language time expressions into ISO 8601 timestamps."""
    try:
        now = datetime.now(timezone.utc)
        text = text.lower().strip()
        
        result_dt = None
        is_relative = False

        # Absolute simple cases
        if text == "now":
            result_dt = now
        elif text == "today":
            result_dt = now.replace(hour=0, minute=0, second=0, microsecond=0)
        elif text == "tomorrow":
            result_dt = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
        elif text == "yesterday":
            result_dt = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
        
        # Relative "in X units"
        elif re.match(r"^in (\d+) (second|minute|hour|day|week|month|year)s?$", text):
            match = re.match(r"^in (\d+) (second|minute|hour|day|week|month|year)s?$", text)
            val = int(match.group(1))
            unit = match.group(2)
            is_relative = True
            if unit == "second": delta = timedelta(seconds=val)
            elif unit == "minute": delta = timedelta(minutes=val)
            elif unit == "hour": delta = timedelta(hours=val)
            elif unit == "day": delta = timedelta(days=val)
            elif unit == "week": delta = timedelta(weeks=val)
            elif unit == "month": delta = timedelta(days=val * 30)
            elif unit == "year": delta = timedelta(days=val * 365)
            result_dt = now + delta
            
        # Relative "X units ago"
        elif reint := re.match(r"^(\d+) (second|minute|hour|day|week|month|year)s? ago$", text):
            match = reint
            val = int(match.group(1))
            unit = match.group(2)
            is_relative = True
            if unit == "second": delta = timedelta(seconds=val)
            elif unit == "minute": delta = timedelta(minutes=val)
            elif unit == "hour": delta = timedelta(hours=val)
            elif unit == "day": delta = timedelta(days=val)
            elif unit == "week": delta = timedelta(weeks=val)
            elif unit == "month": delta = timedelta(days=val * 30)
            elif unit == "year": delta = timedelta(days=val * 365)
            result_dt = now - delta

        # "next Monday", "last Friday"
        elif "next" in text or "last" in text:
            days_map = {"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, "friday": 4, "saturday": 5, "sunday": 6}
            parts = text.split()
            day_name = parts[-1]
            if day_name in days_map:
                is_relative = True
                target_day_idx = days_map[day_name]
                current_day_idx = now.weekday()
                
                if "next" in text:
                    diff = (target_day_idx - current_day_idx) % 7
                    if diff == 0: diff = 7
                    result_dt = (now + timedelta(days=diff)).replace(hour=0, minute=0, second=0, microsecond=0)
                elif "last" in text:
                    diff = (current_day_idx - target_day_idx) % 7
                    if diff == 0: diff = 7
                    result_dt = (now - timedelta(days=diff)).replace(hour=0, minute=0, second=0, microsecond=0)

        if result_dt:
            return _json({
                "iso": resultint_dt.isoformat() if (resultint_dt := result_dt) else "",
                "parsed_from": text,
                "type": "relative" if is_relative else "absolute"
            })
        else:
            return _json({"error": "could not parse"})
            
    except Exception as e:
        return _json({"error": str(e)})


# ── MAIN / TRANSPORT ──

def main() -> None:
    transport = os.environ.get("MCP_TRANSPORT", "stdio")
    mcp.run(transport=transport)


if __name__ == "__main__":
    main