import json import pathlib import shutil import socket import subprocess from typing import Any, Dict, List from .constants import ( CORE_NAME, CORE_PORTS, HTTPS_PORTS, READY_FILE, RESET_LOG, SERVICE_JSON, ) from .diagnostics import dbg from .helpers import default_host, detect_https, ensure_dir, normalize_path class FirewallToolMissing(Exception): """Raised when ufw is unavailable but a firewall change was requested.""" pass def load_services() -> List[Dict[str, Any]]: """Load service registry and normalize url/scheme/path fields.""" if SERVICE_JSON.exists(): try: data = json.loads(SERVICE_JSON.read_text()) host = default_host() for svc in data: svc_path = normalize_path(svc.get("path")) if svc_path: svc["path"] = svc_path if svc.get("port"): scheme = svc.get("scheme") if not scheme: scheme = "https" if int(svc["port"]) in HTTPS_PORTS else "http" svc["scheme"] = scheme svc["url"] = f"{scheme}://{host}:{svc['port']}{svc_path}" return data except Exception: dbg("Failed to read services.json") return [] return [] def save_services(services: List[Dict[str, Any]]) -> None: ensure_dir(SERVICE_JSON.parent) SERVICE_JSON.write_text(json.dumps(services, indent=2)) def allow_port_lan(port: int): """Open a port to RFC1918 subnets; raise if ufw is missing so callers can surface the error.""" if not shutil.which("ufw"): raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.") for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False) def remove_port_lan(port: int): """Close a LAN rule for a port; raise if ufw is missing so callers can surface the error.""" if not shutil.which("ufw"): raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.") for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "delete", "allow", "from", subnet, "to", "any", "port", str(port)], check=False) def ufw_status_allows(port: int) -> bool: try: out = subprocess.check_output(["ufw", "status"], text=True) return f"{port}" in out and "ALLOW" in out except Exception: return False def reset_firewall(): subprocess.run(["ufw", "--force", "reset"], check=False) subprocess.run(["ufw", "default", "deny", "incoming"], check=False) subprocess.run(["ufw", "default", "deny", "outgoing"], check=False) for port in ("53", "80", "443", "123", "67", "68"): subprocess.run(["ufw", "allow", "out", port], check=False) subprocess.run(["ufw", "allow", "out", "on", "lo"], check=False) for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): subprocess.run(["ufw", "allow", "out", "to", subnet], check=False) for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"): for port in ("22", "80", "443", "5252", "5253"): subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", port], check=False) subprocess.run(["ufw", "--force", "enable"], check=False) def set_ssh_password_auth(allow: bool): """ Enable/disable SSH password authentication without requiring the current password. Used during factory reset to restore a predictable state. """ cfg = pathlib.Path("/etc/ssh/sshd_config") text = cfg.read_text() if cfg.exists() else "" def set_opt(key, value): nonlocal text pattern = f"{key} " lines = text.splitlines() replaced = False for idx, line in enumerate(lines): if line.strip().startswith(pattern): lines[idx] = f"{key} {value}" replaced = True break if not replaced: lines.append(f"{key} {value}") text_new = "\n".join(lines) + "\n" return text_new text = set_opt("PasswordAuthentication", "yes" if allow else "no") text = set_opt("KbdInteractiveAuthentication", "no") text = set_opt("ChallengeResponseAuthentication", "no") text = set_opt("PubkeyAuthentication", "yes") text = set_opt("PermitRootLogin", "yes" if allow else "prohibit-password") cfg.write_text(text) subprocess.run(["systemctl", "restart", "ssh"], check=False) return True, f"SSH password auth {'enabled' if allow else 'disabled'}" def factory_reset(): # Restore services config custom = pathlib.Path("/boot/custom-files/pikit-services.json") if custom.exists(): shutil.copy(custom, SERVICE_JSON) else: SERVICE_JSON.write_text( json.dumps( [ {"name": "Pi-Kit Dashboard", "port": 80}, {"name": "DietPi Dashboard", "port": 5252}, ], indent=2, ) ) reset_firewall() set_ssh_password_auth(True) for user in ("root", "dietpi"): try: subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True) except Exception: pass if not pathlib.Path("/home/dietpi").exists(): subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False) subprocess.run(["chpasswd"], input=b"dietpi:pikit", check=False) RESET_LOG.write_text("Factory reset triggered\n") subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True)