152 lines
5.8 KiB
Python
152 lines
5.8 KiB
Python
import json
|
|
import pathlib
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
from typing import Any, Dict, List
|
|
|
|
from .constants import (
|
|
CORE_NAME,
|
|
CORE_PORTS,
|
|
HTTPS_PORTS,
|
|
READY_FILE,
|
|
RESET_LOG,
|
|
SERVICE_JSON,
|
|
)
|
|
from .diagnostics import dbg
|
|
from .helpers import default_host, detect_https, ensure_dir, normalize_path
|
|
|
|
|
|
class FirewallToolMissing(Exception):
    """Signal that a firewall change was requested while ufw is not installed."""
|
|
|
|
|
|
def load_services() -> List[Dict[str, Any]]:
    """Load the service registry and fill in derived path/scheme/url fields.

    Reads SERVICE_JSON and, for every entry:

    * stores the result of ``normalize_path()`` back into ``"path"`` when that
      result is truthy;
    * when the entry has a truthy ``"port"``, sets ``"scheme"`` (an existing
      value is kept, otherwise ``"https"`` if the port is in HTTPS_PORTS and
      ``"http"`` if not) and builds ``"url"`` from the scheme,
      ``default_host()``, the port and the normalized path.

    Returns:
        The list of service dicts.  An empty list is returned when the file
        does not exist, cannot be read or parsed, or is not a JSON list; the
        reason is reported through ``dbg`` instead of being raised.
    """
    if not SERVICE_JSON.exists():
        return []

    # Best-effort boundary: a broken registry must not take the caller down,
    # but the cause is logged so it can be diagnosed.
    try:
        data = json.loads(SERVICE_JSON.read_text())
        if not isinstance(data, list):
            dbg("Failed to read services.json: top-level value is not a list")
            return []

        host = default_host()
        for svc in data:
            svc_path = normalize_path(svc.get("path"))
            if svc_path:
                svc["path"] = svc_path
            if not svc.get("port"):
                continue
            scheme = svc.get("scheme")
            if not scheme:
                scheme = "https" if int(svc["port"]) in HTTPS_PORTS else "http"
            svc["scheme"] = scheme
            # A falsy path (e.g. None) must not be rendered literally into the URL.
            svc["url"] = f"{scheme}://{host}:{svc['port']}{svc_path or ''}"
        return data
    except Exception as exc:
        dbg(f"Failed to read services.json: {exc}")
        return []
|
|
|
|
|
|
def save_services(services: List[Dict[str, Any]]) -> None:
    """Write *services* to SERVICE_JSON as JSON with a two-space indent.

    The parent directory of SERVICE_JSON is handed to ``ensure_dir`` before
    the file is written.
    """
    target = SERVICE_JSON
    ensure_dir(target.parent)
    serialized = json.dumps(services, indent=2)
    target.write_text(serialized)
|
|
|
|
|
|
def allow_port_lan(port: int):
    """Add ufw allow rules for *port* from each local/private source range.

    The ranges are the three RFC1918 private blocks plus 100.64.0.0/10
    (carrier-grade NAT space) and 169.254.0.0/16 (IPv4 link-local).  One
    ``ufw allow from <range> to any port <port>`` command is run per range;
    a failing command is ignored.

    Raises:
        FirewallToolMissing: when no ``ufw`` executable is found on PATH, so
            callers can surface the error.
    """
    if shutil.which("ufw") is None:
        raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")

    port_arg = str(port)
    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    for cidr in lan_ranges:
        rule = ["ufw", "allow", "from", cidr, "to", "any", "port", port_arg]
        subprocess.run(rule, check=False)
|
|
|
|
|
|
def remove_port_lan(port: int):
    """Delete the ufw allow rules for *port* from each local/private source range.

    Runs one ``ufw delete allow from <range> to any port <port>`` command for
    each of the three RFC1918 private blocks, 100.64.0.0/10 and
    169.254.0.0/16; a failing command is ignored.

    Raises:
        FirewallToolMissing: when no ``ufw`` executable is found on PATH, so
            callers can surface the error.
    """
    if shutil.which("ufw") is None:
        raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")

    port_arg = str(port)
    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    for cidr in lan_ranges:
        rule = ["ufw", "delete", "allow", "from", cidr, "to", "any", "port", port_arg]
        subprocess.run(rule, check=False)
