Files
pi-kit/pikit-api.py

979 lines
38 KiB
Python

#!/usr/bin/env python3
import json, os, subprocess, socket, shutil, pathlib, datetime, tarfile
from http.server import BaseHTTPRequestHandler, HTTPServer
import re
import urllib.request
import hashlib
HOST = "127.0.0.1"
PORT = 4000
SERVICE_JSON = pathlib.Path("/etc/pikit/services.json")
RESET_LOG = pathlib.Path("/var/log/pikit-reset.log")
API_LOG = pathlib.Path("/var/log/pikit-api.log")
DEBUG_FLAG = pathlib.Path("/boot/pikit-debug").exists()
HTTPS_PORTS = {443, 5252}
CORE_PORTS = {80}
CORE_NAME = "Pi-Kit Dashboard"
READY_FILE = pathlib.Path("/var/run/pikit-ready")
APT_AUTO_CFG = pathlib.Path("/etc/apt/apt.conf.d/20auto-upgrades")
APT_UA_BASE = pathlib.Path("/etc/apt/apt.conf.d/50unattended-upgrades")
APT_UA_OVERRIDE = pathlib.Path("/etc/apt/apt.conf.d/51pikit-unattended.conf")
DEFAULT_UPDATE_TIME = "04:00"
DEFAULT_UPGRADE_TIME = "04:30"
SECURITY_PATTERNS = [
'origin=Debian,codename=${distro_codename},label=Debian-Security',
'origin=Debian,codename=${distro_codename}-security,label=Debian-Security',
]
ALL_PATTERNS = [
'origin=Debian,codename=${distro_codename},label=Debian',
*SECURITY_PATTERNS,
]
# Release updater constants
VERSION_FILE = pathlib.Path("/etc/pikit/version")
WEB_VERSION_FILE = pathlib.Path("/var/www/pikit-web/data/version.json")
UPDATE_STATE_DIR = pathlib.Path("/var/lib/pikit-update")
UPDATE_STATE = UPDATE_STATE_DIR / "state.json"
UPDATE_LOCK = pathlib.Path("/var/run/pikit-update.lock")
DEFAULT_MANIFEST_URL = os.environ.get(
"PIKIT_MANIFEST_URL",
"https://git.44r0n.cc/44r0n7/pi-kit/releases/latest/download/manifest.json",
)
WEB_ROOT = pathlib.Path("/var/www/pikit-web")
API_PATH = pathlib.Path("/usr/local/bin/pikit-api.py")
BACKUP_ROOT = pathlib.Path("/var/backups/pikit")
TMP_ROOT = pathlib.Path("/var/tmp/pikit-update")
def ensure_dir(path: pathlib.Path):
path.mkdir(parents=True, exist_ok=True)
def sha256_file(path: pathlib.Path):
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
class FirewallToolMissing(Exception):
"""Raised when ufw is unavailable but a firewall change was requested."""
pass
def normalize_path(path: str | None) -> str:
"""Normalize optional service path. Empty -> "". Ensure leading slash."""
if not path:
return ""
p = str(path).strip()
if not p:
return ""
if not p.startswith("/"):
p = "/" + p
return p
def dbg(msg):
if not DEBUG_FLAG:
return
API_LOG.parent.mkdir(parents=True, exist_ok=True)
ts = datetime.datetime.utcnow().isoformat()
with API_LOG.open("a") as f:
f.write(f"[{ts}] {msg}\n")
def set_ssh_password_auth(allow: bool):
"""
Enable/disable SSH password authentication without requiring the current password.
Used during factory reset to restore a predictable state.
"""
cfg = pathlib.Path("/etc/ssh/sshd_config")
text = cfg.read_text() if cfg.exists() else ""
def set_opt(key, value):
nonlocal text
pattern = f"{key} "
lines = text.splitlines()
replaced = False
for idx, line in enumerate(lines):
if line.strip().startswith(pattern):
lines[idx] = f"{key} {value}"
replaced = True
break
if not replaced:
lines.append(f"{key} {value}")
text_new = "\n".join(lines) + "\n"
return text_new
text = set_opt("PasswordAuthentication", "yes" if allow else "no")
text = set_opt("KbdInteractiveAuthentication", "no")
text = set_opt("ChallengeResponseAuthentication", "no")
text = set_opt("PubkeyAuthentication", "yes")
text = set_opt("PermitRootLogin", "yes" if allow else "prohibit-password")
cfg.write_text(text)
subprocess.run(["systemctl", "restart", "ssh"], check=False)
return True, f"SSH password auth {'enabled' if allow else 'disabled'}"
def load_services():
if SERVICE_JSON.exists():
try:
data = json.loads(SERVICE_JSON.read_text())
# Normalize entries: ensure url built from port if missing
host = socket.gethostname()
for svc in data:
svc_path = normalize_path(svc.get("path"))
if svc_path:
svc["path"] = svc_path
if svc.get("port"):
scheme = svc.get("scheme")
if not scheme:
scheme = "https" if int(svc["port"]) in HTTPS_PORTS else "http"
svc["scheme"] = scheme
svc["url"] = f"{scheme}://{host}:{svc['port']}{svc_path}"
return data
except Exception:
dbg("Failed to read services.json")
return []
return []
def save_services(services):
SERVICE_JSON.parent.mkdir(parents=True, exist_ok=True)
SERVICE_JSON.write_text(json.dumps(services, indent=2))
def auto_updates_enabled():
if not APT_AUTO_CFG.exists():
return False
text = APT_AUTO_CFG.read_text()
return 'APT::Periodic::Unattended-Upgrade "1";' in text
def set_auto_updates(enable: bool):
"""
Toggle unattended upgrades in a way that matches systemd state, not just the
apt config file. Assumes unattended-upgrades is already installed.
"""
units_maskable = [
"apt-daily.service",
"apt-daily-upgrade.service",
"apt-daily.timer",
"apt-daily-upgrade.timer",
"unattended-upgrades.service",
]
timers = ["apt-daily.timer", "apt-daily-upgrade.timer"]
service = "unattended-upgrades.service"
APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
if enable:
APT_AUTO_CFG.write_text(
'APT::Periodic::Update-Package-Lists "1";\n'
'APT::Periodic::Unattended-Upgrade "1";\n'
)
for unit in units_maskable:
subprocess.run(["systemctl", "unmask", unit], check=False)
# Enable timers and the service; start them so state matches immediately
for unit in timers + [service]:
subprocess.run(["systemctl", "enable", unit], check=False)
for unit in timers:
subprocess.run(["systemctl", "start", unit], check=False)
subprocess.run(["systemctl", "start", service], check=False)
else:
APT_AUTO_CFG.write_text(
'APT::Periodic::Update-Package-Lists "0";\n'
'APT::Periodic::Unattended-Upgrade "0";\n'
)
# Stop/disable and mask to mirror DietPi defaults
for unit in timers + [service]:
subprocess.run(["systemctl", "stop", unit], check=False)
subprocess.run(["systemctl", "disable", unit], check=False)
for unit in units_maskable:
subprocess.run(["systemctl", "mask", unit], check=False)
def _systemctl_is(unit: str, verb: str) -> bool:
try:
out = subprocess.check_output(["systemctl", verb, unit], text=True).strip()
return out == "enabled" if verb == "is-enabled" else out == "active"
except Exception:
return False
def auto_updates_state():
config_on = auto_updates_enabled()
service = "unattended-upgrades.service"
timers = ["apt-daily.timer", "apt-daily-upgrade.timer"]
state = {
"config_enabled": config_on,
"service_enabled": _systemctl_is(service, "is-enabled"),
"service_active": _systemctl_is(service, "is-active"),
"timers_enabled": {},
"timers_active": {},
}
for t in timers:
state["timers_enabled"][t] = _systemctl_is(t, "is-enabled")
state["timers_active"][t] = _systemctl_is(t, "is-active")
# Consider overall enabled only if config is on and both timers & service are enabled
state["enabled"] = (
config_on
and state["service_enabled"]
and all(state["timers_enabled"].values())
)
return state
def reboot_required():
return pathlib.Path("/run/reboot-required").exists()
def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False):
text = _strip_comments(text)
pattern = rf'{re.escape(key)}\s+"?([^";\n]+)"?;'
m = re.search(pattern, text)
if not m:
return default
val = m.group(1).strip()
if as_bool:
return val.lower() in ("1", "true", "yes", "on")
if as_int:
try:
return int(val)
except ValueError:
return default
return val
def _parse_origins_patterns(text: str):
text = _strip_comments(text)
m = re.search(r"Unattended-Upgrade::Origins-Pattern\s*{([^}]*)}", text, re.S)
patterns = []
if not m:
return patterns
body = m.group(1)
for line in body.splitlines():
ln = line.strip().strip('";')
if ln:
patterns.append(ln)
return patterns
def _read_timer_time(timer: str):
try:
out = subprocess.check_output(
["systemctl", "show", "--property=TimersCalendar", timer], text=True
)
# Example: TimersCalendar={ OnCalendar=*-*-* 03:10:00 ; next_elapse=... }
m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out)
if m:
return f"{int(m.group(1)):02d}:{m.group(2)}"
except Exception:
pass
return None
def _strip_comments(text: str):
"""Remove // and # line comments for safer parsing."""
lines = []
for ln in text.splitlines():
l = ln.strip()
if l.startswith("//") or l.startswith("#"):
continue
lines.append(ln)
return "\n".join(lines)
def _validate_time(val: str, default: str):
if not val:
return default
m = re.match(r"^(\d{1,2}):(\d{2})$", val.strip())
if not m:
return default
h, mi = int(m.group(1)), int(m.group(2))
if 0 <= h < 24 and 0 <= mi < 60:
return f"{h:02d}:{mi:02d}"
return default
def read_updates_config(state=None):
"""
Return a normalized unattended-upgrades configuration snapshot.
Values are sourced from the Pi-Kit override file when present, else the base file.
"""
text = ""
for path in (APT_UA_OVERRIDE, APT_UA_BASE):
if path.exists():
try:
text += path.read_text() + "\n"
except Exception:
pass
scope_hint = None
m_scope = re.search(r"PIKIT_SCOPE:\s*(\w+)", text)
if m_scope:
scope_hint = m_scope.group(1).lower()
cleaned = _strip_comments(text)
patterns = _parse_origins_patterns(cleaned)
scope = (
scope_hint
or ("all" if any("label=Debian" in p and "-security" not in p for p in patterns) else "security")
)
cleanup = _parse_directive(text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True)
auto_reboot = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True)
reboot_time = _validate_time(_parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME), DEFAULT_UPGRADE_TIME)
reboot_with_users = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True)
bandwidth = _parse_directive(text, "Acquire::http::Dl-Limit", None, as_int=True)
update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME
upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME
state = state or auto_updates_state()
return {
"enabled": bool(state.get("enabled", False)),
"scope": scope,
"cleanup": bool(cleanup),
"bandwidth_limit_kbps": bandwidth,
"auto_reboot": bool(auto_reboot),
"reboot_time": reboot_time,
"reboot_with_users": bool(reboot_with_users),
"update_time": update_time,
"upgrade_time": upgrade_time,
"state": state,
}
def _write_timer_override(timer: str, time_str: str):
time_norm = _validate_time(time_str, DEFAULT_UPDATE_TIME)
override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d")
override_dir.mkdir(parents=True, exist_ok=True)
override_file = override_dir / "pikit.conf"
override_file.write_text(
"[Timer]\n"
f"OnCalendar=*-*-* {time_norm}\n"
"Persistent=true\n"
"RandomizedDelaySec=30min\n"
)
subprocess.run(["systemctl", "daemon-reload"], check=False)
subprocess.run(["systemctl", "restart", timer], check=False)
def set_updates_config(opts: dict):
"""
Apply unattended-upgrades configuration from dashboard inputs.
"""
enable = bool(opts.get("enable", True))
scope = opts.get("scope") or "all"
patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS
cleanup = bool(opts.get("cleanup", False))
bandwidth = opts.get("bandwidth_limit_kbps")
auto_reboot = bool(opts.get("auto_reboot", False))
reboot_time = _validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME)
reboot_with_users = bool(opts.get("reboot_with_users", False))
update_time = _validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME)
upgrade_time = _validate_time(opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME)
APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
set_auto_updates(enable)
lines = [
"// Managed by Pi-Kit dashboard",
f"// PIKIT_SCOPE: {scope}",
"Unattended-Upgrade::Origins-Pattern {",
]
for p in patterns:
lines.append(f' "{p}";')
lines.append("};")
lines.append(f'Unattended-Upgrade::Remove-Unused-Dependencies {"true" if cleanup else "false"};')
lines.append(f'Unattended-Upgrade::Automatic-Reboot {"true" if auto_reboot else "false"};')
lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";')
lines.append(
f'Unattended-Upgrade::Automatic-Reboot-WithUsers {"true" if reboot_with_users else "false"};'
)
if bandwidth is not None:
lines.append(f'Acquire::http::Dl-Limit "{int(bandwidth)}";')
APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True)
APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n")
# Timer overrides for when upgrades run
_write_timer_override("apt-daily.timer", update_time)
_write_timer_override("apt-daily-upgrade.timer", upgrade_time)
return read_updates_config()
def detect_https(host, port):
try:
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with socket.create_connection((host, int(port)), timeout=1.5) as sock:
with ctx.wrap_socket(sock, server_hostname=host):
return True
except Exception:
return False
def factory_reset():
# Restore services config
if pathlib.Path("/boot/custom-files/pikit-services.json").exists():
shutil.copy("/boot/custom-files/pikit-services.json", SERVICE_JSON)
else:
SERVICE_JSON.write_text(json.dumps([
{"name": "Pi-Kit Dashboard", "port": 80},
{"name": "DietPi Dashboard", "port": 5252},
], indent=2))
# Reset firewall
reset_firewall()
# Reset SSH auth to password and set defaults
set_ssh_password_auth(True)
for user in ("root", "dietpi"):
try:
subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True)
except Exception:
pass
# Ensure dietpi exists
if not pathlib.Path("/home/dietpi").exists():
subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False)
subprocess.run(["chpasswd"], input=b"dietpi:pikit", check=False)
# Log and reboot
pathlib.Path("/var/log/pikit-reset.log").write_text("Factory reset triggered\n")
subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True)
def port_online(host, port):
try:
with socket.create_connection((host, int(port)), timeout=1.5):
return True
except Exception:
return False
def ufw_status_allows(port: int) -> bool:
try:
out = subprocess.check_output(["ufw", "status"], text=True)
return f"{port}" in out and "ALLOW" in out
except Exception:
return False
def allow_port_lan(port: int):
"""Open a port to RFC1918 subnets; raise if ufw is missing so callers can surface the error."""
if not shutil.which("ufw"):
raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
def remove_port_lan(port: int):
"""Close a LAN rule for a port; raise if ufw is missing so callers can surface the error."""
if not shutil.which("ufw"):
raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
subprocess.run(["ufw", "delete", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
def reset_firewall():
subprocess.run(["ufw", "--force", "reset"], check=False)
subprocess.run(["ufw", "default", "deny", "incoming"], check=False)
subprocess.run(["ufw", "default", "deny", "outgoing"], check=False)
# Outbound essentials + LAN
for port in ("53", "80", "443", "123", "67", "68"):
subprocess.run(["ufw", "allow", "out", port], check=False)
subprocess.run(["ufw", "allow", "out", "on", "lo"], check=False)
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
subprocess.run(["ufw", "allow", "out", "to", subnet], check=False)
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
for port in ("22", "80", "443", "5252", "5253"):
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", port], check=False)
subprocess.run(["ufw", "--force", "enable"], check=False)
def read_current_version():
if VERSION_FILE.exists():
return VERSION_FILE.read_text().strip()
if WEB_VERSION_FILE.exists():
try:
return json.loads(WEB_VERSION_FILE.read_text()).get("version", "unknown")
except Exception:
return "unknown"
return "unknown"
def load_update_state():
UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True)
if UPDATE_STATE.exists():
try:
return json.loads(UPDATE_STATE.read_text())
except Exception:
pass
return {
"current_version": read_current_version(),
"latest_version": None,
"last_check": None,
"status": "unknown",
"message": "",
"auto_check": False,
"in_progress": False,
"progress": None,
}
def save_update_state(state: dict):
UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True)
UPDATE_STATE.write_text(json.dumps(state, indent=2))
def fetch_manifest(url: str = None):
target = url or DEFAULT_MANIFEST_URL
resp = urllib.request.urlopen(target, timeout=10)
data = resp.read()
manifest = json.loads(data.decode())
return manifest
def download_file(url: str, dest: pathlib.Path):
ensure_dir(dest.parent)
with urllib.request.urlopen(url, timeout=30) as resp, dest.open("wb") as f:
shutil.copyfileobj(resp, f)
return dest
def check_for_update():
state = load_update_state()
state["in_progress"] = True
state["progress"] = "Checking for updates…"
save_update_state(state)
try:
manifest = fetch_manifest()
latest = manifest.get("version") or manifest.get("latest_version")
state["latest_version"] = latest
state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z"
if latest and latest != state.get("current_version"):
state["status"] = "update_available"
state["message"] = manifest.get("changelog", "Update available")
else:
state["status"] = "up_to_date"
state["message"] = "Up to date"
except Exception as e:
state["status"] = "up_to_date"
state["message"] = f"Could not reach update server: {e}"
state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z"
finally:
state["in_progress"] = False
state["progress"] = None
save_update_state(state)
return state
def apply_update_stub():
"""Download + install release tarball with backup/rollback."""
state = load_update_state()
if state.get("in_progress"):
state["message"] = "Update already in progress"
save_update_state(state)
return state
manifest = None
state["in_progress"] = True
state["progress"] = "Starting update…"
save_update_state(state)
try:
manifest = fetch_manifest()
latest = manifest.get("version") or manifest.get("latest_version")
if not latest:
raise RuntimeError("Manifest missing version")
# Paths
bundle_url = manifest.get("bundle") or manifest.get("url")
if not bundle_url:
raise RuntimeError("Manifest missing bundle url")
stage_dir = TMP_ROOT / latest
bundle_path = stage_dir / "bundle.tar.gz"
ensure_dir(stage_dir)
state["progress"] = "Downloading release…"
save_update_state(state)
download_file(bundle_url, bundle_path)
# Verify hash if provided
expected_hash = None
for f in manifest.get("files", []):
if f.get("path") == "bundle.tar.gz" and f.get("sha256"):
expected_hash = f["sha256"]
break
if expected_hash:
got = sha256_file(bundle_path)
if got.lower() != expected_hash.lower():
raise RuntimeError("Bundle hash mismatch")
state["progress"] = "Staging files…"
save_update_state(state)
# Extract
with tarfile.open(bundle_path, "r:gz") as tar:
tar.extractall(stage_dir)
# Backup current
ts = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
backup_dir = BACKUP_ROOT / ts
ensure_dir(backup_dir)
# Backup web and api
if WEB_ROOT.exists():
shutil.copytree(WEB_ROOT, backup_dir / "pikit-web")
if API_PATH.exists():
shutil.copy2(API_PATH, backup_dir / "pikit-api.py")
# Deploy from staging
staged_web = stage_dir / "pikit-web"
if staged_web.exists():
shutil.rmtree(WEB_ROOT, ignore_errors=True)
shutil.copytree(staged_web, WEB_ROOT)
staged_api = stage_dir / "pikit-api.py"
if staged_api.exists():
shutil.copy2(staged_api, API_PATH)
os.chmod(API_PATH, 0o755)
# Restart services (best-effort)
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
subprocess.run(["systemctl", "restart", svc], check=False)
VERSION_FILE.parent.mkdir(parents=True, exist_ok=True)
VERSION_FILE.write_text(str(latest))
state["current_version"] = str(latest)
state["latest_version"] = str(latest)
state["status"] = "up_to_date"
state["message"] = "Update installed"
state["progress"] = None
save_update_state(state)
except Exception as e:
state["status"] = "error"
state["message"] = f"Update failed: {e}"
state["progress"] = None
save_update_state(state)
# Attempt rollback if backup exists
backups = sorted(BACKUP_ROOT.glob("*"), reverse=True)
if backups:
try:
latest_backup = backups[0]
if (latest_backup / "pikit-web").exists():
shutil.rmtree(WEB_ROOT, ignore_errors=True)
shutil.copytree(latest_backup / "pikit-web", WEB_ROOT)
if (latest_backup / "pikit-api.py").exists():
shutil.copy2(latest_backup / "pikit-api.py", API_PATH)
os.chmod(API_PATH, 0o755)
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
subprocess.run(["systemctl", "restart", svc], check=False)
state["message"] += " (rolled back to previous backup)"
save_update_state(state)
except Exception as re:
state["message"] += f" (rollback failed: {re})"
save_update_state(state)
finally:
state["in_progress"] = False
state["progress"] = None
save_update_state(state)
return state
def rollback_update_stub():
state = load_update_state()
backups = sorted(BACKUP_ROOT.glob("*"), reverse=True)
if not backups:
state["status"] = "error"
state["message"] = "No backup available to rollback."
save_update_state(state)
return state
target = backups[0]
try:
if (target / "pikit-web").exists():
shutil.rmtree(WEB_ROOT, ignore_errors=True)
shutil.copytree(target / "pikit-web", WEB_ROOT)
if (target / "pikit-api.py").exists():
shutil.copy2(target / "pikit-api.py", API_PATH)
os.chmod(API_PATH, 0o755)
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
subprocess.run(["systemctl", "restart", svc], check=False)
state["status"] = "up_to_date"
state["message"] = f"Rolled back to backup {target.name}"
except Exception as e:
state["status"] = "error"
state["message"] = f"Rollback failed: {e}"
save_update_state(state)
return state
class Handler(BaseHTTPRequestHandler):
"""Minimal JSON API for the dashboard (status, services, updates, reset)."""
def _send(self, code, data):
body = json.dumps(data).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt, *args):
return
def do_GET(self):
if self.path.startswith("/api/status"):
uptime = float(open("/proc/uptime").read().split()[0])
load1, load5, load15 = os.getloadavg()
meminfo = {}
for ln in open("/proc/meminfo"):
k, v = ln.split(":", 1)
meminfo[k] = int(v.strip().split()[0])
total = meminfo.get("MemTotal", 0)//1024
free = meminfo.get("MemAvailable", 0)//1024
disk = shutil.disk_usage("/")
# CPU temperature (best-effort)
cpu_temp = None
for path in ("/sys/class/thermal/thermal_zone0/temp",):
if pathlib.Path(path).exists():
try:
cpu_temp = float(pathlib.Path(path).read_text().strip())/1000.0
break
except Exception:
pass
# LAN IP (first non-loopback)
ip_addr = None
try:
out = subprocess.check_output(["hostname", "-I"], text=True).strip()
ip_addr = out.split()[0] if out else None
except Exception:
pass
# OS version
os_ver = "DietPi"
try:
for line in pathlib.Path("/etc/os-release").read_text().splitlines():
if line.startswith("PRETTY_NAME="):
os_ver = line.split("=",1)[1].strip().strip('"')
break
except Exception:
pass
updates_state = auto_updates_state()
updates_config = read_updates_config(updates_state)
services = []
for svc in load_services():
svc = dict(svc)
port = svc.get("port")
if port:
svc["online"] = port_online("127.0.0.1", port)
svc["firewall_open"] = ufw_status_allows(port)
services.append(svc)
data = {
"hostname": socket.gethostname(),
"uptime_seconds": uptime,
"load": [load1, load5, load15],
"memory_mb": {"total": total, "free": free},
"disk_mb": {"total": disk.total//1024//1024, "free": disk.free//1024//1024},
"cpu_temp_c": cpu_temp,
"lan_ip": ip_addr,
"os_version": os_ver,
"auto_updates_enabled": updates_state.get("enabled", False),
"auto_updates": updates_state,
"updates_config": updates_config,
"reboot_required": reboot_required(),
"ready": READY_FILE.exists(),
"services": services
}
self._send(200, data)
elif self.path.startswith("/api/services"):
services = []
for svc in load_services():
svc = dict(svc)
port = svc.get("port")
if port:
svc["online"] = port_online("127.0.0.1", port)
svc["firewall_open"] = ufw_status_allows(port)
services.append(svc)
self._send(200, {"services": services})
elif self.path.startswith("/api/updates/auto"):
state = auto_updates_state()
self._send(200, {"enabled": state.get("enabled", False), "details": state})
elif self.path.startswith("/api/updates/config"):
cfg = read_updates_config()
self._send(200, cfg)
elif self.path.startswith("/api/update/status"):
state = load_update_state()
state["current_version"] = read_current_version()
self._send(200, state)
else:
self._send(404, {"error": "not found"})
def do_POST(self):
length = int(self.headers.get("Content-Length", 0))
payload = json.loads(self.rfile.read(length) or "{}")
if self.path.startswith("/api/reset"):
if payload.get("confirm") == "YES":
self._send(200, {"message": "Resetting and rebooting..."})
dbg("Factory reset triggered via API")
factory_reset()
else:
self._send(400, {"error": "type YES to confirm"})
return
if self.path.startswith("/api/updates/auto"):
enable = bool(payload.get("enable"))
set_auto_updates(enable)
dbg(f"Auto updates set to {enable}")
state = auto_updates_state()
return self._send(200, {"enabled": state.get("enabled", False), "details": state})
if self.path.startswith("/api/updates/config"):
try:
cfg = set_updates_config(payload or {})
dbg(f"Update settings applied: {cfg}")
return self._send(200, cfg)
except Exception as e:
dbg(f"Failed to apply updates config: {e}")
return self._send(500, {"error": str(e)})
if self.path.startswith("/api/update/check"):
state = check_for_update()
return self._send(200, state)
if self.path.startswith("/api/update/apply"):
state = apply_update_stub()
return self._send(200, state)
if self.path.startswith("/api/update/rollback"):
state = rollback_update_stub()
return self._send(200, state)
if self.path.startswith("/api/update/auto"):
state = load_update_state()
state["auto_check"] = bool(payload.get("enable"))
save_update_state(state)
return self._send(200, state)
if self.path.startswith("/api/services/add"):
name = payload.get("name")
port = int(payload.get("port", 0))
if not name or not port:
return self._send(400, {"error": "name and port required"})
if port in CORE_PORTS and name != CORE_NAME:
return self._send(400, {"error": f"Port {port} is reserved for {CORE_NAME}"})
services = load_services()
if any(s.get("port") == port for s in services):
return self._send(400, {"error": "port already exists"})
host = socket.gethostname()
scheme = payload.get("scheme")
if scheme not in ("http", "https"):
scheme = "https" if detect_https(host, port) else "http"
notice = (payload.get("notice") or "").strip()
notice_link = (payload.get("notice_link") or "").strip()
self_signed = bool(payload.get("self_signed"))
path = normalize_path(payload.get("path"))
svc = {"name": name, "port": port, "scheme": scheme, "url": f"{scheme}://{host}:{port}{path}"}
if notice:
svc["notice"] = notice
if notice_link:
svc["notice_link"] = notice_link
if self_signed:
svc["self_signed"] = True
if path:
svc["path"] = path
services.append(svc)
save_services(services)
try:
allow_port_lan(port)
except FirewallToolMissing as e:
return self._send(500, {"error": str(e)})
return self._send(200, {"services": services, "message": f"Added {name} on port {port} and opened LAN firewall"})
if self.path.startswith("/api/services/remove"):
port = int(payload.get("port", 0))
if not port:
return self._send(400, {"error": "port required"})
if port in CORE_PORTS:
return self._send(400, {"error": f"Cannot remove core service on port {port}"})
services = [s for s in load_services() if s.get("port") != port]
try:
remove_port_lan(port)
except FirewallToolMissing as e:
return self._send(500, {"error": str(e)})
save_services(services)
return self._send(200, {"services": services, "message": f"Removed service on port {port}"})
if self.path.startswith("/api/services/update"):
port = int(payload.get("port", 0))
new_name = payload.get("name")
new_port = payload.get("new_port")
new_scheme = payload.get("scheme")
notice = payload.get("notice")
notice_link = payload.get("notice_link")
new_path = payload.get("path")
self_signed = payload.get("self_signed")
services = load_services()
updated = False
for svc in services:
if svc.get("port") == port:
if new_name:
# Prevent renaming core service to something else if still on core port
if port in CORE_PORTS and new_name != CORE_NAME:
return self._send(400, {"error": f"Core service on port {port} must stay named {CORE_NAME}"})
svc["name"] = new_name
target_port = svc.get("port")
port_changed = False
if new_port is not None:
new_port_int = int(new_port)
if new_port_int != port:
if new_port_int in CORE_PORTS and svc.get("name") != CORE_NAME:
return self._send(400, {"error": f"Port {new_port_int} is reserved for {CORE_NAME}"})
if any(s.get("port") == new_port_int and s is not svc for s in services):
return self._send(400, {"error": "new port already in use"})
try:
remove_port_lan(port)
allow_port_lan(new_port_int)
except FirewallToolMissing as e:
return self._send(500, {"error": str(e)})
svc["port"] = new_port_int
target_port = new_port_int
port_changed = True
host = socket.gethostname()
if new_path is not None:
path = normalize_path(new_path)
if path:
svc["path"] = path
elif "path" in svc:
svc.pop("path", None)
else:
path = normalize_path(svc.get("path"))
if path:
svc["path"] = path
if new_scheme:
scheme = new_scheme if new_scheme in ("http", "https") else None
else:
scheme = svc.get("scheme")
if not scheme or scheme == "auto":
scheme = "https" if detect_https(host, target_port) else "http"
svc["scheme"] = scheme
svc["url"] = f"{scheme}://{host}:{target_port}{path}"
if notice is not None:
text = (notice or "").strip()
if text:
svc["notice"] = text
elif "notice" in svc:
svc.pop("notice", None)
if notice_link is not None:
link = (notice_link or "").strip()
if link:
svc["notice_link"] = link
elif "notice_link" in svc:
svc.pop("notice_link", None)
if self_signed is not None:
if bool(self_signed):
svc["self_signed"] = True
else:
svc.pop("self_signed", None)
updated = True
break
if not updated:
return self._send(404, {"error": "service not found"})
save_services(services)
return self._send(200, {"services": services, "message": "Service updated"})
self._send(404, {"error": "not found"})
def main():
load_services()
server = HTTPServer((HOST, PORT), Handler)
server.serve_forever()
if __name__ == "__main__":
main()