Files
pi-kit/pikit_api/auto_updates.py
2025-12-13 17:04:32 -05:00

240 lines
8.5 KiB
Python

import pathlib
import re
import subprocess
from typing import Any, Dict, Optional
from .constants import (
ALL_PATTERNS,
APT_AUTO_CFG,
APT_UA_BASE,
APT_UA_OVERRIDE,
DEFAULT_UPDATE_TIME,
DEFAULT_UPGRADE_TIME,
SECURITY_PATTERNS,
)
from .helpers import strip_comments, validate_time
def auto_updates_enabled() -> bool:
    """Report whether the apt periodic config switches unattended upgrades on.

    Returns:
        True only when ``APT_AUTO_CFG`` exists and its text contains the exact
        string ``APT::Periodic::Unattended-Upgrade "1";``; False otherwise.
    """
    enabled_marker = 'APT::Periodic::Unattended-Upgrade "1";'
    # Plain substring test: the file is not parsed, so spacing must match.
    return APT_AUTO_CFG.exists() and enabled_marker in APT_AUTO_CFG.read_text()
def set_auto_updates(enable: bool) -> None:
    """
    Switch unattended upgrades on or off in both the apt periodic config and
    systemd, so the two stay in agreement. Assumes unattended-upgrades is
    already installed.

    Every systemctl call is best-effort (check=False): a unit that fails does
    not stop the remaining steps from running.
    """
    upgrade_service = "unattended-upgrades.service"
    apt_timers = ["apt-daily.timer", "apt-daily-upgrade.timer"]
    maskable_units = [
        "apt-daily.service",
        "apt-daily-upgrade.service",
        *apt_timers,
        upgrade_service,
    ]
    managed_units = apt_timers + [upgrade_service]

    def systemctl(action: str, unit: str) -> None:
        subprocess.run(["systemctl", action, unit], check=False)

    # Both periodic switches always carry the same value.
    flag = "1" if enable else "0"
    APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
    APT_AUTO_CFG.write_text(
        f'APT::Periodic::Update-Package-Lists "{flag}";\n'
        f'APT::Periodic::Unattended-Upgrade "{flag}";\n'
    )

    if not enable:
        # Stop and disable first, then mask so nothing can start the units.
        for unit in managed_units:
            systemctl("stop", unit)
            systemctl("disable", unit)
        for unit in maskable_units:
            systemctl("mask", unit)
        return

    # Unmask before enabling: a masked unit cannot be enabled or started.
    for unit in maskable_units:
        systemctl("unmask", unit)
    for unit in managed_units:
        systemctl("enable", unit)
    for unit in managed_units:
        systemctl("start", unit)
def _systemctl_is(unit: str, verb: str) -> bool:
try:
out = subprocess.check_output(["systemctl", verb, unit], text=True).strip()
return out == "enabled" if verb == "is-enabled" else out == "active"
except Exception:
return False
def auto_updates_state() -> Dict[str, Any]:
    """Collect the apt config flag and the systemd state of the update units.

    Returns:
        A dict with ``config_enabled``, ``service_enabled``, ``service_active``,
        per-timer ``timers_enabled`` / ``timers_active`` maps, and an overall
        ``enabled`` flag. ``enabled`` is true only when the config is on, the
        service is enabled and every timer is enabled; the "active" values are
        reported but do not influence it.
    """
    service_unit = "unattended-upgrades.service"
    timer_units = ("apt-daily.timer", "apt-daily-upgrade.timer")

    config_enabled = auto_updates_enabled()
    service_enabled = _systemctl_is(service_unit, "is-enabled")
    service_active = _systemctl_is(service_unit, "is-active")

    timers_enabled: Dict[str, bool] = {}
    timers_active: Dict[str, bool] = {}
    for timer in timer_units:
        timers_enabled[timer] = _systemctl_is(timer, "is-enabled")
        timers_active[timer] = _systemctl_is(timer, "is-active")

    return {
        "config_enabled": config_enabled,
        "service_enabled": service_enabled,
        "service_active": service_active,
        "timers_enabled": timers_enabled,
        "timers_active": timers_active,
        "enabled": (
            config_enabled and service_enabled and all(timers_enabled.values())
        ),
    }
