# Files
# pi-kit/pikit_api/services.py
# 2025-12-13 17:04:32 -05:00
#
# 152 lines
# 5.8 KiB
# Python

import json
import pathlib
import shutil
import socket
import subprocess
from typing import Any, Dict, List
from .constants import (
CORE_NAME,
CORE_PORTS,
HTTPS_PORTS,
READY_FILE,
RESET_LOG,
SERVICE_JSON,
)
from .diagnostics import dbg
from .helpers import default_host, detect_https, ensure_dir, normalize_path
class FirewallToolMissing(Exception):
    """Signal that a firewall change was requested but ufw is unavailable."""
def load_services() -> List[Dict[str, Any]]:
    """Read the service registry and fill in path/scheme/url on each entry.

    Returns:
        The parsed registry entries, updated in place, or an empty list when
        the registry file does not exist or any error occurs while reading
        or normalizing it.
    """
    if not SERVICE_JSON.exists():
        return []
    try:
        entries = json.loads(SERVICE_JSON.read_text())
        hostname = default_host()
        for entry in entries:
            path = normalize_path(entry.get("path"))
            if path:
                entry["path"] = path
            if not entry.get("port"):
                # Without a port there is nothing to build a URL from.
                continue
            # Keep an explicit scheme; otherwise derive it from the port.
            proto = entry.get("scheme") or (
                "https" if int(entry["port"]) in HTTPS_PORTS else "http"
            )
            entry["scheme"] = proto
            # NOTE(review): `path` is interpolated even when falsy; confirm
            # normalize_path() returns "" rather than None for a missing path.
            entry["url"] = f"{proto}://{hostname}:{entry['port']}{path}"
        return entries
    except Exception:
        dbg("Failed to read services.json")
        return []
def save_services(services: List[Dict[str, Any]]) -> None:
    """Write *services* to the registry file as 2-space-indented JSON.

    The registry's parent directory is handed to ``ensure_dir`` before the
    file is written.
    """
    registry = SERVICE_JSON
    ensure_dir(registry.parent)
    serialized = json.dumps(services, indent=2)
    registry.write_text(serialized)
def allow_port_lan(port: int):
    """Add a ufw allow rule for *port* from each local-network range.

    The ranges are the three RFC 1918 private blocks plus 100.64.0.0/10 and
    169.254.0.0/16. Individual ufw failures are ignored (``check=False``).

    Raises:
        FirewallToolMissing: if no ``ufw`` executable is found, so callers
            can surface the error.
    """
    ufw_path = shutil.which("ufw")
    if not ufw_path:
        raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")

    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    for source in lan_ranges:
        command = ["ufw", "allow", "from", source, "to", "any", "port", str(port)]
        subprocess.run(command, check=False)
def remove_port_lan(port: int):
    """Delete the ufw allow rule for *port* from each local-network range.

    Issues one ``ufw delete allow ...`` per range: the three RFC 1918 private
    blocks plus 100.64.0.0/10 and 169.254.0.0/16. Individual ufw failures are
    ignored (``check=False``).

    Raises:
        FirewallToolMissing: if no ``ufw`` executable is found, so callers
            can surface the error.
    """
    if not shutil.which("ufw"):
        raise FirewallToolMissing("Cannot update firewall: ufw is not installed on this system.")

    prefix = ["ufw", "delete", "allow", "from"]
    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    for source in lan_ranges:
        subprocess.run([*prefix, source, "to", "any", "port", str(port)], check=False)
def ufw_status_allows(port: int) -> bool:
    """Return True when ``ufw status`` lists an ALLOW rule covering *port*.

    Each status line is inspected on its own: only the text before the
    ``ALLOW`` action (the "To" column) is searched, and the port must match a
    whole port token there. Tokens may be a single port (``80``), carry a
    protocol suffix (``80/tcp``), be a comma list (``80,443/tcp``) or a range
    (``6000:6007/tcp``). This avoids the false positives of a plain substring
    test, where port 80 would match ``8080`` or an octet of an IP address,
    and where an ALLOW on some unrelated rule would count for a denied port.

    Rule direction is not distinguished: an ``ALLOW OUT`` line counts as
    well, as it did before. Rules that show only an application profile
    name cannot be resolved to a port and are not matched.

    Returns:
        False if ufw cannot be run, exits non-zero, or *port* is not an
        integer; otherwise whether a matching ALLOW line was found.
    """
    try:
        wanted = int(port)
        out = subprocess.check_output(["ufw", "status"], text=True)
    except (OSError, subprocess.SubprocessError, TypeError, ValueError):
        return False

    def covers(spec: str) -> bool:
        """Tell whether a port spec such as '80', '80,443' or '6000:6007' includes the wanted port."""
        for part in spec.split(","):
            low, is_range, high = part.partition(":")
            if not low.isdecimal():
                # Not a port number (IP address, profile name, "(v6)", ...).
                continue
            if not is_range:
                if int(low) == wanted:
                    return True
            elif high.isdecimal() and int(low) <= wanted <= int(high):
                return True
        return False

    for line in out.splitlines():
        to_column, action, _ = line.partition("ALLOW")
        if not action:
            continue
        # Drop any "/tcp" or "/udp" suffix before comparing port numbers.
        if any(covers(token.partition("/")[0]) for token in to_column.split()):
            return True
    return False