|
|
|
|
|
|
def ufw_status_allows(port: int) -> bool:
    """Report whether ``ufw status`` lists an ALLOW rule that names *port*.

    Each rule line is inspected on its own: the port must appear in the "To"
    column (exactly, inside a comma list, or inside a ``low:high`` range) of a
    line whose action is ALLOW.  Digits that merely occur elsewhere, such as in
    a longer port number or in a source address, do not count.

    Returns:
        True when a matching ALLOW rule exists.  False when none exists, when
        *port* is not an integer, or when ``ufw status`` cannot be run or
        exits with an error.
    """
    try:
        wanted = int(port)
        out = subprocess.check_output(["ufw", "status"], text=True)
    except (OSError, subprocess.SubprocessError, TypeError, ValueError):
        # ufw missing, not permitted, failing, or an unusable port value.
        return False
    return any(_ufw_rule_allows_port(line, wanted) for line in out.splitlines())


def _ufw_rule_allows_port(line: str, port: int) -> bool:
    """Return True when one ``ufw status`` line is an ALLOW rule covering *port*."""
    fields = line.split()
    if "ALLOW" not in fields:
        return False
    # Fields left of the action form the "To" column, e.g. "80", "80/tcp",
    # "80,443/tcp", "6000:6010/tcp" or "<address> 22/tcp".
    for field in fields[: fields.index("ALLOW")]:
        spec = field.split("/", 1)[0]  # drop a trailing "/tcp" or "/udp"
        for part in spec.split(","):
            low, is_range, high = part.partition(":")
            if not low.isdecimal():
                continue  # an address, interface name, "Anywhere", ...
            if not is_range:
                if int(low) == port:
                    return True
            elif high.isdecimal() and int(low) <= port <= int(high):
                return True
    return False
|
|
|
|
|
|
def reset_firewall():
    """Rebuild the ufw rule set from scratch and enable the firewall.

    Runs, in order and ignoring individual command failures:

    1. ``ufw --force reset`` and default-deny for incoming and outgoing;
    2. outbound allow rules for ports 53, 80, 443, 123, 67 and 68;
    3. an outbound allow rule on the ``lo`` interface;
    4. outbound allow rules to each local/private range;
    5. inbound allow rules from each of those ranges for ports 22, 80, 443,
       5252 and 5253;
    6. ``ufw --force enable``.
    """
    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    outbound_ports = ("53", "80", "443", "123", "67", "68")
    inbound_ports = ("22", "80", "443", "5252", "5253")

    # Argument lists for every ufw invocation, in execution order.
    steps = [
        ["--force", "reset"],
        ["default", "deny", "incoming"],
        ["default", "deny", "outgoing"],
    ]
    steps.extend(["allow", "out", out_port] for out_port in outbound_ports)
    steps.append(["allow", "out", "on", "lo"])
    steps.extend(["allow", "out", "to", cidr] for cidr in lan_ranges)
    steps.extend(
        ["allow", "from", cidr, "to", "any", "port", in_port]
        for cidr in lan_ranges
        for in_port in inbound_ports
    )
    steps.append(["--force", "enable"])

    for args in steps:
        subprocess.run(["ufw", *args], check=False)
