# navi-1 / mcp-servers / project_health / app / mcp_server.py
"""MCP server for project_health — Analyzes project structure, finds duplicates, and detects dependencies."""

from __future__ import annotations

import hashlib
import json
import os
import re
from pathlib import Path
from typing import Annotated, Any

from mcp.server.fastmcp import FastMCP
from pydantic import Field

# Server-level guidance text handed to FastMCP via its `instructions` argument
# below. This is runtime text, not documentation: changing a character changes
# what the server is constructed with. `.strip()` removes the leading/trailing
# newlines introduced by the triple-quoted layout.
INSTRUCTIONS = """
project_health provides tools to analyze the health and structure of a codebase.

Use it when the task involves:
- summarizing project statistics (files, lines, languages).
- finding TODO/FIXME markers or potential secrets in the codebase.
- identifying duplicate files based on content.
- detecting project dependencies from configuration files.

Workflow:
1. get_project_summary — check the overall state and potential issues.
2. find_duplicate_files — clean up redundant files.
3. get_project_dependencies — understand the project's external requirements.

ABSOLUTE RULE — NEVER bypass MCP tools:
You MUST NOT use filesystem, terminal, code_exec, or any direct file access for operations covered by this server.
""".strip()

# Single FastMCP server instance; the @mcp.tool decorators further down register
# their functions on it and main() calls its run() method.
mcp = FastMCP("project_health", instructions=INSTRUCTIONS)


def _json(data: Any) -> str:
    """Serialize *data* to a JSON string for returning from a tool.

    The output is indented by two spaces and non-ASCII characters are written
    as-is rather than as ``\\uXXXX`` escapes.
    """
    dump_options = {"indent": 2, "ensure_ascii": False}
    return json.dumps(data, **dump_options)


# ── TOOL DEFINITIONS ────────────────────────────────────────
# ALL @mcp.tool decorators MUST be placed here, BEFORE main().

# Directory names pruned from the walk (matched on the bare name, at any depth).
_SUMMARY_EXCLUDED_DIRS = frozenset(
    {".git", "node_modules", "__pycache__", ".venv", "venv", ".pytest_cache", "dist", "build"}
)

# Files larger than this many bytes are skipped entirely: not counted, not scanned.
_SUMMARY_MAX_FILE_BYTES = 1_000_000

# Lower-cased file suffix -> language bucket; any other suffix is counted as "Other".
_SUMMARY_LANGUAGE_BY_EXT = {
    ".py": "Python",
    ".js": "JavaScript/TypeScript",
    ".ts": "JavaScript/TypeScript",
    ".md": "Markdown",
    ".json": "JSON",
    ".toml": "TOML",
    ".c": "C/C++",
    ".cpp": "C/C++",
    ".h": "C/C++",
}

# Line-level heuristics for hard-coded credentials, compiled once at import time.
# NOTE: the separator class must be written `[\s:=]` in a raw string. The previous
# `[\\s:=]` matched a literal backslash or the letter "s" instead of whitespace,
# so assignments such as `api_key = "..."` (spaces around "=") were never flagged.
_SUMMARY_SECRET_PATTERNS = {
    "API Key": re.compile(
        r"(?i)(api[_-]?key|token|secret|password|auth)[\s:=]+['\"][a-zA-Z0-9]{16,}['\"]"
    ),
    "Generic Secret": re.compile(r"(?i)password\s*=\s*['\"][^'\"]+['\"]"),
}