def reset_firewall():
    """Rebuild the ufw rule set from scratch and enable the firewall.

    Runs, in order: a forced reset; default-deny for incoming and outgoing
    traffic; outbound allows for a fixed list of ports, for the loopback
    interface and for the local-network ranges; inbound allows from those
    ranges to a fixed list of ports; and finally a forced enable. Every
    command runs with ``check=False``, so a failing step does not stop the
    remaining ones.
    """
    lan_ranges = (
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "100.64.0.0/10",
        "169.254.0.0/16",
    )
    # Presumably DNS, HTTP(S), NTP and DHCP, going by the well-known numbers.
    outbound_ports = ("53", "80", "443", "123", "67", "68")
    # Ports reachable from the local network once the reset is done.
    inbound_ports = ("22", "80", "443", "5252", "5253")

    steps = [
        ["--force", "reset"],
        ["default", "deny", "incoming"],
        ["default", "deny", "outgoing"],
        *(["allow", "out", out_port] for out_port in outbound_ports),
        ["allow", "out", "on", "lo"],
        *(["allow", "out", "to", net] for net in lan_ranges),
        *(
            ["allow", "from", net, "to", "any", "port", in_port]
            for net in lan_ranges
            for in_port in inbound_ports
        ),
        ["--force", "enable"],
    ]
    for args in steps:
        subprocess.run(["ufw", *args], check=False)
def set_ssh_password_auth(allow: bool):
    """Enable or disable SSH password authentication in /etc/ssh/sshd_config.

    Does not require the current password. Used during factory reset to
    restore a predictable state.

    Besides ``PasswordAuthentication`` this also forces keyboard-interactive
    and challenge-response authentication off, public-key authentication on,
    and sets ``PermitRootLogin`` to ``yes`` when *allow* is true, otherwise
    ``prohibit-password``. The file is rewritten (created if absent) and the
    ``ssh`` service is restarted; a failing restart is ignored.

    Options are edited in the global section only, i.e. before the first
    ``Match`` line. sshd uses the first value it obtains for a keyword and
    treats everything after a ``Match`` line as conditional, so appending a
    setting at the end of a file that contains ``Match`` blocks would not
    apply it globally. Keyword matching is case-insensitive and accepts
    space, tab or ``=`` after the keyword, as sshd does; commented-out lines
    are left untouched.

    Args:
        allow: True to permit password logins, False to forbid them.

    Returns:
        A ``(True, message)`` tuple describing the change that was applied.
    """
    cfg = pathlib.Path("/etc/ssh/sshd_config")
    lines = cfg.read_text().splitlines() if cfg.exists() else []

    def is_keyword(line, key):
        """Tell whether *line* is an active directive for *key*."""
        stripped = line.strip()
        head, tail = stripped[: len(key)], stripped[len(key):]
        return head.lower() == key.lower() and (tail[:1].isspace() or tail[:1] == "=")

    def set_opt(key, value):
        """Replace the first global *key* directive, or add one before any Match block."""
        global_end = next(
            (idx for idx, line in enumerate(lines) if is_keyword(line, "Match")),
            len(lines),
        )
        for idx in range(global_end):
            if is_keyword(lines[idx], key):
                # First occurrence is the one sshd honours; later ones are moot.
                lines[idx] = f"{key} {value}"
                return
        lines.insert(global_end, f"{key} {value}")

    set_opt("PasswordAuthentication", "yes" if allow else "no")
    set_opt("KbdInteractiveAuthentication", "no")
    set_opt("ChallengeResponseAuthentication", "no")
    set_opt("PubkeyAuthentication", "yes")
    set_opt("PermitRootLogin", "yes" if allow else "prohibit-password")

    cfg.write_text("\n".join(lines) + "\n")
    subprocess.run(["systemctl", "restart", "ssh"], check=False)
    return True, f"SSH password auth {'enabled' if allow else 'disabled'}"
def factory_reset():
    """Restore the device to its factory state and schedule a reboot.

    Steps, in order:

    1. Restore the service registry, either from the custom file under
       /boot/custom-files or from a built-in two-entry default.
    2. Rebuild the firewall rules via ``reset_firewall``.
    3. Re-enable SSH password authentication.
    4. Reset the ``root`` and ``dietpi`` passwords to the default; create
       the ``dietpi`` user first if its home directory is missing.
    5. Write the reset log and start a detached shell that reboots the
       system after two seconds, leaving the caller time to respond.

    Password resets are best effort: a failure is logged through ``dbg``
    and the reset continues.
    """
    # Restore services config
    custom = pathlib.Path("/boot/custom-files/pikit-services.json")
    if custom.exists():
        # Make sure the destination directory exists before copying into it.
        ensure_dir(SERVICE_JSON.parent)
        shutil.copy(custom, SERVICE_JSON)
    else:
        # Same JSON formatting and directory handling as every other write.
        save_services(
            [
                {"name": "Pi-Kit Dashboard", "port": 80},
                {"name": "DietPi Dashboard", "port": 5252},
            ]
        )

    reset_firewall()
    set_ssh_password_auth(True)

    for user in ("root", "dietpi"):
        try:
            subprocess.run(["chpasswd"], input=f"{user}:pikit".encode(), check=True)
        except (OSError, subprocess.SubprocessError) as exc:
            # Expected for dietpi when the account does not exist yet.
            dbg(f"Factory reset: could not reset password for {user}: {exc}")

    if not pathlib.Path("/home/dietpi").exists():
        subprocess.run(["useradd", "-m", "-s", "/bin/bash", "dietpi"], check=False)
        subprocess.run(["chpasswd"], input=b"dietpi:pikit", check=False)

    RESET_LOG.write_text("Factory reset triggered\n")
    subprocess.Popen(["/bin/sh", "-c", "sleep 2 && systemctl reboot >/dev/null 2>&1"], close_fds=True)