from __future__ import annotations
import argparse
import sys
from typing import Any, Dict
from flask import Flask, jsonify, request, render_template_string
from device import RelayDevice, ButtonDevice
from state import load
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# Active device object. It is None at import time and is assigned by
# create_app(); the route handlers read it as a module-level global.
device_instance = None
# ── helpers ──────────────────────────────────────────────────
def _client_ip() -> str:
    """Return the address of the client that issued the current request.

    If the request carries an ``X-Forwarded-For`` header, its first entry is
    returned; otherwise the WSGI ``REMOTE_ADDR`` value is used.

    The header can hold a comma-separated chain ("client, proxy1, proxy2").
    Only the left-most entry is returned so callers always get a single
    address, and an empty header falls back to ``REMOTE_ADDR``.

    NOTE(review): the header is supplied by the client, so the result should
    be treated as informational rather than as a trusted identity.

    Raises:
        KeyError: if no usable header is present and the WSGI environ has no
            ``REMOTE_ADDR`` entry.
    """
    forwarded = request.environ.get("HTTP_X_FORWARDED_FOR")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        if first_hop:
            return first_hop
    return request.environ["REMOTE_ADDR"]
def _unauthorized() -> tuple:
    """Build the (JSON body, 401) pair returned when the auth check fails."""
    payload = {
        "status": "error",
        "error": "Unauthorized",
        "message": "Missing or invalid token",
    }
    return jsonify(payload), 401
def _not_available() -> tuple:
    """Build the (JSON body, 403) pair returned when setup mode is inactive."""
    payload = {
        "status": "error",
        "error": "NotAvailable",
        "message": "Setup mode is not active",
    }
    http_status = 403
    return jsonify(payload), http_status
# ── about (always public) ──────────────────────────────────
@app.get("/about")
def about():
    """Serve the device's "about" document; no auth check is performed.

    The document comes from ``device_instance.about()`` and its
    ``ip_address`` entry is overwritten with "<client ip>:<device port>".
    """
    caller_ip = _client_ip()
    info = device_instance.about(caller_ip)
    info["ip_address"] = f"{caller_ip}:{device_instance.state.port}"
    return jsonify(info)
# ── status ──────────────────────────────────────────────────
@app.get("/status")
def status():
    """Return ``device_instance.status()`` as JSON, or 401 if auth fails."""
    authorized = device_instance._check_auth(request.headers)
    if authorized:
        return jsonify(device_instance.status())
    return _unauthorized()
# ── action ──────────────────────────────────────────────────
@app.post("/action")
def action():
    """Run a device action described by the JSON request body.

    The body is expected to be a JSON object with an ``action`` name and an
    optional ``params`` value; both are forwarded to
    ``device_instance.action()`` and its result is returned as JSON.

    Responses:
        401 when the auth check rejects the request headers.
        400 when the body is valid JSON but not an object.
    A missing or unparsable body is treated as an empty object.
    """
    if not device_instance._check_auth(request.headers):
        return _unauthorized()
    data = request.get_json(silent=True) or {}
    # A JSON array/string/number has no .get(); reject it instead of letting
    # the attribute error surface as a 500.
    if not isinstance(data, dict):
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "JSON body must be an object",
        }), 400
    action_name = data.get("action", "")
    params = data.get("params", {})
    return jsonify(device_instance.action(action_name, params))
# ── set_token ──────────────────────────────────────────────
@app.post("/set_token")
def set_token():
    """Store a new token on the device.

    In setup mode (``device_instance._is_setup()`` is true) the request is
    accepted without authentication; otherwise the auth check must pass.

    Responses:
        401 when not in setup mode and the auth check fails.
        400 when the body has no usable ``token`` (missing, empty, not a
            string, or the body is not a JSON object).
        Otherwise the result of ``device_instance.set_token(token)`` as JSON.
    """
    data = request.get_json(silent=True) or {}
    # A non-object body carries no token; fall through to the 400 below
    # rather than crashing on .get().
    token = data.get("token", "") if isinstance(data, dict) else ""
    if not device_instance._is_setup() and not device_instance._check_auth(request.headers):
        return _unauthorized()
    # Header values are strings, so a non-string token could never be matched
    # afterwards; refuse it together with the empty case.
    if not isinstance(token, str) or not token:
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "Token required",
        }), 400
    return jsonify(device_instance.set_token(token))