@mcp.tool(name="get_project_summary")
async def get_project_summary(
    path: Annotated[str, Field(description="Absolute path to the project root.")],
) -> str:
    """Summarize project stats, markers (TODO/FIXME), and potential secrets.

    Walks the tree under ``path`` (skipping common VCS, dependency, cache and
    build directories) and inspects every file of at most 1 MB.

    Returns a JSON string with three keys:
      - ``file_stats``: ``total_files``, ``total_lines`` and a per-language
        file count keyed by a bucket derived from the file extension.
      - ``markers``: one entry (``file``, ``line``, ``content``) per line that
        contains ``TODO`` or ``FIXME``.
      - ``secrets_found``: one entry (``file``, ``line``, ``type``) per line and
        per secret pattern that matches; the matched text itself is not returned.

    If ``path`` is not a directory, the JSON contains a single ``error`` key.
    File paths in the result are relative to ``path``.
    """
    root = Path(path)
    if not root.is_dir():
        return _json({"error": f"Path {path} is not a directory."})

    languages = {}
    stats = {"total_files": 0, "total_lines": 0, "languages": languages}
    markers = []
    secrets_found = []

    for dirpath, dirnames, filenames in os.walk(root):
        # In-place slice assignment is what makes os.walk skip the pruned directories.
        dirnames[:] = [d for d in dirnames if d not in _SUMMARY_EXCLUDED_DIRS]

        for filename in filenames:
            file_path = Path(dirpath) / filename

            try:
                size = file_path.stat().st_size
            except OSError:
                # Broken symlink, permission problem, or the file vanished mid-walk.
                continue
            if size > _SUMMARY_MAX_FILE_BYTES:
                continue

            # The file is counted as soon as it is known to exist and be small
            # enough, even if reading its content fails below.
            stats["total_files"] += 1
            lang = _SUMMARY_LANGUAGE_BY_EXT.get(file_path.suffix.lower(), "Other")
            languages[lang] = languages.get(lang, 0) + 1

            try:
                # errors="ignore" lets non-UTF-8 (including binary) files be
                # scanned best-effort instead of raising a decode error.
                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                    lines = f.readlines()
            except OSError:
                continue

            stats["total_lines"] += len(lines)
            rel_path = str(file_path.relative_to(root))

            for line_no, line in enumerate(lines, 1):
                if "TODO" in line or "FIXME" in line:
                    markers.append({
                        "file": rel_path,
                        "line": line_no,
                        "content": line.strip(),
                    })

                for secret_type, pattern in _SUMMARY_SECRET_PATTERNS.items():
                    if pattern.search(line):
                        secrets_found.append({
                            "file": rel_path,
                            "line": line_no,
                            "type": secret_type,
                        })

    return _json({
        "file_stats": stats,
        "markers": markers,
        "secrets_found": secrets_found,
    })


@mcp.tool(name="find_duplicate_files")
async def find_duplicate_files(
    path: Annotated[str, Field(description="Absolute path to the project root.")],
) -> str:
    """Find files with identical content using SHA256 hashing.

    Walks the tree under ``path`` (skipping ``.git``, ``node_modules``,
    ``__pycache__``, ``.venv`` and ``venv``) and considers files of at most
    5 MB. Files that cannot be stat-ed or read are skipped.

    Returns a JSON string ``{"duplicate_groups": [[path, ...], ...]}`` where
    each inner list holds two or more paths (relative to ``path``) whose
    contents hash to the same SHA256 digest. If ``path`` is not a directory,
    the JSON contains a single ``error`` key instead.
    """
    root = Path(path)
    if not root.is_dir():
        return _json({"error": f"Path {path} is not a directory."})

    exclude_dirs = {".git", "node_modules", "__pycache__", ".venv", "venv"}
    max_bytes = 5_000_000  # larger files are not hashed, to bound the work

    # Pass 1: record every candidate with its size, in walk order.
    candidates = []  # (file_path, size)
    size_counts = {}  # size -> number of candidates with that size
    for dirpath, dirnames, filenames in os.walk(root):
        # In-place slice assignment is what makes os.walk skip the pruned directories.
        dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

        for filename in filenames:
            file_path = Path(dirpath) / filename
            try:
                size = file_path.stat().st_size
            except OSError:
                continue
            if size > max_bytes:
                continue
            candidates.append((file_path, size))
            size_counts[size] = size_counts.get(size, 0) + 1

    # Pass 2: hash only files that share their size with at least one other
    # candidate. Files of different sizes cannot have identical content, so a
    # unique size can never end up in a duplicate group; skipping those avoids
    # reading most of the tree. Walk order is kept, so group order is unchanged.
    hashes = {}  # hex digest -> [relative paths]
    for file_path, size in candidates:
        if size_counts[size] < 2:
            continue

        hasher = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                while chunk := f.read(8192):
                    hasher.update(chunk)
        except OSError:
            continue

        rel_path = str(file_path.relative_to(root))
        hashes.setdefault(hasher.hexdigest(), []).append(rel_path)

    # Keep only digests shared by more than one file.
    duplicates = [paths for paths in hashes.values() if len(paths) > 1]
    return _json({"duplicate_groups": duplicates})


# A `dependencies = [ ... ]` TOML array whose key starts its line. The body is
# consumed token by token (plain characters, quoted strings, `#` comments) so
# that a "]" inside a string such as "mcp[cli]" does not end the array early.
# A regex is used instead of `tomllib` because that module needs Python 3.11+.
_PYPROJECT_DEPS_ARRAY = re.compile(
    r"(?m)^[ \t]*dependencies[ \t]*=[ \t]*\["
    r"((?:[^\]\"'#]|\"[^\"]*\"|'[^']*'|#[^\n]*)*)"
    r"\]"
)

