Newer
Older
smart-home-server / rest_api_debug_tool / app.py
from __future__ import annotations

import json
import re
from typing import Any, Dict, Tuple

import requests
from flask import Flask, jsonify, request, send_from_directory

app = Flask(__name__, static_folder="static")

HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
}

PRIVATE_HOST_RE = re.compile(r"^(localhost|127\.0\.0\.1|0\.0\.0\.0)$", re.I)


def _filter_response_headers(headers: Dict[str, str]) -> Dict[str, str]:
    out: Dict[str, str] = {}
    for k, v in headers.items():
        if k.lower() in HOP_BY_HOP_HEADERS:
            continue
        out[k] = v
    return out


def _safe_target_url(url: str) -> Tuple[bool, str]:
    # Базовая защита от совсем странного ввода.
    # Для локальной разработки можно оставлять шире, но хотя бы не даём file:// и т.п.
    if not url:
        return False, "Empty url"
    if not (url.startswith("http://") or url.startswith("https://")):
        return False, "Only http/https URLs are allowed"
    return True, ""


@app.get("/")
def index():
    return send_from_directory(app.static_folder, "index.html")


@app.post("/proxy")
def proxy():
    data: Dict[str, Any] = request.get_json(silent=True) or {}

    url = str(data.get("url", "")).strip()
    method = str(data.get("method", "GET")).upper().strip()
    body_type = str(data.get("body_type", "none")).strip()
    headers_in = data.get("headers") or {}
    body_in = data.get("body", "")

    ok, err = _safe_target_url(url)
    if not ok:
        return jsonify({"error": err}), 400

    if not isinstance(headers_in, dict):
        return jsonify({"error": "headers must be an object/dict"}), 400

    # Вырезаем хоп-бай-хоп заголовки
    headers: Dict[str, str] = {}
    for k, v in headers_in.items():
        if not isinstance(k, str):
            continue
        if k.lower() in HOP_BY_HOP_HEADERS:
            continue
        headers[k] = str(v)

    timeout_s = 30

    try:
        req_kwargs: Dict[str, Any] = {
            "headers": headers,
            "timeout": timeout_s,
            "allow_redirects": False,
        }

        if method in ("GET", "HEAD"):
            # body не отправляем
            pass
        else:
            if body_type == "json":
                # body_in может быть строкой JSON или объектом
                if isinstance(body_in, (dict, list)):
                    req_kwargs["json"] = body_in
                else:
                    # строка
                    text = str(body_in)
                    if text.strip():
                        req_kwargs["data"] = text.encode("utf-8")
                    else:
                        req_kwargs["data"] = b""
                    # если Content-Type не задан — проставим
                    if not any(k.lower() == "content-type" for k in headers.keys()):
                        req_kwargs["headers"]["Content-Type"] = "application/json; charset=utf-8"

            elif body_type == "form":
                # body_in ожидаем как строку "a=1\nb=2" или dict
                if isinstance(body_in, dict):
                    req_kwargs["data"] = body_in
                else:
                    lines = [l.strip() for l in str(body_in).splitlines() if l.strip()]
                    form: Dict[str, str] = {}
                    for line in lines:
                        if "=" not in line:
                            continue
                        k, v = line.split("=", 1)
                        form[k.strip()] = v.strip()
                    req_kwargs["data"] = form
                if not any(k.lower() == "content-type" for k in headers.keys()):
                    req_kwargs["headers"]["Content-Type"] = "application/x-www-form-urlencoded; charset=utf-8"

            elif body_type == "raw":
                req_kwargs["data"] = (str(body_in)).encode("utf-8")
                if not any(k.lower() == "content-type" for k in headers.keys()):
                    req_kwargs["headers"]["Content-Type"] = "text/plain; charset=utf-8"
            else:
                # none
                pass

        resp = requests.request(method, url, **req_kwargs)

        resp_headers = _filter_response_headers(dict(resp.headers))
        content_type = resp.headers.get("content-type", "")

        # Возвращаем тело как текст (включая JSON строкой)
        # Если бинарь — лучше base64, но ты писал что в основном JSON/текст
        try:
            resp_text = resp.text
        except Exception:
            resp_text = resp.content.decode("utf-8", errors="replace")

        return jsonify(
            {
                "status": resp.status_code,
                "headers": resp_headers,
                "content_type": content_type,
                "body": resp_text,
            }
        ), 200

    except requests.RequestException as e:
        return jsonify({"error": f"Upstream request failed: {str(e)}"}), 502


if __name__ == "__main__":
    # http://127.0.0.1:8000
    app.run(host="0.0.0.0", port=8000, debug=True)