# ── reset ──────────────────────────────────────────────────
@app.post("/reset")
def reset():
    """Return ``device_instance.reset()`` as JSON, or 401 if auth fails."""
    if device_instance._check_auth(request.headers):
        outcome = device_instance.reset()
        return jsonify(outcome)
    return _unauthorized()
# ── reboot (stub) ──────────────────────────────────────────
@app.post("/reboot")
def reboot():
    """Acknowledge a reboot request (stub), or return 401 if auth fails.

    Nothing is restarted here; the handler only returns the acknowledgement.
    """
    if device_instance._check_auth(request.headers):
        ack = {"status": "ok", "message": "Device will reboot now"}
        return jsonify(ack)
    return _unauthorized()
# ── set_device_name ────────────────────────────────────────
@app.post("/set_device_name")
def set_device_name():
    """Rename the device and persist the new name.

    Reads ``device_name`` from the JSON body, assigns it to
    ``device_instance.state.device_name`` and saves the state.

    Responses:
        401 when the auth check fails.
        400 when ``device_name`` is missing, empty, not a string, or the body
            is not a JSON object. Previously an empty name was reported as
            "updated" although nothing was changed.
        200 with an "ok" status once the name has been saved.
    """
    if not device_instance._check_auth(request.headers):
        return _unauthorized()
    data = request.get_json(silent=True) or {}
    name = data.get("device_name", "") if isinstance(data, dict) else ""
    if not isinstance(name, str) or not name:
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "device_name required",
        }), 400
    # Kept as a function-local import, as in the original handler.
    from state import save

    device_instance.state.device_name = name
    save(device_instance.state)
    return jsonify({"status": "ok", "message": "Device name updated"})
# ── channels schema ────────────────────────────────────────
@app.get("/channels_schema")
def get_channels_schema():
    """Return the channel schema as JSON.

    Setup mode bypasses authentication; outside setup mode a failed auth
    check yields 401.
    """
    open_access = device_instance._is_setup()
    if not open_access and not device_instance._check_auth(request.headers):
        return _unauthorized()
    return jsonify(device_instance.get_channels_schema())
@app.post("/set_channels_schema")
def set_channels_schema():
    """Replace the channel schema with the ``schema`` list from the body.

    Setup mode bypasses authentication; outside setup mode the auth check
    must pass. A body without a ``schema`` key is treated as an empty list,
    as before.

    Responses:
        401 when not in setup mode and the auth check fails.
        400 when the body is not a JSON object or ``schema`` is not a list.
        Otherwise the result of ``device_instance.set_channels_schema()``.
    """
    if not device_instance._is_setup() and not device_instance._check_auth(request.headers):
        return _unauthorized()
    data = request.get_json(silent=True) or {}
    # A JSON array/string/number has no .get(); answer 400 instead of a 500.
    if not isinstance(data, dict):
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "JSON body must be an object",
        }), 400
    schema = data.get("schema", [])
    if not isinstance(schema, list):
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "schema must be a list",
        }), 400
    return jsonify(device_instance.set_channels_schema(schema))
# ── setup (stub) ───────────────────────────────────────────
@app.get("/setup")
def setup_get():
    """Report that setup mode is active, or 403 when it is not (stub)."""
    if device_instance._is_setup():
        return jsonify({"status": "ok", "message": "Setup mode active"})
    return _not_available()
@app.post("/setup")
def setup_post():
    """Acknowledge a setup submission, or 403 outside setup mode (stub).

    The request body is not read; the handler only returns the
    acknowledgement message.
    """
    if device_instance._is_setup():
        reply = {"status": "ok", "message": "Wi-Fi configured. Connecting..."}
        return jsonify(reply)
    return _not_available()
