Add dns-stack profile and stable IP prompt

This commit is contained in:
Aaron
2026-01-03 17:17:27 -05:00
parent a67b1a55d4
commit 1ddffee077
6 changed files with 369 additions and 29 deletions

View File

@@ -16,6 +16,7 @@ PROFILE_FILE="/etc/pikit/profile.json"
MOTD_FILE="/etc/motd"
FIRSTBOOT_CONF="/etc/pikit/firstboot.conf"
APT_UA_OVERRIDE="/etc/apt/apt.conf.d/51pikit-unattended.conf"
SERVICE_JSON="/etc/pikit/services.json"
STEPS=(
"Preparing system"
@@ -58,7 +59,7 @@ configure_unattended_defaults() {
log "python3 missing; skipping unattended-upgrades defaults."
return
fi
PYTHONPATH=/usr/local/bin python3 - <<'PY'
PROFILE_FILE="$PROFILE_FILE" SERVICE_JSON="$SERVICE_JSON" PYTHONPATH=/usr/local/bin python3 - <<'PY'
import sys
try:
from pikit_api.auto_updates import set_updates_config
@@ -71,6 +72,198 @@ PY
log "Unattended-upgrades defaults applied (security-only)."
}
apply_profile() {
if [ ! -f "$PROFILE_FILE" ]; then
log "Profile step skipped (no profile.json)."
return
fi
PYTHONPATH=/usr/local/bin python3 - <<'PY'
import json
import os
import pathlib
import pwd
import grp
import shutil
import subprocess
profile_path = pathlib.Path(os.environ.get("PROFILE_FILE", "/etc/pikit/profile.json"))
services_path = pathlib.Path(os.environ.get("SERVICE_JSON", "/etc/pikit/services.json"))
def log(msg: str) -> None:
print(msg)
def ipv6_enabled() -> bool:
cfg = pathlib.Path("/etc/default/ufw")
if not cfg.exists():
return True
for line in cfg.read_text().splitlines():
if line.strip().startswith("IPV6="):
return line.split("=", 1)[1].strip().lower() == "yes"
return True
def get_ipv6_prefixes() -> list:
if not ipv6_enabled():
return []
try:
out = subprocess.check_output(["ip", "-6", "addr", "show", "scope", "global"], text=True)
except Exception:
return []
prefixes = set()
for line in out.splitlines():
line = line.strip()
if not line.startswith("inet6 "):
continue
if " temporary " in f" {line} ":
continue
parts = line.split()
if len(parts) < 2:
continue
prefixes.add(parts[1])
return sorted(prefixes)
try:
profile = json.loads(profile_path.read_text())
except Exception as e:
log(f"Profile load failed: {e}")
profile = {}
ports = profile.get("firewall_ports") or []
firewall_enable = bool(profile.get("firewall_enable", False))
base_ports = profile.get("firewall_base_ports") or [22, 80, 443, 5252, 5253]
if ports:
if shutil.which("ufw"):
ipv6_prefixes = get_ipv6_prefixes()
if ipv6_prefixes:
log(f"IPv6 LAN prefixes: {', '.join(ipv6_prefixes)}")
for raw in ports:
try:
port = int(raw)
except Exception:
continue
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
for prefix in ipv6_prefixes:
subprocess.run(["ufw", "allow", "from", prefix, "to", "any", "port", str(port)], check=False)
if firewall_enable:
for raw in base_ports:
try:
port = int(raw)
except Exception:
continue
for subnet in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10", "169.254.0.0/16"):
subprocess.run(["ufw", "allow", "from", subnet, "to", "any", "port", str(port)], check=False)
for prefix in ipv6_prefixes:
subprocess.run(["ufw", "allow", "from", prefix, "to", "any", "port", str(port)], check=False)
subprocess.run(["ufw", "--force", "enable"], check=False)
log("UFW enabled.")
log("Profile firewall rules applied.")
else:
log("Profile firewall step skipped (ufw missing).")
else:
log("Profile firewall step skipped (no ports).")
def normalize_name(name: str) -> str:
return name.strip().lower()
services = []
if services_path.exists():
try:
services = json.loads(services_path.read_text())
except Exception:
services = []
if not isinstance(services, list):
services = []
profile_services = profile.get("services") or []
if isinstance(profile_services, list) and profile_services:
for psvc in profile_services:
if not isinstance(psvc, dict):
continue
p_name = normalize_name(str(psvc.get("name", "")))
p_port = str(psvc.get("port", ""))
p_path = str(psvc.get("path", "")) if psvc.get("path") is not None else ""
replaced = False
for svc in services:
if not isinstance(svc, dict):
continue
s_name = normalize_name(str(svc.get("name", "")))
s_port = str(svc.get("port", ""))
s_path = str(svc.get("path", "")) if svc.get("path") is not None else ""
if (p_name and s_name == p_name) or (p_port and s_port == p_port and s_path == p_path):
svc.update(psvc)
replaced = True
break
if not replaced:
services.append(psvc)
services_path.parent.mkdir(parents=True, exist_ok=True)
services_path.write_text(json.dumps(services, indent=2))
log("Profile services merged.")
else:
log("Profile services step skipped (none).")
actions = profile.get("actions") or []
if isinstance(actions, list) and actions:
for action in actions:
if not isinstance(action, dict):
continue
action_type = action.get("type")
if action_type == "tls_bundle":
src_cert = pathlib.Path(action.get("source_cert", ""))
src_key = pathlib.Path(action.get("source_key", ""))
dest = pathlib.Path(action.get("dest", ""))
if not src_cert.exists() or not src_key.exists():
log(f"TLS bundle skipped (missing cert/key): {dest}")
continue
dest.parent.mkdir(parents=True, exist_ok=True)
content = src_cert.read_bytes() + b\"\\n\" + src_key.read_bytes() + b\"\\n\"
dest.write_bytes(content)
owner = action.get("owner")
if owner:
user, _, group = str(owner).partition(\":\")
try:
uid = pwd.getpwnam(user).pw_uid if user else -1
except Exception:
uid = -1
try:
gid = grp.getgrnam(group).gr_gid if group else -1
except Exception:
gid = -1
if uid != -1 or gid != -1:
os.chown(dest, uid if uid != -1 else -1, gid if gid != -1 else -1)
mode = action.get("mode")
if mode:
try:
os.chmod(dest, int(str(mode), 8))
except Exception:
pass
restart = action.get("restart")
if restart:
subprocess.run([\"systemctl\", \"restart\", str(restart)], check=False)
log(f\"TLS bundle written: {dest}\")
continue
if action_type == "replace_text":
file_path = pathlib.Path(action.get("file", ""))
match = str(action.get("match", ""))
replacement = str(action.get("replace", ""))
if not file_path.exists():
log(f\"Replace skipped (missing file): {file_path}\")
continue
content = file_path.read_text()
if match not in content:
log(f\"Replace skipped (pattern not found): {file_path}\")
continue
file_path.write_text(content.replace(match, replacement, 1))
restart = action.get("restart")
if restart:
subprocess.run([\"systemctl\", \"restart\", str(restart)], check=False)
log(f\"Replaced text in: {file_path}\")
continue
else:
log("Profile actions step skipped (none).")
PY
}
write_state() {
local state="$1"
local current="$2"
@@ -260,34 +453,7 @@ finish_step 3
begin_step 4
configure_unattended_defaults
if [ -f "$PROFILE_FILE" ] && command -v ufw >/dev/null 2>&1; then
python3 - <<'PY' > /tmp/pikit-profile-ports.txt
import json
import sys
from pathlib import Path
profile = Path("/etc/pikit/profile.json")
try:
data = json.loads(profile.read_text())
except Exception:
data = {}
ports = data.get("firewall_ports") or []
for port in ports:
try:
port_int = int(port)
except Exception:
continue
print(port_int)
PY
while read -r port; do
[ -z "$port" ] && continue
for subnet in 10.0.0.0/8 172.16.0.0/12 192.168.0.0/16 100.64.0.0/10 169.254.0.0/16; do
ufw allow from "$subnet" to any port "$port" || true
done
done < /tmp/pikit-profile-ports.txt
rm -f /tmp/pikit-profile-ports.txt
else
log "Profile firewall step skipped (no profile.json or ufw missing)"
fi
apply_profile
if [ ! -f "$WEB_ASSETS/pikit-ca.crt" ]; then
echo "CA bundle missing in web assets" >&2