1102 lines
42 KiB
Python
1102 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
import json, os, subprocess, socket, shutil, pathlib, datetime, tarfile, sys, argparse
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import re
|
|
import urllib.request
|
|
import hashlib
|
|
import fcntl
|
|
from functools import partial
|
|
|
|
HOST = "127.0.0.1"
|
|
PORT = 4000
|
|
SERVICE_JSON = pathlib.Path("/etc/pikit/services.json")
|
|
RESET_LOG = pathlib.Path("/var/log/pikit-reset.log")
|
|
API_LOG = pathlib.Path("/var/log/pikit-api.log")
|
|
DEBUG_FLAG = pathlib.Path("/boot/pikit-debug").exists()
|
|
HTTPS_PORTS = {443, 5252}
|
|
CORE_PORTS = {80}
|
|
CORE_NAME = "Pi-Kit Dashboard"
|
|
READY_FILE = pathlib.Path("/var/run/pikit-ready")
|
|
APT_AUTO_CFG = pathlib.Path("/etc/apt/apt.conf.d/20auto-upgrades")
|
|
APT_UA_BASE = pathlib.Path("/etc/apt/apt.conf.d/50unattended-upgrades")
|
|
APT_UA_OVERRIDE = pathlib.Path("/etc/apt/apt.conf.d/51pikit-unattended.conf")
|
|
DEFAULT_UPDATE_TIME = "04:00"
|
|
DEFAULT_UPGRADE_TIME = "04:30"
|
|
SECURITY_PATTERNS = [
|
|
'origin=Debian,codename=${distro_codename},label=Debian-Security',
|
|
'origin=Debian,codename=${distro_codename}-security,label=Debian-Security',
|
|
]
|
|
ALL_PATTERNS = [
|
|
'origin=Debian,codename=${distro_codename},label=Debian',
|
|
*SECURITY_PATTERNS,
|
|
]
|
|
|
|
# Release updater constants
|
|
VERSION_FILE = pathlib.Path("/etc/pikit/version")
|
|
WEB_VERSION_FILE = pathlib.Path("/var/www/pikit-web/data/version.json")
|
|
UPDATE_STATE_DIR = pathlib.Path("/var/lib/pikit-update")
|
|
UPDATE_STATE = UPDATE_STATE_DIR / "state.json"
|
|
UPDATE_LOCK = pathlib.Path("/var/run/pikit-update.lock")
|
|
DEFAULT_MANIFEST_URL = os.environ.get(
|
|
"PIKIT_MANIFEST_URL",
|
|
"https://git.44r0n.cc/44r0n7/pi-kit/releases/latest/download/manifest.json",
|
|
)
|
|
AUTH_TOKEN = os.environ.get("PIKIT_AUTH_TOKEN")
|
|
WEB_ROOT = pathlib.Path("/var/www/pikit-web")
|
|
API_PATH = pathlib.Path("/usr/local/bin/pikit-api.py")
|
|
BACKUP_ROOT = pathlib.Path("/var/backups/pikit")
|
|
TMP_ROOT = pathlib.Path("/var/tmp/pikit-update")
|
|
|
|
|
|
def ensure_dir(path: pathlib.Path):
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def sha256_file(path: pathlib.Path):
|
|
h = hashlib.sha256()
|
|
with path.open("rb") as f:
|
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
|
h.update(chunk)
|
|
return h.hexdigest()
|
|
|
|
|
|
class FirewallToolMissing(Exception):
|
|
"""Raised when ufw is unavailable but a firewall change was requested."""
|
|
pass
|
|
|
|
|
|
def normalize_path(path: str | None) -> str:
|
|
"""Normalize optional service path. Empty -> "". Ensure leading slash."""
|
|
if not path:
|
|
return ""
|
|
p = str(path).strip()
|
|
if not p:
|
|
return ""
|
|
if not p.startswith("/"):
|
|
p = "/" + p
|
|
return p
|
|
|
|
|
|
def dbg(msg):
|
|
if not DEBUG_FLAG:
|
|
return
|
|
API_LOG.parent.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.datetime.utcnow().isoformat()
|
|
with API_LOG.open("a") as f:
|
|
f.write(f"[{ts}] {msg}\n")
|
|
|
|
|
|
def set_ssh_password_auth(allow: bool):
|
|
"""
|
|
Enable/disable SSH password authentication without requiring the current password.
|
|
Used during factory reset to restore a predictable state.
|
|
"""
|
|
cfg = pathlib.Path("/etc/ssh/sshd_config")
|
|
text = cfg.read_text() if cfg.exists() else ""
|
|
|
|
def set_opt(key, value):
|
|
nonlocal text
|
|
pattern = f"{key} "
|
|
lines = text.splitlines()
|
|
replaced = False
|
|
for idx, line in enumerate(lines):
|
|
if line.strip().startswith(pattern):
|
|
lines[idx] = f"{key} {value}"
|
|
replaced = True
|
|
break
|
|
if not replaced:
|
|
lines.append(f"{key} {value}")
|
|
text_new = "\n".join(lines) + "\n"
|
|
return text_new
|
|
|
|
text = set_opt("PasswordAuthentication", "yes" if allow else "no")
|
|
text = set_opt("KbdInteractiveAuthentication", "no")
|
|
text = set_opt("ChallengeResponseAuthentication", "no")
|
|
text = set_opt("PubkeyAuthentication", "yes")
|
|
text = set_opt("PermitRootLogin", "yes" if allow else "prohibit-password")
|
|
cfg.write_text(text)
|
|
subprocess.run(["systemctl", "restart", "ssh"], check=False)
|
|
return True, f"SSH password auth {'enabled' if allow else 'disabled'}"
|
|
|
|
|
|
def load_services():
|
|
if SERVICE_JSON.exists():
|
|
try:
|
|
data = json.loads(SERVICE_JSON.read_text())
|
|
# Normalize entries: ensure url built from port if missing
|
|
host = socket.gethostname()
|
|
for svc in data:
|
|
svc_path = normalize_path(svc.get("path"))
|
|
if svc_path:
|
|
svc["path"] = svc_path
|
|
if svc.get("port"):
|
|
scheme = svc.get("scheme")
|
|
if not scheme:
|
|
scheme = "https" if int(svc["port"]) in HTTPS_PORTS else "http"
|
|
svc["scheme"] = scheme
|
|
svc["url"] = f"{scheme}://{host}:{svc['port']}{svc_path}"
|
|
return data
|
|
except Exception:
|
|
dbg("Failed to read services.json")
|
|
return []
|
|
return []
|
|
|
|
|
|
def save_services(services):
|
|
SERVICE_JSON.parent.mkdir(parents=True, exist_ok=True)
|
|
SERVICE_JSON.write_text(json.dumps(services, indent=2))
|
|
|
|
|
|
def auto_updates_enabled():
|
|
if not APT_AUTO_CFG.exists():
|
|
return False
|
|
text = APT_AUTO_CFG.read_text()
|
|
return 'APT::Periodic::Unattended-Upgrade "1";' in text
|
|
|
|
|
|
def set_auto_updates(enable: bool):
|
|
"""
|
|
Toggle unattended upgrades in a way that matches systemd state, not just the
|
|
apt config file. Assumes unattended-upgrades is already installed.
|
|
"""
|
|
units_maskable = [
|
|
"apt-daily.service",
|
|
"apt-daily-upgrade.service",
|
|
"apt-daily.timer",
|
|
"apt-daily-upgrade.timer",
|
|
"unattended-upgrades.service",
|
|
]
|
|
timers = ["apt-daily.timer", "apt-daily-upgrade.timer"]
|
|
service = "unattended-upgrades.service"
|
|
|
|
APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
|
|
if enable:
|
|
APT_AUTO_CFG.write_text(
|
|
'APT::Periodic::Update-Package-Lists "1";\n'
|
|
'APT::Periodic::Unattended-Upgrade "1";\n'
|
|
)
|
|
for unit in units_maskable:
|
|
subprocess.run(["systemctl", "unmask", unit], check=False)
|
|
# Enable timers and the service; start them so state matches immediately
|
|
for unit in timers + [service]:
|
|
subprocess.run(["systemctl", "enable", unit], check=False)
|
|
for unit in timers:
|
|
subprocess.run(["systemctl", "start", unit], check=False)
|
|
subprocess.run(["systemctl", "start", service], check=False)
|
|
else:
|
|
APT_AUTO_CFG.write_text(
|
|
'APT::Periodic::Update-Package-Lists "0";\n'
|
|
'APT::Periodic::Unattended-Upgrade "0";\n'
|
|
)
|
|
# Stop/disable and mask to mirror DietPi defaults
|
|
for unit in timers + [service]:
|
|
subprocess.run(["systemctl", "stop", unit], check=False)
|
|
subprocess.run(["systemctl", "disable", unit], check=False)
|
|
for unit in units_maskable:
|
|
subprocess.run(["systemctl", "mask", unit], check=False)
|
|
|
|
|
|
def _systemctl_is(unit: str, verb: str) -> bool:
|
|
try:
|
|
out = subprocess.check_output(["systemctl", verb, unit], text=True).strip()
|
|
return out == "enabled" if verb == "is-enabled" else out == "active"
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def auto_updates_state():
|
|
config_on = auto_updates_enabled()
|
|
service = "unattended-upgrades.service"
|
|
timers = ["apt-daily.timer", "apt-daily-upgrade.timer"]
|
|
state = {
|
|
"config_enabled": config_on,
|
|
"service_enabled": _systemctl_is(service, "is-enabled"),
|
|
"service_active": _systemctl_is(service, "is-active"),
|
|
"timers_enabled": {},
|
|
"timers_active": {},
|
|
}
|
|
for t in timers:
|
|
state["timers_enabled"][t] = _systemctl_is(t, "is-enabled")
|
|
state["timers_active"][t] = _systemctl_is(t, "is-active")
|
|
# Consider overall enabled only if config is on and both timers & service are enabled
|
|
state["enabled"] = (
|
|
config_on
|
|
and state["service_enabled"]
|
|
and all(state["timers_enabled"].values())
|
|
)
|
|
return state
|
|
|
|
|
|
def reboot_required():
|
|
return pathlib.Path("/run/reboot-required").exists()
|
|
|
|
|
|
def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False):
|
|
text = _strip_comments(text)
|
|
pattern = rf'{re.escape(key)}\s+"?([^";\n]+)"?;'
|
|
m = re.search(pattern, text)
|
|
if not m:
|
|
return default
|
|
val = m.group(1).strip()
|
|
if as_bool:
|
|
return val.lower() in ("1", "true", "yes", "on")
|
|
if as_int:
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
return default
|
|
return val
|
|
|
|
|
|
def _parse_origins_patterns(text: str):
|
|
text = _strip_comments(text)
|
|
m = re.search(r"Unattended-Upgrade::Origins-Pattern\s*{([^}]*)}", text, re.S)
|
|
patterns = []
|
|
if not m:
|
|
return patterns
|
|
body = m.group(1)
|
|
for line in body.splitlines():
|
|
ln = line.strip().strip('";')
|
|
if ln:
|
|
patterns.append(ln)
|
|
return patterns
|
|
|
|
|
|
def _read_timer_time(timer: str):
|
|
try:
|
|
out = subprocess.check_output(
|
|
["systemctl", "show", "--property=TimersCalendar", timer], text=True
|
|
)
|
|
# Example: TimersCalendar={ OnCalendar=*-*-* 03:10:00 ; next_elapse=... }
|
|
m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out)
|
|
if m:
|
|
return f"{int(m.group(1)):02d}:{m.group(2)}"
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _strip_comments(text: str):
|
|
"""Remove // and # line comments for safer parsing."""
|
|
lines = []
|
|
for ln in text.splitlines():
|
|
l = ln.strip()
|
|
if l.startswith("//") or l.startswith("#"):
|
|
continue
|
|
lines.append(ln)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _validate_time(val: str, default: str):
|
|
if not val:
|
|
return default
|
|
m = re.match(r"^(\d{1,2}):(\d{2})$", val.strip())
|
|
if not m:
|
|
return default
|
|
h, mi = int(m.group(1)), int(m.group(2))
|
|
if 0 <= h < 24 and 0 <= mi < 60:
|
|
return f"{h:02d}:{mi:02d}"
|
|
return default
|
|
|
|
|
|
def read_updates_config(state=None):
|
|
"""
|
|
Return a normalized unattended-upgrades configuration snapshot.
|
|
Values are sourced from the Pi-Kit override file when present, else the base file.
|
|
"""
|
|
text = ""
|
|
for path in (APT_UA_OVERRIDE, APT_UA_BASE):
|
|
if path.exists():
|
|
try:
|
|
text += path.read_text() + "\n"
|
|
except Exception:
|
|
pass
|
|
scope_hint = None
|
|
m_scope = re.search(r"PIKIT_SCOPE:\s*(\w+)", text)
|
|
if m_scope:
|
|
scope_hint = m_scope.group(1).lower()
|
|
cleaned = _strip_comments(text)
|
|
patterns = _parse_origins_patterns(cleaned)
|
|
scope = (
|
|
scope_hint
|
|
or ("all" if any("label=Debian" in p and "-security" not in p for p in patterns) else "security")
|
|
)
|
|
cleanup = _parse_directive(text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True)
|
|
auto_reboot = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True)
|
|
reboot_time = _validate_time(_parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME), DEFAULT_UPGRADE_TIME)
|
|
reboot_with_users = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True)
|
|
bandwidth = _parse_directive(text, "Acquire::http::Dl-Limit", None, as_int=True)
|
|
|
|
update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME
|
|
upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME
|
|
|
|
state = state or auto_updates_state()
|
|
return {
|
|
"enabled": bool(state.get("enabled", False)),
|
|
"scope": scope,
|
|
"cleanup": bool(cleanup),
|
|
"bandwidth_limit_kbps": bandwidth,
|
|
"auto_reboot": bool(auto_reboot),
|
|
"reboot_time": reboot_time,
|
|
"reboot_with_users": bool(reboot_with_users),
|
|
"update_time": update_time,
|
|
"upgrade_time": upgrade_time,
|
|
"state": state,
|
|
}
|
|
|
|
|
|
def _write_timer_override(timer: str, time_str: str):
|
|
time_norm = _validate_time(time_str, DEFAULT_UPDATE_TIME)
|
|
override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d")
|
|
override_dir.mkdir(parents=True, exist_ok=True)
|
|
override_file = override_dir / "pikit.conf"
|
|
override_file.write_text(
|
|
"[Timer]\n"
|
|
f"OnCalendar=*-*-* {time_norm}\n"
|
|
"Persistent=true\n"
|
|
"RandomizedDelaySec=30min\n"
|
|
)
|
|
subprocess.run(["systemctl", "daemon-reload"], check=False)
|
|
subprocess.run(["systemctl", "restart", timer], check=False)
|
|
|
|
|
|
def set_updates_config(opts: dict):
|
|
"""
|
|
Apply unattended-upgrades configuration from dashboard inputs.
|
|
"""
|
|
enable = bool(opts.get("enable", True))
|
|
scope = opts.get("scope") or "all"
|
|
patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS
|
|
cleanup = bool(opts.get("cleanup", False))
|
|
bandwidth = opts.get("bandwidth_limit_kbps")
|
|
auto_reboot = bool(opts.get("auto_reboot", False))
|
|
reboot_time = _validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME)
|
|
reboot_with_users = bool(opts.get("reboot_with_users", False))
|
|
update_time = _validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME)
|
|
upgrade_time = _validate_time(opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME)
|
|
|
|
APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
|
|
set_auto_updates(enable)
|
|
|
|
lines = [
|
|
"// Managed by Pi-Kit dashboard",
|
|
f"// PIKIT_SCOPE: {scope}",
|
|
"Unattended-Upgrade::Origins-Pattern {",
|
|
]
|
|
for p in patterns:
|
|
lines.append(f' "{p}";')
|
|
lines.append("};")
|
|
lines.append(f'Unattended-Upgrade::Remove-Unused-Dependencies {"true" if cleanup else "false"};')
|
|
lines.append(f'Unattended-Upgrade::Automatic-Reboot {"true" if auto_reboot else "false"};')
|
|
lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";')
|
|
lines.append(
|
|
f'Unattended-Upgrade::Automatic-Reboot-WithUsers {"true" if reboot_with_users else "false"};'
|
|
)
|
|
if bandwidth is not None:
|
|
lines.append(f'Acquire::http::Dl-Limit "{int(bandwidth)}";')
|
|
APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True)
|
|
APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n")
|
|
|
|
# Timer overrides for when upgrades run
|
|
_write_timer_override("apt-daily.timer", update_time)
|
|
_write_timer_override("apt-daily-upgrade.timer", upgrade_time)
|
|
return read_updates_config()
|
|
|
|
|
|
def detect_https(host, port):
|
|
try:
|
|
import ssl
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
with socket.create_connection((host, int(port)), timeout=1.5) as sock:
|
|
with ctx.wrap_socket(sock, server_hostname=host):
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def factory_reset():
|
|
# Restore services config
|
|
if pathlib.Path("/boot/custom-files/pikit-services.json").exists():
|
|
shutil.copy("/boot/custom-files/pikit-services.json", SERVICE_JSON)
|
|
else:
|
|
SERVICE_JSON.write_text(json.dumps([
|
|
{"name": "Pi-Kit Dashboard", "port": 80},
|
|
{"name": "DietPi Dashboard", "port": 5252},
|
|
], indent=2))
|
|
# Reset firewall
|
|
reset_firewall()
|
|
# Reset SSH auth to password and set defaults
|
|
set_ssh_password_auth(True)
|
|
for user in ("root", "dietpi"):
|
|
try:
|
|
subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True)
|
|
except Exception:
|
|
pass
|
|
# Ensure dietpi exists
|
|
if not pathlib.Path("/home/dietpi").exists():
|
|
subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False)
|
|
subprocess.run(["chpasswd"], input=b"dietpi:pikit", check=False)
|
|
# Log and reboot
|
|
pathlib.Path("/var/log/pikit-reset.log").write_text("Factory reset triggered\n")
|
|
subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True)
|
|
|
|
|
|
def port_online(host, port):
|
|
try:
|
|
with socket.create_connection((host, int(port)), timeout=1.5):
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def ufw_status_allows(port: int) -> bool:
|
|
try:
|
|
out = subprocess.check_output(["ufw", "status"], text=True)
|
|
return f"{port}" in out and "ALLOW" in out
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def allow_port_lan(port: int):
|
|
"""Open a port to RFC1918 subnets; raise if ufw is missing so callers can surface the error."""
|
|
if not shutil.which("ufw"):
|
|
raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")
|
|
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
|
|
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
|
|
|
|
|
|
def remove_port_lan(port: int):
|
|
"""Close a LAN rule for a port; raise if ufw is missing so callers can surface the error."""
|
|
if not shutil.which("ufw"):
|
|
raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")
|
|
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
|
|
subprocess.run(["ufw", "delete", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
|
|
|
|
|
|
def reset_firewall():
|
|
subprocess.run(["ufw", "--force", "reset"], check=False)
|
|
subprocess.run(["ufw", "default", "deny", "incoming"], check=False)
|
|
subprocess.run(["ufw", "default", "deny", "outgoing"], check=False)
|
|
# Outbound essentials + LAN
|
|
for port in ("53", "80", "443", "123", "67", "68"):
|
|
subprocess.run(["ufw", "allow", "out", port], check=False)
|
|
subprocess.run(["ufw", "allow", "out", "on", "lo"], check=False)
|
|
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
|
|
subprocess.run(["ufw", "allow", "out", "to", subnet], check=False)
|
|
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
|
|
for port in ("22", "80", "443", "5252", "5253"):
|
|
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", port], check=False)
|
|
subprocess.run(["ufw", "--force", "enable"], check=False)
|
|
|
|
|
|
def read_current_version():
|
|
if VERSION_FILE.exists():
|
|
return VERSION_FILE.read_text().strip()
|
|
if WEB_VERSION_FILE.exists():
|
|
try:
|
|
return json.loads(WEB_VERSION_FILE.read_text()).get("version", "unknown")
|
|
except Exception:
|
|
return "unknown"
|
|
return "unknown"
|
|
|
|
|
|
def load_update_state():
|
|
UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
if UPDATE_STATE.exists():
|
|
try:
|
|
return json.loads(UPDATE_STATE.read_text())
|
|
except Exception:
|
|
pass
|
|
return {
|
|
"current_version": read_current_version(),
|
|
"latest_version": None,
|
|
"last_check": None,
|
|
"status": "unknown",
|
|
"message": "",
|
|
"auto_check": False,
|
|
"in_progress": False,
|
|
"progress": None,
|
|
}
|
|
|
|
|
|
def save_update_state(state: dict):
|
|
UPDATE_STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
UPDATE_STATE.write_text(json.dumps(state, indent=2))
|
|
|
|
|
|
def fetch_manifest(url: str = None):
|
|
target = url or DEFAULT_MANIFEST_URL
|
|
req = urllib.request.Request(target)
|
|
if AUTH_TOKEN:
|
|
req.add_header("Authorization", f"token {AUTH_TOKEN}")
|
|
resp = urllib.request.urlopen(req, timeout=10)
|
|
data = resp.read()
|
|
manifest = json.loads(data.decode())
|
|
return manifest
|
|
|
|
|
|
def download_file(url: str, dest: pathlib.Path):
|
|
ensure_dir(dest.parent)
|
|
req = urllib.request.Request(url)
|
|
if AUTH_TOKEN:
|
|
req.add_header("Authorization", f"token {AUTH_TOKEN}")
|
|
with urllib.request.urlopen(req, timeout=60) as resp, dest.open("wb") as f:
|
|
shutil.copyfileobj(resp, f)
|
|
return dest
|
|
|
|
|
|
def check_for_update():
|
|
state = load_update_state()
|
|
lock = acquire_lock()
|
|
if lock is None:
|
|
state["status"] = "error"
|
|
state["message"] = "Another update is running"
|
|
save_update_state(state)
|
|
return state
|
|
state["in_progress"] = True
|
|
state["progress"] = "Checking for updates…"
|
|
save_update_state(state)
|
|
try:
|
|
manifest = fetch_manifest()
|
|
latest = manifest.get("version") or manifest.get("latest_version")
|
|
state["latest_version"] = latest
|
|
state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z"
|
|
if latest and latest != state.get("current_version"):
|
|
state["status"] = "update_available"
|
|
state["message"] = manifest.get("changelog", "Update available")
|
|
else:
|
|
state["status"] = "up_to_date"
|
|
state["message"] = "Up to date"
|
|
except Exception as e:
|
|
state["status"] = "up_to_date"
|
|
state["message"] = f"Could not reach update server: {e}"
|
|
state["last_check"] = datetime.datetime.utcnow().isoformat() + "Z"
|
|
finally:
|
|
state["in_progress"] = False
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
if lock:
|
|
release_lock(lock)
|
|
return state
|
|
|
|
|
|
def apply_update_stub():
|
|
"""Download + install release tarball with backup/rollback."""
|
|
state = load_update_state()
|
|
if state.get("in_progress"):
|
|
state["message"] = "Update already in progress"
|
|
save_update_state(state)
|
|
return state
|
|
|
|
lock = acquire_lock()
|
|
if lock is None:
|
|
state["status"] = "error"
|
|
state["message"] = "Another update is running"
|
|
save_update_state(state)
|
|
return state
|
|
|
|
manifest = None
|
|
state["in_progress"] = True
|
|
state["status"] = "in_progress"
|
|
state["progress"] = "Starting update…"
|
|
save_update_state(state)
|
|
|
|
try:
|
|
manifest = fetch_manifest()
|
|
latest = manifest.get("version") or manifest.get("latest_version")
|
|
if not latest:
|
|
raise RuntimeError("Manifest missing version")
|
|
|
|
# Paths
|
|
bundle_url = manifest.get("bundle") or manifest.get("url")
|
|
if not bundle_url:
|
|
raise RuntimeError("Manifest missing bundle url")
|
|
stage_dir = TMP_ROOT / latest
|
|
bundle_path = stage_dir / "bundle.tar.gz"
|
|
ensure_dir(stage_dir)
|
|
|
|
state["progress"] = "Downloading release…"
|
|
save_update_state(state)
|
|
download_file(bundle_url, bundle_path)
|
|
|
|
# Verify hash if provided
|
|
expected_hash = None
|
|
for f in manifest.get("files", []):
|
|
if f.get("path") == "bundle.tar.gz" and f.get("sha256"):
|
|
expected_hash = f["sha256"]
|
|
break
|
|
if expected_hash:
|
|
got = sha256_file(bundle_path)
|
|
if got.lower() != expected_hash.lower():
|
|
raise RuntimeError("Bundle hash mismatch")
|
|
|
|
state["progress"] = "Staging files…"
|
|
save_update_state(state)
|
|
# Extract
|
|
with tarfile.open(bundle_path, "r:gz") as tar:
|
|
tar.extractall(stage_dir)
|
|
|
|
# Backup current
|
|
ts = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
|
|
backup_dir = BACKUP_ROOT / ts
|
|
ensure_dir(backup_dir)
|
|
# Backup web and api
|
|
if WEB_ROOT.exists():
|
|
ensure_dir(backup_dir / "pikit-web")
|
|
shutil.copytree(WEB_ROOT, backup_dir / "pikit-web", dirs_exist_ok=True)
|
|
if API_PATH.exists():
|
|
shutil.copy2(API_PATH, backup_dir / "pikit-api.py")
|
|
|
|
prune_backups(keep=2)
|
|
|
|
# Deploy from staging
|
|
staged_web = stage_dir / "pikit-web"
|
|
if staged_web.exists():
|
|
shutil.rmtree(WEB_ROOT, ignore_errors=True)
|
|
shutil.copytree(staged_web, WEB_ROOT)
|
|
staged_api = stage_dir / "pikit-api.py"
|
|
if staged_api.exists():
|
|
shutil.copy2(staged_api, API_PATH)
|
|
os.chmod(API_PATH, 0o755)
|
|
|
|
# Restart services (best-effort)
|
|
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
|
|
subprocess.run(["systemctl", "restart", svc], check=False)
|
|
|
|
VERSION_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
VERSION_FILE.write_text(str(latest))
|
|
|
|
state["current_version"] = str(latest)
|
|
state["latest_version"] = str(latest)
|
|
state["status"] = "up_to_date"
|
|
state["message"] = "Update installed"
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
except urllib.error.HTTPError as e:
|
|
state["status"] = "error"
|
|
state["message"] = f"No release available ({e.code})"
|
|
except Exception as e:
|
|
state["status"] = "error"
|
|
state["message"] = f"Update failed: {e}"
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
# Attempt rollback if backup exists
|
|
backups = sorted(BACKUP_ROOT.glob("*"), reverse=True)
|
|
if backups:
|
|
try:
|
|
latest_backup = backups[0]
|
|
if (latest_backup / "pikit-web").exists():
|
|
shutil.rmtree(WEB_ROOT, ignore_errors=True)
|
|
shutil.copytree(latest_backup / "pikit-web", WEB_ROOT)
|
|
if (latest_backup / "pikit-api.py").exists():
|
|
shutil.copy2(latest_backup / "pikit-api.py", API_PATH)
|
|
os.chmod(API_PATH, 0o755)
|
|
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
|
|
subprocess.run(["systemctl", "restart", svc], check=False)
|
|
state["message"] += " (rolled back to previous backup)"
|
|
save_update_state(state)
|
|
except Exception as re:
|
|
state["message"] += f" (rollback failed: {re})"
|
|
save_update_state(state)
|
|
finally:
|
|
state["in_progress"] = False
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
if lock:
|
|
release_lock(lock)
|
|
return state
|
|
|
|
|
|
def rollback_update_stub():
|
|
state = load_update_state()
|
|
lock = acquire_lock()
|
|
if lock is None:
|
|
state["status"] = "error"
|
|
state["message"] = "Another update is running"
|
|
save_update_state(state)
|
|
return state
|
|
state["in_progress"] = True
|
|
state["status"] = "in_progress"
|
|
state["progress"] = "Rolling back…"
|
|
save_update_state(state)
|
|
backups = sorted(BACKUP_ROOT.glob("*"), reverse=True)
|
|
if not backups:
|
|
state["status"] = "error"
|
|
state["message"] = "No backup available to rollback."
|
|
state["in_progress"] = False
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
release_lock(lock)
|
|
return state
|
|
target = backups[0]
|
|
try:
|
|
if (target / "pikit-web").exists():
|
|
shutil.rmtree(WEB_ROOT, ignore_errors=True)
|
|
shutil.copytree(target / "pikit-web", WEB_ROOT, dirs_exist_ok=True)
|
|
if (target / "pikit-api.py").exists():
|
|
shutil.copy2(target / "pikit-api.py", API_PATH)
|
|
os.chmod(API_PATH, 0o755)
|
|
for svc in ("pikit-api.service", "dietpi-dashboard-frontend.service"):
|
|
subprocess.run(["systemctl", "restart", svc], check=False)
|
|
state["status"] = "up_to_date"
|
|
state["message"] = f"Rolled back to backup {target.name}"
|
|
except Exception as e:
|
|
state["status"] = "error"
|
|
state["message"] = f"Rollback failed: {e}"
|
|
state["in_progress"] = False
|
|
state["progress"] = None
|
|
save_update_state(state)
|
|
release_lock(lock)
|
|
return state
|
|
|
|
|
|
def start_background_task(mode: str):
|
|
"""
|
|
Kick off a background update/rollback via systemd-run so nginx/API restarts
|
|
do not break the caller connection.
|
|
mode: "apply" or "rollback"
|
|
"""
|
|
assert mode in ("apply", "rollback"), "invalid mode"
|
|
unit = f"pikit-update-{mode}"
|
|
flag = f"--{mode}-update"
|
|
cmd = ["systemd-run", "--unit", unit, "--quiet"]
|
|
# Pass manifest URL/token if set in environment
|
|
if DEFAULT_MANIFEST_URL:
|
|
cmd += [f"--setenv=PIKIT_MANIFEST_URL={DEFAULT_MANIFEST_URL}"]
|
|
if AUTH_TOKEN:
|
|
cmd += [f"--setenv=PIKIT_AUTH_TOKEN={AUTH_TOKEN}"]
|
|
cmd += ["/usr/bin/env", "python3", str(API_PATH), flag]
|
|
subprocess.run(cmd, check=False)
|
|
|
|
|
|
def acquire_lock():
|
|
try:
|
|
ensure_dir(UPDATE_LOCK.parent)
|
|
lockfile = UPDATE_LOCK.open("w")
|
|
fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
lockfile.write(str(os.getpid()))
|
|
lockfile.flush()
|
|
return lockfile
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def release_lock(lockfile):
|
|
try:
|
|
fcntl.flock(lockfile.fileno(), fcntl.LOCK_UN)
|
|
lockfile.close()
|
|
UPDATE_LOCK.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def prune_backups(keep: int = 2):
|
|
if keep < 1:
|
|
keep = 1
|
|
ensure_dir(BACKUP_ROOT)
|
|
backups = sorted([p for p in BACKUP_ROOT.iterdir() if p.is_dir()], reverse=True)
|
|
for old in backups[keep:]:
|
|
shutil.rmtree(old, ignore_errors=True)
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
"""Minimal JSON API for the dashboard (status, services, updates, reset)."""
|
|
def _send(self, code, data):
|
|
body = json.dumps(data).encode()
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def log_message(self, fmt, *args):
|
|
return
|
|
|
|
def do_GET(self):
|
|
if self.path.startswith("/api/status"):
|
|
uptime = float(open("/proc/uptime").read().split()[0])
|
|
load1, load5, load15 = os.getloadavg()
|
|
meminfo = {}
|
|
for ln in open("/proc/meminfo"):
|
|
k, v = ln.split(":", 1)
|
|
meminfo[k] = int(v.strip().split()[0])
|
|
total = meminfo.get("MemTotal", 0)//1024
|
|
free = meminfo.get("MemAvailable", 0)//1024
|
|
disk = shutil.disk_usage("/")
|
|
# CPU temperature (best-effort)
|
|
cpu_temp = None
|
|
for path in ("/sys/class/thermal/thermal_zone0/temp",):
|
|
if pathlib.Path(path).exists():
|
|
try:
|
|
cpu_temp = float(pathlib.Path(path).read_text().strip())/1000.0
|
|
break
|
|
except Exception:
|
|
pass
|
|
# LAN IP (first non-loopback)
|
|
ip_addr = None
|
|
try:
|
|
out = subprocess.check_output(["hostname", "-I"], text=True).strip()
|
|
ip_addr = out.split()[0] if out else None
|
|
except Exception:
|
|
pass
|
|
# OS version
|
|
os_ver = "DietPi"
|
|
try:
|
|
for line in pathlib.Path("/etc/os-release").read_text().splitlines():
|
|
if line.startswith("PRETTY_NAME="):
|
|
os_ver = line.split("=",1)[1].strip().strip('"')
|
|
break
|
|
except Exception:
|
|
pass
|
|
updates_state = auto_updates_state()
|
|
updates_config = read_updates_config(updates_state)
|
|
services = []
|
|
for svc in load_services():
|
|
svc = dict(svc)
|
|
port = svc.get("port")
|
|
if port:
|
|
svc["online"] = port_online("127.0.0.1", port)
|
|
svc["firewall_open"] = ufw_status_allows(port)
|
|
services.append(svc)
|
|
data = {
|
|
"hostname": socket.gethostname(),
|
|
"uptime_seconds": uptime,
|
|
"load": [load1, load5, load15],
|
|
"memory_mb": {"total": total, "free": free},
|
|
"disk_mb": {"total": disk.total//1024//1024, "free": disk.free//1024//1024},
|
|
"cpu_temp_c": cpu_temp,
|
|
"lan_ip": ip_addr,
|
|
"os_version": os_ver,
|
|
"auto_updates_enabled": updates_state.get("enabled", False),
|
|
"auto_updates": updates_state,
|
|
"updates_config": updates_config,
|
|
"reboot_required": reboot_required(),
|
|
"ready": READY_FILE.exists(),
|
|
"services": services
|
|
}
|
|
self._send(200, data)
|
|
elif self.path.startswith("/api/services"):
|
|
services = []
|
|
for svc in load_services():
|
|
svc = dict(svc)
|
|
port = svc.get("port")
|
|
if port:
|
|
svc["online"] = port_online("127.0.0.1", port)
|
|
svc["firewall_open"] = ufw_status_allows(port)
|
|
services.append(svc)
|
|
self._send(200, {"services": services})
|
|
elif self.path.startswith("/api/updates/auto"):
|
|
state = auto_updates_state()
|
|
self._send(200, {"enabled": state.get("enabled", False), "details": state})
|
|
elif self.path.startswith("/api/updates/config"):
|
|
cfg = read_updates_config()
|
|
self._send(200, cfg)
|
|
elif self.path.startswith("/api/update/status"):
|
|
state = load_update_state()
|
|
state["current_version"] = read_current_version()
|
|
self._send(200, state)
|
|
else:
|
|
self._send(404, {"error": "not found"})
|
|
|
|
def do_POST(self):
|
|
length = int(self.headers.get("Content-Length", 0))
|
|
payload = json.loads(self.rfile.read(length) or "{}")
|
|
if self.path.startswith("/api/reset"):
|
|
if payload.get("confirm") == "YES":
|
|
self._send(200, {"message": "Resetting and rebooting..."})
|
|
dbg("Factory reset triggered via API")
|
|
factory_reset()
|
|
else:
|
|
self._send(400, {"error": "type YES to confirm"})
|
|
return
|
|
if self.path.startswith("/api/updates/auto"):
|
|
enable = bool(payload.get("enable"))
|
|
set_auto_updates(enable)
|
|
dbg(f"Auto updates set to {enable}")
|
|
state = auto_updates_state()
|
|
return self._send(200, {"enabled": state.get("enabled", False), "details": state})
|
|
if self.path.startswith("/api/updates/config"):
|
|
try:
|
|
cfg = set_updates_config(payload or {})
|
|
dbg(f"Update settings applied: {cfg}")
|
|
return self._send(200, cfg)
|
|
except Exception as e:
|
|
dbg(f"Failed to apply updates config: {e}")
|
|
return self._send(500, {"error": str(e)})
|
|
if self.path.startswith("/api/update/check"):
|
|
state = check_for_update()
|
|
return self._send(200, state)
|
|
if self.path.startswith("/api/update/apply"):
|
|
# Start background apply to avoid breaking caller during service restart
|
|
start_background_task("apply")
|
|
state = load_update_state()
|
|
state["status"] = "in_progress"
|
|
state["message"] = "Starting background apply"
|
|
save_update_state(state)
|
|
return self._send(202, state)
|
|
if self.path.startswith("/api/update/rollback"):
|
|
start_background_task("rollback")
|
|
state = load_update_state()
|
|
state["status"] = "in_progress"
|
|
state["message"] = "Starting rollback"
|
|
save_update_state(state)
|
|
return self._send(202, state)
|
|
if self.path.startswith("/api/update/auto"):
|
|
state = load_update_state()
|
|
state["auto_check"] = bool(payload.get("enable"))
|
|
save_update_state(state)
|
|
return self._send(200, state)
|
|
if self.path.startswith("/api/services/add"):
|
|
name = payload.get("name")
|
|
port = int(payload.get("port", 0))
|
|
if not name or not port:
|
|
return self._send(400, {"error": "name and port required"})
|
|
if port in CORE_PORTS and name != CORE_NAME:
|
|
return self._send(400, {"error": f"Port {port} is reserved for {CORE_NAME}"})
|
|
services = load_services()
|
|
if any(s.get("port") == port for s in services):
|
|
return self._send(400, {"error": "port already exists"})
|
|
host = socket.gethostname()
|
|
scheme = payload.get("scheme")
|
|
if scheme not in ("http", "https"):
|
|
scheme = "https" if detect_https(host, port) else "http"
|
|
notice = (payload.get("notice") or "").strip()
|
|
notice_link = (payload.get("notice_link") or "").strip()
|
|
self_signed = bool(payload.get("self_signed"))
|
|
path = normalize_path(payload.get("path"))
|
|
svc = {"name": name, "port": port, "scheme": scheme, "url": f"{scheme}://{host}:{port}{path}"}
|
|
if notice:
|
|
svc["notice"] = notice
|
|
if notice_link:
|
|
svc["notice_link"] = notice_link
|
|
if self_signed:
|
|
svc["self_signed"] = True
|
|
if path:
|
|
svc["path"] = path
|
|
services.append(svc)
|
|
save_services(services)
|
|
try:
|
|
allow_port_lan(port)
|
|
except FirewallToolMissing as e:
|
|
return self._send(500, {"error": str(e)})
|
|
return self._send(200, {"services": services, "message": f"Added {name} on port {port} and opened LAN firewall"})
|
|
if self.path.startswith("/api/services/remove"):
|
|
port = int(payload.get("port", 0))
|
|
if not port:
|
|
return self._send(400, {"error": "port required"})
|
|
if port in CORE_PORTS:
|
|
return self._send(400, {"error": f"Cannot remove core service on port {port}"})
|
|
services = [s for s in load_services() if s.get("port") != port]
|
|
try:
|
|
remove_port_lan(port)
|
|
except FirewallToolMissing as e:
|
|
return self._send(500, {"error": str(e)})
|
|
save_services(services)
|
|
return self._send(200, {"services": services, "message": f"Removed service on port {port}"})
|
|
if self.path.startswith("/api/services/update"):
|
|
port = int(payload.get("port", 0))
|
|
new_name = payload.get("name")
|
|
new_port = payload.get("new_port")
|
|
new_scheme = payload.get("scheme")
|
|
notice = payload.get("notice")
|
|
notice_link = payload.get("notice_link")
|
|
new_path = payload.get("path")
|
|
self_signed = payload.get("self_signed")
|
|
services = load_services()
|
|
updated = False
|
|
for svc in services:
|
|
if svc.get("port") == port:
|
|
if new_name:
|
|
# Prevent renaming core service to something else if still on core port
|
|
if port in CORE_PORTS and new_name != CORE_NAME:
|
|
return self._send(400, {"error": f"Core service on port {port} must stay named {CORE_NAME}"})
|
|
svc["name"] = new_name
|
|
target_port = svc.get("port")
|
|
port_changed = False
|
|
if new_port is not None:
|
|
new_port_int = int(new_port)
|
|
if new_port_int != port:
|
|
if new_port_int in CORE_PORTS and svc.get("name") != CORE_NAME:
|
|
return self._send(400, {"error": f"Port {new_port_int} is reserved for {CORE_NAME}"})
|
|
if any(s.get("port") == new_port_int and s is not svc for s in services):
|
|
return self._send(400, {"error": "new port already in use"})
|
|
try:
|
|
remove_port_lan(port)
|
|
allow_port_lan(new_port_int)
|
|
except FirewallToolMissing as e:
|
|
return self._send(500, {"error": str(e)})
|
|
svc["port"] = new_port_int
|
|
target_port = new_port_int
|
|
port_changed = True
|
|
host = socket.gethostname()
|
|
if new_path is not None:
|
|
path = normalize_path(new_path)
|
|
if path:
|
|
svc["path"] = path
|
|
elif "path" in svc:
|
|
svc.pop("path", None)
|
|
else:
|
|
path = normalize_path(svc.get("path"))
|
|
if path:
|
|
svc["path"] = path
|
|
if new_scheme:
|
|
scheme = new_scheme if new_scheme in ("http", "https") else None
|
|
else:
|
|
scheme = svc.get("scheme")
|
|
if not scheme or scheme == "auto":
|
|
scheme = "https" if detect_https(host, target_port) else "http"
|
|
svc["scheme"] = scheme
|
|
svc["url"] = f"{scheme}://{host}:{target_port}{path}"
|
|
if notice is not None:
|
|
text = (notice or "").strip()
|
|
if text:
|
|
svc["notice"] = text
|
|
elif "notice" in svc:
|
|
svc.pop("notice", None)
|
|
if notice_link is not None:
|
|
link = (notice_link or "").strip()
|
|
if link:
|
|
svc["notice_link"] = link
|
|
elif "notice_link" in svc:
|
|
svc.pop("notice_link", None)
|
|
if self_signed is not None:
|
|
if bool(self_signed):
|
|
svc["self_signed"] = True
|
|
else:
|
|
svc.pop("self_signed", None)
|
|
updated = True
|
|
break
|
|
if not updated:
|
|
return self._send(404, {"error": "service not found"})
|
|
save_services(services)
|
|
return self._send(200, {"services": services, "message": "Service updated"})
|
|
self._send(404, {"error": "not found"})
|
|
|
|
|
|
def main():
|
|
load_services()
|
|
server = HTTPServer((HOST, PORT), Handler)
|
|
server.serve_forever()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description="Pi-Kit API / updater")
|
|
parser.add_argument("--apply-update", action="store_true", help="Apply latest release (non-HTTP mode)")
|
|
parser.add_argument("--check-update", action="store_true", help="Check for latest release (non-HTTP mode)")
|
|
parser.add_argument("--rollback-update", action="store_true", help="Rollback to last backup (non-HTTP mode)")
|
|
args = parser.parse_args()
|
|
|
|
if args.apply_update:
|
|
apply_update_stub()
|
|
sys.exit(0)
|
|
if args.check_update:
|
|
check_for_update()
|
|
sys.exit(0)
|
|
if args.rollback_update:
|
|
rollback_update_stub()
|
|
sys.exit(0)
|
|
|
|
main()
|