def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False):
    """Extract the value of the first ``<key> <value>;`` directive in *text*.

    Comments are removed with ``strip_comments`` before searching. The value
    may be wrapped in double quotes.

    Args:
        text: configuration text to search.
        key: directive name, matched literally.
        default: returned when the directive is absent, or when ``as_int`` is
            set and the value is not an integer.
        as_bool: return True for ``1``/``true``/``yes``/``on`` (case-insensitive),
            False for anything else. Takes precedence over ``as_int``.
        as_int: convert the value with ``int``.

    Returns:
        The stripped string value, or its bool/int conversion, or *default*.
    """
    directive_re = rf'{re.escape(key)}\s+"?([^";\n]+)"?;'
    found = re.search(directive_re, strip_comments(text))
    if found is None:
        return default

    raw = found.group(1).strip()
    if as_bool:
        return raw.lower() in ("1", "true", "yes", "on")
    if not as_int:
        return raw
    try:
        return int(raw)
    except ValueError:
        return default
def _parse_origins_patterns(text: str):
    """Return the entries of the first ``Unattended-Upgrade::Origins-Pattern`` block.

    Comments are removed with ``strip_comments`` first. Each non-empty line
    inside the braces becomes one entry, with surrounding whitespace, double
    quotes and semicolons trimmed. Returns an empty list when no block exists.
    """
    block = re.search(
        r"Unattended-Upgrade::Origins-Pattern\s*{([^}]*)}",
        strip_comments(text),
        re.S,
    )
    if block is None:
        return []
    entries = (raw.strip().strip('";') for raw in block.group(1).splitlines())
    return [entry for entry in entries if entry]
def _read_timer_time(timer: str):
try:
out = subprocess.check_output(
["systemctl", "show", "--property=TimersCalendar", timer], text=True
)
m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out)
if m:
return f"{int(m.group(1)):02d}:{m.group(2)}"
except Exception:
pass
return None
def read_updates_config(state=None) -> Dict[str, Any]:
    """
    Build a normalized snapshot of the unattended-upgrades configuration.

    The Pi-Kit override file and the base file are both read (override first)
    and concatenated; directive lookups use the first match in that combined
    text. A file that cannot be read is skipped. Timer times are taken from
    systemd and fall back to the module defaults.

    Args:
        state: an ``auto_updates_state()`` result to reuse; when falsy the
            state is queried here.

    Returns:
        Dict with ``enabled``, ``scope``, ``cleanup``, ``bandwidth_limit_kbps``,
        ``auto_reboot``, ``reboot_time``, ``reboot_with_users``, ``update_time``,
        ``upgrade_time`` and the raw ``state``.
    """
    chunks = []
    for cfg_path in (APT_UA_OVERRIDE, APT_UA_BASE):
        if not cfg_path.exists():
            continue
        try:
            chunks.append(cfg_path.read_text() + "\n")
        except Exception:
            continue
    raw_text = "".join(chunks)

    # The scope hint lives in a comment, so look for it before comments are stripped.
    hint = re.search(r"PIKIT_SCOPE:\s*(\w+)", raw_text)
    origin_patterns = _parse_origins_patterns(strip_comments(raw_text))
    if hint is not None:
        scope = hint.group(1).lower()
    elif any(
        "label=Debian" in pat and "-security" not in pat for pat in origin_patterns
    ):
        # A Debian origin that is not the security archive means "everything".
        scope = "all"
    else:
        scope = "security"

    cleanup = _parse_directive(
        raw_text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True
    )
    auto_reboot = _parse_directive(
        raw_text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True
    )
    reboot_time_raw = _parse_directive(
        raw_text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME
    )
    reboot_time = validate_time(reboot_time_raw, DEFAULT_UPGRADE_TIME)
    reboot_with_users = _parse_directive(
        raw_text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True
    )
    bandwidth = _parse_directive(raw_text, "Acquire::http::Dl-Limit", None, as_int=True)

    update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME
    upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME

    if not state:
        state = auto_updates_state()

    return {
        "enabled": bool(state.get("enabled", False)),
        "scope": scope,
        "cleanup": bool(cleanup),
        "bandwidth_limit_kbps": bandwidth,
        "auto_reboot": bool(auto_reboot),
        "reboot_time": reboot_time,
        "reboot_with_users": bool(reboot_with_users),
        "update_time": update_time,
        "upgrade_time": upgrade_time,
        "state": state,
    }
