#!/usr/bin/env python3 import json, os, subprocess, socket, shutil, pathlib, datetime, tarfile from http.server import BaseHTTPRequestHandler, HTTPServer import re import urllib.request import hashlib import fcntl HOST = "127.0.0.1" PORT = 4000 SERVICE_JSON = pathlib.Path("/etc/pikit/services.json") RESET_LOG = pathlib.Path("/var/log/pikit-reset.log") API_LOG = pathlib.Path("/var/log/pikit-api.log") DEBUG_FLAG = pathlib.Path("/boot/pikit-debug").exists() HTTPS_PORTS = {443, 5252} CORE_PORTS = {80} CORE_NAME = "Pi-Kit Dashboard" READY_FILE = pathlib.Path("/var/run/pikit-ready") APT_AUTO_CFG = pathlib.Path("/etc/apt/apt.conf.d/20auto-upgrades") APT_UA_BASE = pathlib.Path("/etc/apt/apt.conf.d/50unattended-upgrades") APT_UA_OVERRIDE = pathlib.Path("/etc/apt/apt.conf.d/51pikit-unattended.conf") DEFAULT_UPDATE_TIME = "04:00" DEFAULT_UPGRADE_TIME = "04:30" SECURITY_PATTERNS = [ 'origin=Debian,codename=${distro_codename},label=Debian-Security', 'origin=Debian,codename=${distro_codename}-security,label=Debian-Security', ] ALL_PATTERNS = [ 'origin=Debian,codename=${distro_codename},label=Debian', *SECURITY_PATTERNS, ] # Release updater constants VERSION_FILE = pathlib.Path("/etc/pikit/version") WEB_VERSION_FILE = pathlib.Path("/var/www/pikit-web/data/version.json") UPDATE_STATE_DIR = pathlib.Path("/var/lib/pikit-update") UPDATE_STATE = UPDATE_STATE_DIR / "state.json" UPDATE_LOCK = pathlib.Path("/var/run/pikit-update.lock") DEFAULT_MANIFEST_URL = os.environ.get( "PIKIT_MANIFEST_URL", "https://git.44r0n.cc/44r0n7/pi-kit/releases/latest/download/manifest.json", ) AUTH_TOKEN = os.environ.get("PIKIT_AUTH_TOKEN") WEB_ROOT = pathlib.Path("/var/www/pikit-web") API_PATH = pathlib.Path("/usr/local/bin/pikit-api.py") BACKUP_ROOT = pathlib.Path("/var/backups/pikit") TMP_ROOT = pathlib.Path("/var/tmp/pikit-update") def ensure_dir(path: pathlib.Path): path.mkdir(parents=True, exist_ok=True) def sha256_file(path: pathlib.Path): h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() class FirewallToolMissing(Exception): """Raised when ufw is unavailable but a firewall change was requested.""" pass def normalize_path(path: str | None) -> str: """Normalize optional service path. Empty -> "". Ensure leading slash.""" if not path: return "" p = str(path).strip() if not p: return "" if not p.startswith("/"): p = "/" + p return p def dbg(msg): if not DEBUG_FLAG: return API_LOG.parent.mkdir(parents=True, exist_ok=True) ts = datetime.datetime.utcnow().isoformat() with API_LOG.open("a") as f: f.write(f"[{ts}] {msg}\n") def set_ssh_password_auth(allow: bool): """ Enable/disable SSH password authentication without requiring the current password. Used during factory reset to restore a predictable state. """ cfg = pathlib.Path("/etc/ssh/sshd_config") text = cfg.read_text() if cfg.exists() else "" def set_opt(key, value): nonlocal text pattern = f"{key} " lines = text.splitlines() replaced = False for idx, line in enumerate(lines): if line.strip().startswith(pattern): lines[idx] = f"{key} {value}" replaced = True break if not replaced: lines.append(f"{key} {value}") text_new = "\n".join(lines) + "\n" return text_new text = set_opt("PasswordAuthentication", "yes" if allow else "no") text = set_opt("KbdInteractiveAuthentication", "no") text = set_opt("ChallengeResponseAuthentication", "no") text = set_opt("PubkeyAuthentication", "yes") text = set_opt("PermitRootLogin", "yes" if allow else "prohibit-password") cfg.write_text(text) subprocess.run(["systemctl", "restart", "ssh"], check=False) return True, f"SSH password auth {'enabled' if allow else 'disabled'}" def load_services(): if SERVICE_JSON.exists(): try: data = json.loads(SERVICE_JSON.read_text()) # Normalize entries: ensure url built from port if missing host = socket.gethostname() for svc in data: svc_path = normalize_path(svc.get("path")) if svc_path: svc["path"] = svc_path if svc.get("port"): scheme = svc.get("scheme") if not scheme: scheme = "https" if int(svc["port"]) in HTTPS_PORTS else "http" svc["scheme"] = scheme svc["url"] = f"{scheme}://{host}:{svc['port']}{svc_path}" return data except Exception: dbg("Failed to read services.json") return [] return [] def save_services(services): SERVICE_JSON.parent.mkdir(parents=True, exist_ok=True) SERVICE_JSON.write_text(json.dumps(services, indent=2)) def auto_updates_enabled(): if not APT_AUTO_CFG.exists(): return False text = APT_AUTO_CFG.read_text() return 'APT::Periodic::Unattended-Upgrade "1";' in text def set_auto_updates(enable: bool): """ Toggle unattended upgrades in a way that matches systemd state, not just the apt config file. Assumes unattended-upgrades is already installed. """ units_maskable = [ "apt-daily.service", "apt-daily-upgrade.service", "apt-daily.timer", "apt-daily-upgrade.timer", "unattended-upgrades.service", ] timers = ["apt-daily.timer", "apt-daily-upgrade.timer"] service = "unattended-upgrades.service" APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True) if enable: APT_AUTO_CFG.write_text( 'APT::Periodic::Update-Package-Lists "1";\n' 'APT::Periodic::Unattended-Upgrade "1";\n' ) for unit in units_maskable: subprocess.run(["systemctl", "unmask", unit], check=False) # Enable timers and the service; start them so state matches immediately for unit in timers + [service]: subprocess.run(["systemctl", "enable", unit], check=False) for unit in timers: subprocess.run(["systemctl", "start", unit], check=False) subprocess.run(["systemctl", "start", service], check=False) else: APT_AUTO_CFG.write_text( 'APT::Periodic::Update-Package-Lists "0";\n' 'APT::Periodic::Unattended-Upgrade "0";\n' ) # Stop/disable and mask to mirror DietPi defaults for unit in timers + [service]: subprocess.run(["systemctl", "stop", unit], check=False) subprocess.run(["systemctl", "disable", unit], check=False) for unit in units_maskable: subprocess.run(["systemctl", "mask", unit], check=False) def _systemctl_is(unit: str, verb: str) -> bool: try: out = subprocess.check_output(["systemctl", verb, unit], text=True).strip() return out == "enabled" if verb == "is-enabled" else out == "active" except Exception: return False def auto_updates_state(): config_on = auto_updates_enabled() service = "unattended-upgrades.service" timers = ["apt-daily.timer", "apt-daily-upgrade.timer"] state = { "config_enabled": config_on, "service_enabled": _systemctl_is(service, "is-enabled"), "service_active": _systemctl_is(service, "is-active"), "timers_enabled": {}, "timers_active": {}, } for t in timers: state["timers_enabled"][t] = _systemctl_is(t, "is-enabled") state["timers_active"][t] = _systemctl_is(t, "is-active") # Consider overall enabled only if config is on and both timers & service are enabled state["enabled"] = ( config_on and state["service_enabled"] and all(state["timers_enabled"].values()) ) return state def reboot_required(): return pathlib.Path("/run/reboot-required").exists() def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False): text = _strip_comments(text) pattern = rf'{re.escape(key)}\s+"?([^";\n]+)"?;' m = re.search(pattern, text) if not m: return default val = m.group(1).strip() if as_bool: return val.lower() in ("1", "true", "yes", "on") if as_int: try: return int(val) except ValueError: return default return val def _parse_origins_patterns(text: str): text = _strip_comments(text) m = re.search(r"Unattended-Upgrade::Origins-Pattern\s*{([^}]*)}", text, re.S) patterns = [] if not m: return patterns body = m.group(1) for line in body.splitlines(): ln = line.strip().strip('";') if ln: patterns.append(ln) return patterns def _read_timer_time(timer: str): try: out = subprocess.check_output( ["systemctl", "show", "--property=TimersCalendar", timer], text=True ) # Example: TimersCalendar={ OnCalendar=*-*-* 03:10:00 ; next_elapse=... } m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out) if m: return f"{int(m.group(1)):02d}:{m.group(2)}" except Exception: pass return None def _strip_comments(text: str): """Remove // and # line comments for safer parsing.""" lines = [] for ln in text.splitlines(): l = ln.strip() if l.startswith("//") or l.startswith("#"): continue lines.append(ln) return "\n".join(lines) def _validate_time(val: str, default: str): if not val: return default m = re.match(r"^(\d{1,2}):(\d{2})$", val.strip()) if not m: return default h, mi = int(m.group(1)), int(m.group(2)) if 0 <= h < 24 and 0 <= mi < 60: return f"{h:02d}:{mi:02d}" return default def read_updates_config(state=None): """ Return a normalized unattended-upgrades configuration snapshot. Values are sourced from the Pi-Kit override file when present, else the base file. """ text = "" for path in (APT_UA_OVERRIDE, APT_UA_BASE): if path.exists(): try: text += path.read_text() + "\n" except Exception: pass scope_hint = None m_scope = re.search(r"PIKIT_SCOPE:\s*(\w+)", text) if m_scope: scope_hint = m_scope.group(1).lower() cleaned = _strip_comments(text) patterns = _parse_origins_patterns(cleaned) scope = ( scope_hint or ("all" if any("label=Debian" in p and "-security" not in p for p in patterns) else "security") ) cleanup = _parse_directive(text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True) auto_reboot = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True) reboot_time = _validate_time(_parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME), DEFAULT_UPGRADE_TIME) reboot_with_users = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True) bandwidth = _parse_directive(text, "Acquire::http::Dl-Limit", None, as_int=True) update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME state = state or auto_updates_state() return { "enabled": bool(state.get("enabled", False)), "scope": scope, "cleanup": bool(cleanup), "bandwidth_limit_kbps": bandwidth, "auto_reboot": bool(auto_reboot), "reboot_time": reboot_time, "reboot_with_users": bool(reboot_with_users), "update_time": update_time, "upgrade_time": upgrade_time, "state": state, } def _write_timer_override(timer: str, time_str: str): time_norm = _validate_time(time_str, DEFAULT_UPDATE_TIME) override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d") override_dir.mkdir(parents=True, exist_ok=True) override_file = override_dir / "pikit.conf" override_file.write_text( "[Timer]\n" f"OnCalendar=*-*-* {time_norm}\n" "Persistent=true\n" "RandomizedDelaySec=30min\n" ) subprocess.run(["systemctl", "daemon-reload"], check=False) subprocess.run(["systemctl", "restart", timer], check=False) def set_updates_config(opts: dict): """ Apply unattended-upgrades configuration from dashboard inputs. """ enable = bool(opts.get("enable", True)) scope = opts.get("scope") or "all" patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS cleanup = bool(opts.get("cleanup", False)) bandwidth = opts.get("bandwidth_limit_kbps") auto_reboot = bool(opts.get("auto_reboot", False)) reboot_time = _validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME) reboot_with_users = bool(opts.get("reboot_with_users", False)) update_time = _validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME) upgrade_time = _validate_time(opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME) APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True) set_auto_updates(enable) lines = [ "// Managed by Pi-Kit dashboard", f"// PIKIT_SCOPE: {scope}", "Unattended-Upgrade::Origins-Pattern {", ] for p in patterns: lines.append(f' "{p}";') lines.append("};") lines.append(f'Unattended-Upgrade::Remove-Unused-Dependencies {"true" if cleanup else "false"};') lines.append(f'Unattended-Upgrade::Automatic-Reboot {"true" if auto_reboot else "false"};') lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";') lines.append( f'Unattended-Upgrade::Automatic-Reboot-WithUsers {"true" if reboot_with_users else "false"};' ) if bandwidth is not None: lines.append(f'Acquire::http::Dl-Limit "{int(bandwidth)}";') APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True) APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n") # Timer overrides for when upgrades run _write_timer_override("apt-daily.timer", update_time) _write_timer_override("apt-daily-upgrade.timer", upgrade_time) return read_updates_config() def detect_https(host, port): try: import ssl ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE with socket.create_connection((host, int(port)), timeout=1.5) as sock: with ctx.wrap_socket(sock, server_hostname=host): return True except Exception: return False def factory_reset(): # Restore services config if pathlib.Path("/boot/custom-files/pikit-services.json").exists(): shutil.copy("/boot/custom-files/pikit-services.json", SERVICE_JSON) else: SERVICE_JSON.write_text(json.dumps([ {"name": "Pi-Kit Dashboard", "port": 80}, {"name": "DietPi Dashboard", "port": 5252}, ], indent=2)) # Reset firewall reset_firewall() # Reset SSH auth to password and set defaults set_ssh_password_auth(True) for user in ("root", "dietpi"): try: subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True) except Exception: pass # Ensure dietpi exists if not pathlib.Path("/home/dietpi").exists(): subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False) subprocess.run(["chpasswd"], input=b"dietpi:pikit", check=False) # Log and reboot pathlib.Path("/var/log/pikit-reset.log").write_text("Factory reset triggered\n") subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True) def port_online(host, port): try: with socket.create_connection((host, int(port)), timeout=1.5): return True except Exception: return False def ufw_status_allows(port: int) -> bool: try: out = subprocess.check_output(["ufw", "status"], text=True) return f"{port}" in out and "ALLOW" in out except Exception: return False def allow_port_lan(port: int): """Open a port to RFC1918 subnets; raise if ufw is missing so callers can surface the error.""" if not shutil.which("ufw"): raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.") for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False) def remove_port_lan(port: int): """Close a LAN rule for a port; raise if ufw is missing so callers can surface the error.""" if not shutil.which("ufw"): raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.") for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "delete", "allow", "from", subnet, "to", "any", "port", str(port)], check=False) def reset_firewall(): subprocess.run(["ufw", "--force", "reset"], check=False) subprocess.run(["ufw", "default", "deny", "incoming"], check=False) subprocess.run(["ufw", "default", "deny", "outgoing"], check=False) # Outbound essentials + LAN for port in ("53", "80", "443", "123", "67", "68"): subprocess.run(["ufw", "allow", "out", port], check=False) subprocess.run(["ufw", "allow", "out", "on", "lo"], check=False) for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "allow", "out", "to", subnet], check=False) for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): for port in ("22", "80", "443", "5252", "5253"): subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", port], check=False) subprocess.run(["ufw", "--force", "enable"], check=False) def read_current_version(): if VERSION_FILE.exists(): return VERSION_FILE.read_text().strip() if WEB_VERSION_FILE.exists(): try: return json.loads(WEB_VERSION_FILE.read_text()).get("version", "unknown") except Exception: return "unknown" return "unknown" def load_update_state(): UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True) if UPDATE_STATE.exists(): try: return json.loads(UPDATE_STATE.read_text()) except Exception: pass return { "current_version": read_current_version(), "latest_version": None, "last_check": None, "status": "unknown", "message": "", "auto_check": False, "in_progress": False, "progress": None, } def save_update_state(state: dict): UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True) UPDATE_STATE.write_text(json.dumps(state, indent=2)) def fetch_manifest(url: str = None): target = url or DEFAULT_MANIFEST_URL req = urllib.request.Request(target) if AUTH_TOKEN: req.add_header("Authorization", f"token {AUTH_TOKEN}") resp = urllib.request.urlopen(req, timeout=10) data = resp.read() manifest = json.loads(data.decode()) return manifest def download_file(url: str, dest: pathlib.Path): ensure_dir(dest.parent) req = urllib.request.Request(url) if AUTH_TOKEN: req.add_header("Authorization", f"token {AUTH_TOKEN}") with urllib.request.urlopen(req, timeout=60) as resp, dest.open("wb") as f: shutil.copyfileobj(resp, f) return dest def check_for_update(): state = load_update_state() lock = acquire_lock() if lock is None: state["status"] = "error" state["message"] = "Another update is running" save_update_state(state) return state state["in_progress"] = True state["progress"] = "Checking for updates…" save_update_state(state) try: manifest = fetch_manifest() latest = manifest.get("version") or manifest.get("latest_version") state["latest_version"] = latest state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z" if latest and latest != state.get("current_version"): state["status"] = "update_available" state["message"] = manifest.get("changelog", "Update available") else: state["status"] = "up_to_date" state["message"] = "Up to date" except Exception as e: state["status"] = "up_to_date" state["message"] = f"Could not reach update server: {e}" state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z" finally: state["in_progress"] = False state["progress"] = None save_update_state(state) if lock: release_lock(lock) return state def apply_update_stub(): """Download + install release tarball with backup/rollback.""" state = load_update_state() if state.get("in_progress"): state["message"] = "Update already in progress" save_update_state(state) return state lock = acquire_lock() if lock is None: state["status"] = "error" state["message"] = "Another update is running" save_update_state(state) return state manifest = None state["in_progress"] = True state["progress"] = "Starting update…" save_update_state(state) try: manifest = fetch_manifest() latest = manifest.get("version") or manifest.get("latest_version") if not latest: raise RuntimeError("Manifest missing version") # Paths bundle_url = manifest.get("bundle") or manifest.get("url") if not bundle_url: raise RuntimeError("Manifest missing bundle url") stage_dir = TMP_ROOT / latest bundle_path = stage_dir / "bundle.tar.gz" ensure_dir(stage_dir) state["progress"] = "Downloading release…" save_update_state(state) download_file(bundle_url, bundle_path) # Verify hash if provided expected_hash = None for f in manifest.get("files", []): if f.get("path") == "bundle.tar.gz" and f.get("sha256"): expected_hash = f["sha256"] break if expected_hash: got = sha256_file(bundle_path) if got.lower() != expected_hash.lower(): raise RuntimeError("Bundle hash mismatch") state["progress"] = "Staging files…" save_update_state(state) # Extract with tarfile.open(bundle_path, "r:gz") as tar: tar.extractall(stage_dir) # Backup current ts = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S") backup_dir = BACKUP_ROOT / ts ensure_dir(backup_dir) # Backup web and api if WEB_ROOT.exists(): shutil.copytree(WEB_ROOT, backup_dir / "pikit-web") if API_PATH.exists(): shutil.copy2(API_PATH, backup_dir / "pikit-api.py") prune_backups(keep=2) # Deploy from staging staged_web = stage_dir / "pikit-web" if staged_web.exists(): shutil.rmtree(WEB_ROOT, ignore_errors=True) shutil.copytree(staged_web, WEB_ROOT) staged_api = stage_dir / "pikit-api.py" if staged_api.exists(): shutil.copy2(staged_api, API_PATH) os.chmod(API_PATH, 0o755) # Restart services (best-effort) for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"): subprocess.run(["systemctl", "restart", svc], check=False) VERSION_FILE.parent.mkdir(parents=True, exist_ok=True) VERSION_FILE.write_text(str(latest)) state["current_version"] = str(latest) state["latest_version"] = str(latest) state["status"] = "up_to_date" state["message"] = "Update installed" state["progress"] = None save_update_state(state) except urllib.error.HTTPError as e: state["status"] = "up_to_date" state["message"] = f"No release available ({e.code})" except Exception as e: state["status"] = "error" state["message"] = f"Update failed: {e}" state["progress"] = None save_update_state(state) # Attempt rollback if backup exists backups = sorted(BACKUP_ROOT.glob("*"), reverse=True) if backups: try: latest_backup = backups[0] if (latest_backup / "pikit-web").exists(): shutil.rmtree(WEB_ROOT, ignore_errors=True) shutil.copytree(latest_backup / "pikit-web", WEB_ROOT) if (latest_backup / "pikit-api.py").exists(): shutil.copy2(latest_backup / "pikit-api.py", API_PATH) os.chmod(API_PATH, 0o755) for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"): subprocess.run(["systemctl", "restart", svc], check=False) state["message"] += " (rolled back to previous backup)" save_update_state(state) except Exception as re: state["message"] += f" (rollback failed: {re})" save_update_state(state) finally: state["in_progress"] = False state["progress"] = None save_update_state(state) if lock: release_lock(lock) return state def rollback_update_stub(): state = load_update_state() lock = acquire_lock() if lock is None: state["status"] = "error" state["message"] = "Another update is running" save_update_state(state) return state backups = sorted(BACKUP_ROOT.glob("*"), reverse=True) if not backups: state["status"] = "error" state["message"] = "No backup available to rollback." save_update_state(state) return state target = backups[0] try: if (target / "pikit-web").exists(): shutil.rmtree(WEB_ROOT, ignore_errors=True) shutil.copytree(target / "pikit-web", WEB_ROOT) if (target / "pikit-api.py").exists(): shutil.copy2(target / "pikit-api.py", API_PATH) os.chmod(API_PATH, 0o755) for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"): subprocess.run(["systemctl", "restart", svc], check=False) state["status"] = "up_to_date" state["message"] = f"Rolled back to backup {target.name}" except Exception as e: state["status"] = "error" state["message"] = f"Rollback failed: {e}" save_update_state(state) if lock: release_lock(lock) return state def acquire_lock(): try: ensure_dir(UPDATE_LOCK.parent) lockfile = UPDATE_LOCK.open("w") fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) lockfile.write(str(os.getpid())) lockfile.flush() return lockfile except Exception: return None def release_lock(lockfile): try: fcntl.flock(lockfile.fileno(), fcntl.LOCK_UN) lockfile.close() UPDATE_LOCK.unlink(missing_ok=True) except Exception: pass def prune_backups(keep: int = 2): if keep < 1: keep = 1 ensure_dir(BACKUP_ROOT) backups = sorted([p for p in BACKUP_ROOT.iterdir() if p.is_dir()], reverse=True) for old in backups[keep:]: shutil.rmtree(old, ignore_errors=True) class Handler(BaseHTTPRequestHandler): """Minimal JSON API for the dashboard (status, services, updates, reset).""" def _send(self, code, data): body = json.dumps(data).encode() self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, fmt, *args): return def do_GET(self): if self.path.startswith("/api/status"): uptime = float(open("/proc/uptime").read().split()[0]) load1, load5, load15 = os.getloadavg() meminfo = {} for ln in open("/proc/meminfo"): k, v = ln.split(":", 1) meminfo[k] = int(v.strip().split()[0]) total = meminfo.get("MemTotal", 0)//1024 free = meminfo.get("MemAvailable", 0)//1024 disk = shutil.disk_usage("/") # CPU temperature (best-effort) cpu_temp = None for path in ("/sys/class/thermal/thermal_zone0/temp",): if pathlib.Path(path).exists(): try: cpu_temp = float(pathlib.Path(path).read_text().strip())/1000.0 break except Exception: pass # LAN IP (first non-loopback) ip_addr = None try: out = subprocess.check_output(["hostname", "-I"], text=True).strip() ip_addr = out.split()[0] if out else None except Exception: pass # OS version os_ver = "DietPi" try: for line in pathlib.Path("/etc/os-release").read_text().splitlines(): if line.startswith("PRETTY_NAME="): os_ver = line.split("=",1)[1].strip().strip('"') break except Exception: pass updates_state = auto_updates_state() updates_config = read_updates_config(updates_state) services = [] for svc in load_services(): svc = dict(svc) port = svc.get("port") if port: svc["online"] = port_online("127.0.0.1", port) svc["firewall_open"] = ufw_status_allows(port) services.append(svc) data = { "hostname": socket.gethostname(), "uptime_seconds": uptime, "load": [load1, load5, load15], "memory_mb": {"total": total, "free": free}, "disk_mb": {"total": disk.total//1024//1024, "free": disk.free//1024//1024}, "cpu_temp_c": cpu_temp, "lan_ip": ip_addr, "os_version": os_ver, "auto_updates_enabled": updates_state.get("enabled", False), "auto_updates": updates_state, "updates_config": updates_config, "reboot_required": reboot_required(), "ready": READY_FILE.exists(), "services": services } self._send(200, data) elif self.path.startswith("/api/services"): services = [] for svc in load_services(): svc = dict(svc) port = svc.get("port") if port: svc["online"] = port_online("127.0.0.1", port) svc["firewall_open"] = ufw_status_allows(port) services.append(svc) self._send(200, {"services": services}) elif self.path.startswith("/api/updates/auto"): state = auto_updates_state() self._send(200, {"enabled": state.get("enabled", False), "details": state}) elif self.path.startswith("/api/updates/config"): cfg = read_updates_config() self._send(200, cfg) elif self.path.startswith("/api/update/status"): state = load_update_state() state["current_version"] = read_current_version() self._send(200, state) else: self._send(404, {"error": "not found"}) def do_POST(self): length = int(self.headers.get("Content-Length", 0)) payload = json.loads(self.rfile.read(length) or "{}") if self.path.startswith("/api/reset"): if payload.get("confirm") == "YES": self._send(200, {"message": "Resetting and rebooting..."}) dbg("Factory reset triggered via API") factory_reset() else: self._send(400, {"error": "type YES to confirm"}) return if self.path.startswith("/api/updates/auto"): enable = bool(payload.get("enable")) set_auto_updates(enable) dbg(f"Auto updates set to {enable}") state = auto_updates_state() return self._send(200, {"enabled": state.get("enabled", False), "details": state}) if self.path.startswith("/api/updates/config"): try: cfg = set_updates_config(payload or {}) dbg(f"Update settings applied: {cfg}") return self._send(200, cfg) except Exception as e: dbg(f"Failed to apply updates config: {e}") return self._send(500, {"error": str(e)}) if self.path.startswith("/api/update/check"): state = check_for_update() return self._send(200, state) if self.path.startswith("/api/update/apply"): state = apply_update_stub() return self._send(200, state) if self.path.startswith("/api/update/rollback"): state = rollback_update_stub() return self._send(200, state) if self.path.startswith("/api/update/auto"): state = load_update_state() state["auto_check"] = bool(payload.get("enable")) save_update_state(state) return self._send(200, state) if self.path.startswith("/api/services/add"): name = payload.get("name") port = int(payload.get("port", 0)) if not name or not port: return self._send(400, {"error": "name and port required"}) if port in CORE_PORTS and name != CORE_NAME: return self._send(400, {"error": f"Port {port} is reserved for {CORE_NAME}"}) services = load_services() if any(s.get("port") == port for s in services): return self._send(400, {"error": "port already exists"}) host = socket.gethostname() scheme = payload.get("scheme") if scheme not in ("http", "https"): scheme = "https" if detect_https(host, port) else "http" notice = (payload.get("notice") or "").strip() notice_link = (payload.get("notice_link") or "").strip() self_signed = bool(payload.get("self_signed")) path = normalize_path(payload.get("path")) svc = {"name": name, "port": port, "scheme": scheme, "url": f"{scheme}://{host}:{port}{path}"} if notice: svc["notice"] = notice if notice_link: svc["notice_link"] = notice_link if self_signed: svc["self_signed"] = True if path: svc["path"] = path services.append(svc) save_services(services) try: allow_port_lan(port) except FirewallToolMissing as e: return self._send(500, {"error": str(e)}) return self._send(200, {"services": services, "message": f"Added {name} on port {port} and opened LAN firewall"}) if self.path.startswith("/api/services/remove"): port = int(payload.get("port", 0)) if not port: return self._send(400, {"error": "port required"}) if port in CORE_PORTS: return self._send(400, {"error": f"Cannot remove core service on port {port}"}) services = [s for s in load_services() if s.get("port") != port] try: remove_port_lan(port) except FirewallToolMissing as e: return self._send(500, {"error": str(e)}) save_services(services) return self._send(200, {"services": services, "message": f"Removed service on port {port}"}) if self.path.startswith("/api/services/update"): port = int(payload.get("port", 0)) new_name = payload.get("name") new_port = payload.get("new_port") new_scheme = payload.get("scheme") notice = payload.get("notice") notice_link = payload.get("notice_link") new_path = payload.get("path") self_signed = payload.get("self_signed") services = load_services() updated = False for svc in services: if svc.get("port") == port: if new_name: # Prevent renaming core service to something else if still on core port if port in CORE_PORTS and new_name != CORE_NAME: return self._send(400, {"error": f"Core service on port {port} must stay named {CORE_NAME}"}) svc["name"] = new_name target_port = svc.get("port") port_changed = False if new_port is not None: new_port_int = int(new_port) if new_port_int != port: if new_port_int in CORE_PORTS and svc.get("name") != CORE_NAME: return self._send(400, {"error": f"Port {new_port_int} is reserved for {CORE_NAME}"}) if any(s.get("port") == new_port_int and s is not svc for s in services): return self._send(400, {"error": "new port already in use"}) try: remove_port_lan(port) allow_port_lan(new_port_int) except FirewallToolMissing as e: return self._send(500, {"error": str(e)}) svc["port"] = new_port_int target_port = new_port_int port_changed = True host = socket.gethostname() if new_path is not None: path = normalize_path(new_path) if path: svc["path"] = path elif "path" in svc: svc.pop("path", None) else: path = normalize_path(svc.get("path")) if path: svc["path"] = path if new_scheme: scheme = new_scheme if new_scheme in ("http", "https") else None else: scheme = svc.get("scheme") if not scheme or scheme == "auto": scheme = "https" if detect_https(host, target_port) else "http" svc["scheme"] = scheme svc["url"] = f"{scheme}://{host}:{target_port}{path}" if notice is not None: text = (notice or "").strip() if text: svc["notice"] = text elif "notice" in svc: svc.pop("notice", None) if notice_link is not None: link = (notice_link or "").strip() if link: svc["notice_link"] = link elif "notice_link" in svc: svc.pop("notice_link", None) if self_signed is not None: if bool(self_signed): svc["self_signed"] = True else: svc.pop("self_signed", None) updated = True break if not updated: return self._send(404, {"error": "service not found"}) save_services(services) return self._send(200, {"services": services, "message": "Service updated"}) self._send(404, {"error": "not found"}) def main(): load_services() server = HTTPServer((HOST, PORT), Handler) server.serve_forever() if __name__ == "__main__": main()