# Source listing: pi-kit/pikit_api/http_handlers.py (Python; 307 lines, 13 KiB as originally listed)

import json
import urllib.parse
from http.server import BaseHTTPRequestHandler
from .auto_updates import auto_updates_state, read_updates_config, set_auto_updates, set_updates_config
from .constants import CORE_NAME, CORE_PORTS, DIAG_LOG_FILE
from .diagnostics import _load_diag_state, _save_diag_state, dbg, diag_log, diag_read
from .helpers import default_host, detect_https, normalize_path
from .releases import (
check_for_update,
fetch_manifest,
fetch_text_with_auth,
load_update_state,
read_current_version,
save_update_state,
start_background_task,
list_available_releases,
apply_update_version,
)
from .services import (
FirewallToolMissing,
allow_port_lan,
factory_reset,
load_services,
remove_port_lan,
save_services,
ufw_status_allows,
)
from .status import collect_status, list_services_for_ui
class Handler(BaseHTTPRequestHandler):
    """JSON API for the dashboard (status, services, updates, reset).

    Routes are selected by prefix-matching ``self.path``, so the order of the
    ``startswith`` checks matters wherever one route is a prefix of another.
    Every response is a JSON document written through :meth:`_send`.
    """

    def _send(self, code, data):
        """Write *data* as a JSON response with HTTP status *code*."""
        body = json.dumps(data).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Override the base-class request logger with a no-op."""
        return

    def _read_json_body(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded ``dict`` (``{}`` for an empty or falsy body), or
            ``None`` when the request was malformed. In the ``None`` case a
            400 response has already been sent and the caller must simply
            return.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = -1
        # A negative length would make rfile.read() block until EOF.
        if length < 0:
            self._send(400, {"error": "invalid Content-Length"})
            return None
        try:
            payload = json.loads(self.rfile.read(length) or "{}")
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError alike.
            self._send(400, {"error": "invalid JSON body"})
            return None
        if not payload:
            return {}
        if not isinstance(payload, dict):
            # Every route below reads the payload with .get().
            self._send(400, {"error": "JSON body must be an object"})
            return None
        return payload

    @staticmethod
    def _parse_port(value):
        """Return *value* converted with ``int()``, or ``None`` if that fails."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    # GET endpoints
    def do_GET(self):
        """Serve the read-only endpoints; unknown paths get a 404."""
        if self.path.startswith("/api/status"):
            return self._send(200, collect_status())
        if self.path.startswith("/api/services"):
            return self._send(200, {"services": list_services_for_ui()})
        if self.path.startswith("/api/updates/auto"):
            state = auto_updates_state()
            return self._send(200, {"enabled": state.get("enabled", False), "details": state})
        if self.path.startswith("/api/updates/config"):
            return self._send(200, read_updates_config())
        if self.path.startswith("/api/update/status"):
            state = load_update_state()
            state["current_version"] = read_current_version()
            # NOTE(review): the default here is "dev" while /api/update/releases
            # falls back to "stable" -- confirm which one is intended.
            state["channel"] = state.get("channel", "dev")
            return self._send(200, state)
        if self.path.startswith("/api/update/changelog"):
            try:
                qs = urllib.parse.urlparse(self.path).query
                params = urllib.parse.parse_qs(qs)
                # NOTE(review): `url` is caller-supplied and is handed to
                # fetch_text_with_auth() unchanged. If that helper attaches
                # credentials, this can be abused for SSRF / credential
                # exposure -- consider restricting it to trusted hosts.
                url = params.get("url", [None])[0]
                if not url:
                    manifest = fetch_manifest()
                    url = manifest.get("changelog")
                if not url:
                    return self._send(404, {"error": "no changelog url"})
                text = fetch_text_with_auth(url)
                return self._send(200, {"text": text})
            except Exception as e:
                # Request boundary: report any fetch/parse failure as a 500.
                return self._send(500, {"error": str(e)})
        if self.path.startswith("/api/diag/log"):
            entries = diag_read()
            state = _load_diag_state()
            return self._send(200, {"entries": entries, "state": state})
        if self.path.startswith("/api/update/releases"):
            state = load_update_state()
            channel = state.get("channel") or "stable"
            return self._send(200, {"releases": list_available_releases(channel)})
        return self._send(404, {"error": "not found"})

    # POST endpoints
    def do_POST(self):
        """Dispatch the mutating endpoints.

        The body must be a JSON object (or empty). Malformed bodies are
        answered with a 400 before any route runs.
        """
        payload = self._read_json_body()
        if payload is None:
            return
        if self.path.startswith("/api/reset"):
            if payload.get("confirm") == "YES":
                # The reply is written before factory_reset() is invoked.
                self._send(200, {"message": "Resetting and rebooting..."})
                dbg("Factory reset triggered via API")
                diag_log("info", "Factory reset requested")
                factory_reset()
            else:
                self._send(400, {"error": "type YES to confirm"})
            return
        if self.path.startswith("/api/updates/auto"):
            enable = bool(payload.get("enable"))
            set_auto_updates(enable)
            dbg(f"Auto updates set to {enable}")
            state = auto_updates_state()
            diag_log("info", "Auto updates toggled", {"enabled": enable})
            return self._send(200, {"enabled": state.get("enabled", False), "details": state})
        if self.path.startswith("/api/updates/config"):
            try:
                cfg = set_updates_config(payload)
                dbg(f"Update settings applied: {cfg}")
                diag_log("info", "Update settings saved", cfg)
                return self._send(200, cfg)
            except Exception as e:
                dbg(f"Failed to apply updates config: {e}")
                diag_log("error", "Update settings save failed", {"error": str(e)})
                return self._send(500, {"error": str(e)})
        if self.path.startswith("/api/update/check"):
            return self._send(200, check_for_update())
        # Must be tested before "/api/update/apply", which is a prefix of it.
        if self.path.startswith("/api/update/apply_version"):
            version = payload.get("version")
            if not version:
                return self._send(400, {"error": "version required"})
            state = load_update_state()
            chan = payload.get("channel") or state.get("channel") or "stable"
            return self._send(200, apply_update_version(version, chan))
        if self.path.startswith("/api/update/apply"):
            start_background_task("apply")
            state = load_update_state()
            state["status"] = "in_progress"
            state["message"] = "Starting background apply"
            save_update_state(state)
            return self._send(202, state)
        if self.path.startswith("/api/update/auto"):
            state = load_update_state()
            state["auto_check"] = bool(payload.get("enable"))
            save_update_state(state)
            diag_log("info", "Release auto-check toggled", {"enabled": state["auto_check"]})
            return self._send(200, state)
        if self.path.startswith("/api/update/channel"):
            chan = payload.get("channel", "dev")
            if chan not in ("dev", "stable"):
                return self._send(400, {"error": "channel must be dev or stable"})
            state = load_update_state()
            state["channel"] = chan
            save_update_state(state)
            diag_log("info", "Release channel set", {"channel": chan})
            return self._send(200, state)
        if self.path.startswith("/api/diag/log/level"):
            state = _save_diag_state(payload.get("enabled"), payload.get("level"))
            diag_log("info", "Diag level updated", state)
            return self._send(200, {"state": state})
        if self.path.startswith("/api/diag/log/clear"):
            try:
                DIAG_LOG_FILE.unlink(missing_ok=True)
            except Exception:
                # Best effort: a failed delete must not fail the request.
                pass
            diag_log("info", "Diag log cleared")
            return self._send(200, {"cleared": True, "state": _load_diag_state()})
        if self.path.startswith("/api/services/add"):
            return self._services_add(payload)
        if self.path.startswith("/api/services/remove"):
            return self._services_remove(payload)
        if self.path.startswith("/api/services/update"):
            return self._services_update(payload)
        return self._send(404, {"error": "not found"})

    def _services_add(self, payload):
        """Handle POST /api/services/add: register a service and open its port."""
        name = payload.get("name")
        # An unparseable port is treated like a missing one.
        port = self._parse_port(payload.get("port", 0))
        if not name or not port:
            return self._send(400, {"error": "name and port required"})
        if port in CORE_PORTS and name != CORE_NAME:
            return self._send(400, {"error": f"Port {port} is reserved for {CORE_NAME}"})
        services = load_services()
        if any(s.get("port") == port for s in services):
            return self._send(400, {"error": "port already exists"})
        host = default_host()
        scheme = payload.get("scheme")
        if scheme not in ("http", "https"):
            # No (valid) scheme supplied: probe the port instead.
            scheme = "https" if detect_https(host, port) else "http"
        notice = (payload.get("notice") or "").strip()
        notice_link = (payload.get("notice_link") or "").strip()
        self_signed = bool(payload.get("self_signed"))
        path = normalize_path(payload.get("path"))
        svc = {"name": name, "port": port, "scheme": scheme, "url": f"{scheme}://{host}:{port}{path}"}
        # Optional fields are stored only when they carry a value.
        if notice:
            svc["notice"] = notice
        if notice_link:
            svc["notice_link"] = notice_link
        if self_signed:
            svc["self_signed"] = True
        if path:
            svc["path"] = path
        # Open the firewall before persisting, as remove/update do; otherwise
        # a firewall failure would leave a saved service behind while the
        # client is told the request failed.
        try:
            allow_port_lan(port)
        except FirewallToolMissing as e:
            return self._send(500, {"error": str(e)})
        services.append(svc)
        save_services(services)
        diag_log("info", "Service added", {"name": name, "port": port, "scheme": scheme})
        return self._send(200, {"services": services, "message": f"Added {name} on port {port} and opened LAN firewall"})

    def _services_remove(self, payload):
        """Handle POST /api/services/remove: close the port and drop the entry."""
        # An unparseable port is treated like a missing one.
        port = self._parse_port(payload.get("port", 0))
        if not port:
            return self._send(400, {"error": "port required"})
        if port in CORE_PORTS:
            return self._send(400, {"error": f"Cannot remove core service on port {port}"})
        services = [s for s in load_services() if s.get("port") != port]
        try:
            remove_port_lan(port)
        except FirewallToolMissing as e:
            return self._send(500, {"error": str(e)})
        save_services(services)
        diag_log("info", "Service removed", {"port": port})
        return self._send(200, {"services": services, "message": f"Removed service on port {port}"})

    def _services_update(self, payload):
        """Handle POST /api/services/update: edit the service stored on ``port``.

        Fields absent from the payload (``None``) are left unchanged; the
        service is persisted only after every validation has passed.
        """
        port = self._parse_port(payload.get("port", 0))
        if port is None:
            return self._send(400, {"error": "port must be an integer"})
        new_name = payload.get("name")
        new_port = payload.get("new_port")
        new_scheme = payload.get("scheme")
        notice = payload.get("notice")
        notice_link = payload.get("notice_link")
        new_path = payload.get("path")
        self_signed = payload.get("self_signed")

        services = load_services()
        # Only the first service registered on this port is edited.
        svc = next((s for s in services if s.get("port") == port), None)
        if svc is None:
            return self._send(404, {"error": "service not found"})

        if new_name:
            if port in CORE_PORTS and new_name != CORE_NAME:
                return self._send(400, {"error": f"Core service on port {port} must stay named {CORE_NAME}"})
            svc["name"] = new_name

        target_port = port
        if new_port is not None:
            new_port_int = self._parse_port(new_port)
            if new_port_int is None:
                return self._send(400, {"error": "new_port must be an integer"})
            if new_port_int != port:
                if new_port_int in CORE_PORTS and svc.get("name") != CORE_NAME:
                    return self._send(400, {"error": f"Port {new_port_int} is reserved for {CORE_NAME}"})
                if any(s.get("port") == new_port_int and s is not svc for s in services):
                    return self._send(400, {"error": "new port already in use"})
                try:
                    remove_port_lan(port)
                    allow_port_lan(new_port_int)
                except FirewallToolMissing as e:
                    return self._send(500, {"error": str(e)})
                svc["port"] = new_port_int
                target_port = new_port_int

        host = default_host()
        if new_path is not None:
            path = normalize_path(new_path)
            if path:
                svc["path"] = path
            else:
                svc.pop("path", None)
        else:
            path = normalize_path(svc.get("path"))
            if path:
                svc["path"] = path

        if new_scheme:
            # An unrecognised scheme falls through to auto-detection below.
            scheme = new_scheme if new_scheme in ("http", "https") else None
        else:
            scheme = svc.get("scheme")
        if not scheme or scheme == "auto":
            scheme = "https" if detect_https(host, target_port) else "http"
        svc["scheme"] = scheme
        svc["url"] = f"{scheme}://{host}:{target_port}{path}"

        # For the text fields, a blank value removes the stored entry.
        if notice is not None:
            text = (notice or "").strip()
            if text:
                svc["notice"] = text
            else:
                svc.pop("notice", None)
        if notice_link is not None:
            link = (notice_link or "").strip()
            if link:
                svc["notice_link"] = link
            else:
                svc.pop("notice_link", None)
        if self_signed is not None:
            if bool(self_signed):
                svc["self_signed"] = True
            else:
                svc.pop("self_signed", None)

        save_services(services)
        diag_log("info", "Service updated", {"port": svc.get("port"), "name": new_name or None, "scheme": svc.get("scheme")})
        return self._send(200, {"services": services, "message": "Service updated"})