def _write_timer_override(timer: str, time_str: str):
    """
    Write a systemd drop-in that schedules *timer* once a day at *time_str*.

    The drop-in is stored as ``/etc/systemd/system/<timer>.d/pikit.conf``;
    afterwards systemd is reloaded and the timer restarted (both best-effort).

    Args:
        timer: timer unit name, e.g. ``"apt-daily.timer"``.
        time_str: requested time; passed through ``validate_time`` with
            ``DEFAULT_UPDATE_TIME`` as the fallback argument.
    """
    time_norm = validate_time(time_str, DEFAULT_UPDATE_TIME)
    override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d")
    override_dir.mkdir(parents=True, exist_ok=True)
    override_file = override_dir / "pikit.conf"
    override_file.write_text(
        "[Timer]\n"
        # OnCalendar= is additive across the unit file and its drop-ins. The
        # empty assignment clears the schedule inherited from the packaged
        # unit; without it the stock trigger keeps firing next to ours.
        "OnCalendar=\n"
        f"OnCalendar=*-*-* {time_norm}\n"
        "Persistent=true\n"
        "RandomizedDelaySec=30min\n"
    )
    # Make systemd pick up the new drop-in, then re-arm the timer with it.
    subprocess.run(["systemctl", "daemon-reload"], check=False)
    subprocess.run(["systemctl", "restart", timer], check=False)
def set_updates_config(opts: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply unattended-upgrades configuration from dashboard inputs.

    Args:
        opts: settings dict. Keys read: ``enable`` (default True), ``scope``
            (``"all"`` selects ALL_PATTERNS, anything else SECURITY_PATTERNS;
            missing/empty means ``"all"``), ``cleanup``, ``bandwidth_limit_kbps``,
            ``auto_reboot``, ``reboot_time``, ``reboot_with_users``,
            ``update_time`` and ``upgrade_time`` (falls back to ``update_time``).

    Returns:
        The snapshot produced by ``read_updates_config()`` after writing.

    Raises:
        ValueError, TypeError: ``bandwidth_limit_kbps`` is given but cannot be
            converted to ``int``. Raised before anything is changed.
    """
    enable = bool(opts.get("enable", True))
    # Record the scope that is actually applied rather than the raw input:
    # the PIKIT_SCOPE hint must never disagree with the patterns written next
    # to it, and free-form dashboard text must not reach the config file.
    scope = "all" if (opts.get("scope") or "all") == "all" else "security"
    patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS
    cleanup = bool(opts.get("cleanup", False))
    bandwidth = opts.get("bandwidth_limit_kbps")
    # Convert up front so bad input fails before any system state is touched.
    bandwidth_kbps = None if bandwidth is None else int(bandwidth)
    auto_reboot = bool(opts.get("auto_reboot", False))
    reboot_time = validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME)
    reboot_with_users = bool(opts.get("reboot_with_users", False))
    update_time = validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME)
    upgrade_time = validate_time(
        opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME
    )

    def flag(value: bool) -> str:
        return "true" if value else "false"

    APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
    set_auto_updates(enable)

    lines = [
        "// Managed by Pi-Kit dashboard",
        f"// PIKIT_SCOPE: {scope}",
        "Unattended-Upgrade::Origins-Pattern {",
    ]
    lines.extend(f' "{p}";' for p in patterns)
    lines.append("};")
    lines.append(f"Unattended-Upgrade::Remove-Unused-Dependencies {flag(cleanup)};")
    lines.append(f"Unattended-Upgrade::Automatic-Reboot {flag(auto_reboot)};")
    lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";')
    lines.append(
        f"Unattended-Upgrade::Automatic-Reboot-WithUsers {flag(reboot_with_users)};"
    )
    if bandwidth_kbps is not None:
        lines.append(f'Acquire::http::Dl-Limit "{bandwidth_kbps}";')

    APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True)
    APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n")

    _write_timer_override("apt-daily.timer", update_time)
    _write_timer_override("apt-daily-upgrade.timer", upgrade_time)
    return read_updates_config()