# ── simulate-event (debug helper, no auth) ─────────────────
@app.post("/simulate-event")
def simulate_event():
    """Debug helper: simulate a physical interaction on one channel.

    No authentication is performed. The JSON body may carry ``channel``
    (default 0). Any ``event_name`` in the body is accepted but not used.

    Behaviour:
        * If the device object has a ``trigger_click`` method, it is called
          with the channel and its result is returned.
        * Otherwise, for a ``relay`` device, the channel's current state is
          flipped between "on" and "off" via the ``set_state`` action.
        * Any other device type yields 400.

    Responses:
        400 when the body is not a JSON object, when ``channel`` is not a
            non-negative integer, or when a relay has no such channel.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "JSON body must be an object",
        }), 400
    channel = data.get("channel", 0)
    # bool is a subclass of int, so exclude it explicitly; negative values
    # would silently wrap around when used as a sequence index.
    if isinstance(channel, bool) or not isinstance(channel, int) or channel < 0:
        return jsonify({
            "status": "error",
            "error": "InvalidParams",
            "message": "channel must be a non-negative integer",
        }), 400
    if hasattr(device_instance, "trigger_click"):
        return jsonify(device_instance.trigger_click(channel))
    # For relay, just flip state
    if device_instance.state.device_type == "relay":
        try:
            entry = device_instance.state.channels[channel]
        except (IndexError, KeyError):
            return jsonify({
                "status": "error",
                "error": "InvalidParams",
                "message": "Unknown channel",
            }), 400
        current = entry.get("state", "off")
        new_state = "on" if current == "off" else "off"
        result = device_instance.action(
            "set_state", {"channel": channel, "state": new_state}
        )
        return jsonify(result)
    return jsonify({"status": "error", "message": "Unsupported device type"}), 400
# ── web UI ─────────────────────────────────────────────────
# Jinja template for the single-page web UI served at "/".
# Template variables: alias, device_type, device_id, port, status, token.
# The relay check is parenthesised before `|tojson` because a Jinja filter
# binds tighter than `==`; unparenthesised, the filter applies to 'relay'
# alone and the comparison renders the Python literal `False`, which is not
# valid JavaScript.
# NOTE(review): the page calls /status and /action without adding any auth
# header; confirm how _check_auth is expected to treat these browser requests.
_UI_HTML = """
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{{ alias }} — {{ device_type }}</title>
<style>
body { font-family: system-ui, -apple-system, sans-serif; margin: 2rem; background:#0f172a; color:#e2e8f0; }
.card { background:#1e293b; border-radius:12px; padding:1.5rem; margin-bottom:1rem; max-width:600px; }
h1 { margin:0 0 .5rem; font-size:1.5rem; }
.meta { color:#94a3b8; font-size:.9rem; margin-bottom:1rem; }
.channel { display:flex; align-items:center; justify-content:space-between; padding:.6rem 0; border-bottom:1px solid #334155; }
.channel:last-child { border:none; }
button { background:#3b82f6; color:#fff; border:none; padding:.5rem 1rem; border-radius:6px; cursor:pointer; }
button:hover { background:#2563eb; }
button.active { background:#22c55e; }
.status { font-family:monospace; font-size:.85rem; background:#0f172a; padding:.75rem; border-radius:6px; }
pre { margin:0; }
</style>
</head>
<body>
<div class="card">
<h1>{{ alias }} <span style="font-size:.9rem; color:#94a3b8;">({{ device_type }})</span></h1>
<div class="meta">
ID: {{ device_id }} · Port: {{ port }} · Mode: {{ status }}
{% if token %}· Token set{% endif %}
</div>
<div class="status"><pre id="about"></pre></div>
</div>
<div class="card">
<h2>Channels</h2>
<div id="channels"></div>
</div>
<div class="card">
<h2>Raw Status</h2>
<div class="status"><pre id="raw"></pre></div>
</div>
<script>
const alias = {{ alias|tojson }};
async function refresh() {
try {
const about = await fetch('/about').then(r => r.json());
document.getElementById('about').textContent = JSON.stringify(about, null, 2);
const status = await fetch('/status').then(r => r.json());
document.getElementById('raw').textContent = JSON.stringify(status, null, 2);
renderChannels(status.channels || []);
} catch (e) {
console.error(e);
}
}
function renderChannels(channels) {
const container = document.getElementById('channels');
container.innerHTML = '';
channels.forEach((ch, idx) => {
const div = document.createElement('div');
div.className = 'channel';
const state = ch.state || ch.last_event || 'idle';
const time = ch.last_event_time ? ` (${ch.last_event_time})` : '';
const isRelay = {{ (device_type == 'relay')|tojson }};
if (isRelay) {
const btn = document.createElement('button');
btn.textContent = state === 'on' ? 'Turn OFF' : 'Turn ON';
if (state === 'on') btn.classList.add('active');
btn.onclick = async () => {
const newState = state === 'on' ? 'off' : 'on';
await fetch('/action', {
method: 'POST',
headers: {'Content-Type':'application/json'},
body: JSON.stringify({action:'set_state', params:{channel:idx, state:newState}})
});
refresh();
};
div.innerHTML = `<span>Channel ${idx}: <b>${state}</b></span>`;
div.appendChild(btn);
} else {
const btn = document.createElement('button');
btn.textContent = 'Click';
btn.onclick = async () => {
await fetch('/simulate-event', {
method: 'POST',
headers: {'Content-Type':'application/json'},
body: JSON.stringify({event_name:'click', channel:idx})
});
refresh();
};
div.innerHTML = `<span>Channel ${idx}: <b>${state}</b>${time}</span>`;
div.appendChild(btn);
}
container.appendChild(div);
});
}
refresh();
setInterval(refresh, 2000);
</script>
</body>
</html>
"""
@app.get("/")
def index():
    """Render the web UI template with values read from the device state."""
    dev_state = device_instance.state
    context = {
        "alias": dev_state.alias,
        "device_type": dev_state.device_type,
        "device_id": dev_state.device_id,
        "port": dev_state.port,
        "status": dev_state.status,
        # Only whether a token exists is passed to the template, not its value.
        "token": bool(dev_state.token),
    }
    return render_template_string(_UI_HTML, **context)
# ── app factory ──────────────────────────────────────────────
def create_app(alias: str) -> Flask:
    """Load the device state for *alias*, bind the device, and return the app.

    Sets the module-level ``device_instance`` to a ``RelayDevice`` or
    ``ButtonDevice`` built from the loaded state, chosen by
    ``state.device_type``.

    Exits the process with status 1 (after printing to stderr) when no state
    is found for the alias or when the device type is neither "relay" nor
    "button".
    """
    global device_instance

    state = load(alias)
    if state is None:
        print(f"Device '{alias}' not found. Create it first with cli.py", file=sys.stderr)
        sys.exit(1)

    kind = state.device_type
    if kind == "relay":
        device_cls = RelayDevice
    elif kind == "button":
        device_cls = ButtonDevice
    else:
        print(f"Unknown device type: {state.device_type}", file=sys.stderr)
        sys.exit(1)

    device_instance = device_cls(state)
    return app
if __name__ == "__main__":
    # Command-line entry point: parse options, bind the device selected by
    # --alias, and run the Flask development server.
    parser = argparse.ArgumentParser(description="Virtual SH device emulator")
    parser.add_argument("--alias", required=True, help="Device alias (must be created via cli.py)")
    parser.add_argument("--host", default="0.0.0.0", help="Bind host")
    parser.add_argument("--port", type=int, default=None, help="Override port (default from device state)")
    args = parser.parse_args()
    flask_app = create_app(args.alias)
    # Compare against None instead of relying on truthiness, so an explicit
    # "--port 0" is honoured rather than being replaced by the stored port.
    port = args.port if args.port is not None else device_instance.state.port
    flask_app.run(host=args.host, port=port, debug=False)