|
|
|
|
|
|
def set_ssh_password_auth(allow: bool, config_path: str = "/etc/ssh/sshd_config"):
    """
    Enable/disable SSH password authentication without requiring the current password.

    Used during factory reset to restore a predictable state.

    The sshd configuration file is rewritten so that its global section (the
    part before the first ``Match`` line) contains:

    * ``PasswordAuthentication`` yes/no according to *allow*;
    * ``KbdInteractiveAuthentication no`` and ``ChallengeResponseAuthentication no``;
    * ``PubkeyAuthentication yes``;
    * ``PermitRootLogin`` ``yes`` when *allow* is true, else ``prohibit-password``.

    An existing global directive is replaced in place; a missing one is
    inserted just before the first ``Match`` line (or at the end of the file
    when there is none).  Directives inside ``Match`` blocks are left alone.
    Afterwards ``systemctl restart ssh`` is run and its exit status ignored.

    Args:
        allow: Whether password logins should be permitted.
        config_path: sshd configuration file to edit.  A missing file is
            treated as empty and created.

    Returns:
        A ``(True, message)`` tuple describing the requested state.
    """
    cfg = pathlib.Path(config_path)
    lines = cfg.read_text().splitlines() if cfg.exists() else []

    def keyword_of(line):
        """Return the lower-cased keyword of a config line, or "" for blanks/comments."""
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            return ""
        # Keyword and argument may be separated by spaces, tabs or "=".
        parts = stripped.replace("=", " ", 1).split(None, 1)
        return parts[0].lower() if parts else ""

    def set_opt(key, value):
        """Set *key* to *value* in the global section of ``lines``."""
        wanted = key.lower()  # sshd keywords are case-insensitive
        global_end = len(lines)
        for idx, line in enumerate(lines):
            keyword = keyword_of(line)
            if keyword == "match":
                # Everything from here on is conditional; stop searching.
                global_end = idx
                break
            if keyword == wanted:
                lines[idx] = f"{key} {value}"
                return
        # Insert ahead of any Match block so the setting applies globally.
        lines.insert(global_end, f"{key} {value}")

    set_opt("PasswordAuthentication", "yes" if allow else "no")
    set_opt("KbdInteractiveAuthentication", "no")
    set_opt("ChallengeResponseAuthentication", "no")
    set_opt("PubkeyAuthentication", "yes")
    # NOTE(security): enabling password auth also enables root password login.
    set_opt("PermitRootLogin", "yes" if allow else "prohibit-password")

    cfg.write_text("\n".join(lines) + "\n")
    subprocess.run(["systemctl", "restart", "ssh"], check=False)
    return True, f"SSH password auth {'enabled' if allow else 'disabled'}"
|
|
|
|
|
|
def factory_reset():
    """Restore the device to its default state and schedule a reboot.

    Steps, in order:

    1. Restore SERVICE_JSON from ``/boot/custom-files/pikit-services.json``
       when that file exists, otherwise write the two built-in dashboard
       entries.
    2. Rebuild the firewall via ``reset_firewall()``.
    3. Re-enable SSH password logins via ``set_ssh_password_auth(True)``.
    4. Create the ``dietpi`` account when ``/home/dietpi`` is missing, then
       reset the ``root`` and ``dietpi`` passwords to the default.  A failed
       password reset is logged through ``dbg`` and does not abort the reset.
    5. Write a marker line to RESET_LOG and start a detached shell that
       reboots the system after two seconds.
    """
    # Restore services config
    ensure_dir(SERVICE_JSON.parent)  # same precondition save_services() establishes
    custom = pathlib.Path("/boot/custom-files/pikit-services.json")
    if custom.exists():
        shutil.copy(custom, SERVICE_JSON)
    else:
        SERVICE_JSON.write_text(
            json.dumps(
                [
                    {"name": "Pi-Kit Dashboard", "port": 80},
                    {"name": "DietPi Dashboard", "port": 5252},
                ],
                indent=2,
            )
        )

    reset_firewall()
    set_ssh_password_auth(True)

    # Make sure the account exists before its password is set; useradd fails
    # harmlessly (check=False) when the user is already present.
    if not pathlib.Path("/home/dietpi").exists():
        subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False)

    # NOTE(security): this restores a well-known default password by design;
    # it should be changed after the reset.
    for user in ("root", "dietpi"):
        try:
            subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True)
        except (OSError, subprocess.SubprocessError) as exc:
            dbg(f"Factory reset: could not reset password for {user}: {exc}")

    RESET_LOG.write_text("Factory reset triggered\n")
    # Delay the reboot slightly so the caller can still deliver its response.
    subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True)
|