# One array element (double- or single-quoted string) or a comment to be skipped.
_PYPROJECT_STRING_OR_COMMENT = re.compile(r"\"([^\"]*)\"|'([^']*)'|#[^\n]*")

# Fallback for package.json that is not valid JSON: the flat object that follows
# a "dependencies" / "devDependencies" key, and the "name": "version" pairs in it.
_PACKAGE_JSON_SECTION = re.compile(r"\"(?:devD|d)ependencies\"\s*:\s*\{([^{}]*)\}")
_PACKAGE_JSON_ENTRY = re.compile(r"\"([^\"]+)\"\s*:\s*\"[^\"]*\"")


def _read_manifest_text(manifest: Path) -> str:
    """Return the text of *manifest*, or "" when it is missing or unreadable."""
    try:
        # utf-8-sig also accepts plain UTF-8 and drops a leading BOM if present.
        return manifest.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        return ""


def _parse_pyproject_dependencies(text: str) -> list[str]:
    """Return the requirement strings of the first `dependencies = [...]` array."""
    match = _PYPROJECT_DEPS_ARRAY.search(text)
    if match is None:
        return []
    requirements = []
    for token in _PYPROJECT_STRING_OR_COMMENT.finditer(match.group(1)):
        value = token.group(1) if token.group(1) is not None else token.group(2)
        # Comment tokens leave both groups as None; empty strings are dropped.
        if value is not None and value.strip():
            requirements.append(value.strip())
    return requirements


def _parse_requirements_txt(text: str) -> list[str]:
    """Return the non-empty, non-comment lines of a requirements file."""
    requirements = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # An inline comment starts at a "#" preceded by whitespace; a "#" with no
        # whitespace before it (e.g. a URL fragment "#egg=name") is kept.
        line = re.split(r"\s+#", line, maxsplit=1)[0].strip()
        if line:
            requirements.append(line)
    return requirements


def _parse_package_json(text: str) -> list[str]:
    """Return package names from `dependencies` then `devDependencies`."""
    try:
        data = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        data = None

    names = []
    if isinstance(data, dict):
        for section in ("dependencies", "devDependencies"):
            entries = data.get(section)
            if isinstance(entries, dict):
                names.extend(entries)
        return names

    # Best-effort fallback for malformed JSON, limited to the two dependency
    # objects so unrelated pairs ("name", "version", scripts) are not reported.
    for section_body in _PACKAGE_JSON_SECTION.findall(text):
        names.extend(_PACKAGE_JSON_ENTRY.findall(section_body))
    return names


@mcp.tool(name="get_project_dependencies")
async def get_project_dependencies(
    path: Annotated[str, Field(description="Absolute path to the project root.")],
) -> str:
    """Identify dependencies by parsing common configuration files.

    Looks directly inside ``path`` (no recursion) for:
      - ``pyproject.toml``: requirement strings of the first
        ``dependencies = [...]`` array.
      - ``requirements.txt``: every non-blank, non-comment line, with inline
        comments removed.
      - ``package.json``: package names from ``dependencies`` and
        ``devDependencies``.

    Returns a JSON string with the keys ``python``, ``javascript`` and
    ``other`` (``other`` is always an empty list). Missing or unreadable files
    contribute nothing. If ``path`` is not a directory, the JSON contains a
    single ``error`` key instead.
    """
    root = Path(path)
    if not root.is_dir():
        return _json({"error": f"Path {path} is not a directory."})

    dependencies = {
        "python": [],
        "javascript": [],
        "other": [],
    }

    # Each parser returns [] for empty text, so an absent file needs no special case.
    dependencies["python"].extend(
        _parse_pyproject_dependencies(_read_manifest_text(root / "pyproject.toml"))
    )
    dependencies["python"].extend(
        _parse_requirements_txt(_read_manifest_text(root / "requirements.txt"))
    )
    dependencies["javascript"].extend(
        _parse_package_json(_read_manifest_text(root / "package.json"))
    )

    return _json(dependencies)




# ── MAIN / TRANSPORT ──────────────────────────────────────────────────

def main() -> None:
    """Run the MCP server on the transport named by ``MCP_TRANSPORT``.

    Falls back to ``"stdio"`` when the environment variable is not set.
    """
    selected_transport = os.getenv("MCP_TRANSPORT", "stdio")
    mcp.run(transport=selected_transport)


if __name__ == "__main__":
    main()