240 lines
8.5 KiB
Python
240 lines
8.5 KiB
Python
import pathlib
|
|
import re
|
|
import subprocess
|
|
from typing import Any, Dict, Optional
|
|
|
|
from .constants import (
|
|
ALL_PATTERNS,
|
|
APT_AUTO_CFG,
|
|
APT_UA_BASE,
|
|
APT_UA_OVERRIDE,
|
|
DEFAULT_UPDATE_TIME,
|
|
DEFAULT_UPGRADE_TIME,
|
|
SECURITY_PATTERNS,
|
|
)
|
|
from .helpers import strip_comments, validate_time
|
|
|
|
|
|
def auto_updates_enabled() -> bool:
    """Report whether the apt periodic config turns unattended upgrades on.

    Returns ``False`` when ``APT_AUTO_CFG`` does not exist. Otherwise returns
    ``True`` only if the file contains the exact text
    ``APT::Periodic::Unattended-Upgrade "1";``.
    """
    marker = 'APT::Periodic::Unattended-Upgrade "1";'
    if APT_AUTO_CFG.exists():
        return marker in APT_AUTO_CFG.read_text()
    return False
|
|
|
|
|
|
def set_auto_updates(enable: bool) -> None:
    """
    Switch unattended upgrades on or off in both the apt config and systemd.

    Rewrites ``APT_AUTO_CFG`` with the two periodic directives set to "1"
    (enable) or "0" (disable), then brings the related systemd units in line:

    * enable: unmask every unit, enable timers and service, start them.
    * disable: stop and disable timers and service, then mask every unit.

    All ``systemctl`` invocations use ``check=False``, so a failing call does
    not stop the remaining ones. Assumes unattended-upgrades is already
    installed.
    """
    service_unit = "unattended-upgrades.service"
    timer_units = ["apt-daily.timer", "apt-daily-upgrade.timer"]
    maskable_units = [
        "apt-daily.service",
        "apt-daily-upgrade.service",
        *timer_units,
        service_unit,
    ]
    enabled_cfg = (
        'APT::Periodic::Update-Package-Lists "1";\n'
        'APT::Periodic::Unattended-Upgrade "1";\n'
    )
    disabled_cfg = (
        'APT::Periodic::Update-Package-Lists "0";\n'
        'APT::Periodic::Unattended-Upgrade "0";\n'
    )

    def systemctl(verb: str, unit: str) -> None:
        # Best-effort: the exit status is deliberately ignored.
        subprocess.run(["systemctl", verb, unit], check=False)

    APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
    APT_AUTO_CFG.write_text(enabled_cfg if enable else disabled_cfg)

    if not enable:
        for unit in [*timer_units, service_unit]:
            systemctl("stop", unit)
            systemctl("disable", unit)
        for unit in maskable_units:
            systemctl("mask", unit)
        return

    for unit in maskable_units:
        systemctl("unmask", unit)
    for unit in [*timer_units, service_unit]:
        systemctl("enable", unit)
    for unit in timer_units:
        systemctl("start", unit)
    systemctl("start", service_unit)
|
|
|
|
|
|
def _systemctl_is(unit: str, verb: str) -> bool:
|
|
try:
|
|
out = subprocess.check_output(["systemctl", verb, unit], text=True).strip()
|
|
return out == "enabled" if verb == "is-enabled" else out == "active"
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def auto_updates_state() -> Dict[str, Any]:
    """Collect the apt-config and systemd view of unattended upgrades.

    The returned dict holds:

    * ``config_enabled``: result of ``auto_updates_enabled()``.
    * ``service_enabled`` / ``service_active``: state of
      ``unattended-upgrades.service``.
    * ``timers_enabled`` / ``timers_active``: per-timer state keyed by unit
      name for ``apt-daily.timer`` and ``apt-daily-upgrade.timer``.
    * ``enabled``: true only when the config, the service and every timer
      are enabled.
    """
    service_unit = "unattended-upgrades.service"

    config_on = auto_updates_enabled()
    service_enabled = _systemctl_is(service_unit, "is-enabled")
    service_active = _systemctl_is(service_unit, "is-active")

    timers_enabled: Dict[str, bool] = {}
    timers_active: Dict[str, bool] = {}
    for timer_unit in ("apt-daily.timer", "apt-daily-upgrade.timer"):
        timers_enabled[timer_unit] = _systemctl_is(timer_unit, "is-enabled")
        timers_active[timer_unit] = _systemctl_is(timer_unit, "is-active")

    return {
        "config_enabled": config_on,
        "service_enabled": service_enabled,
        "service_active": service_active,
        "timers_enabled": timers_enabled,
        "timers_active": timers_active,
        "enabled": config_on and service_enabled and all(timers_enabled.values()),
    }
|
|
|
|
|
|
def _parse_directive(text: str, key: str, default=None, as_bool=False, as_int=False):
    """Look up a single ``key value;`` directive in apt-style config text.

    The text is passed through ``strip_comments`` first; the first occurrence
    of ``key`` followed by whitespace, an optionally quoted value and ``;``
    is used.

    Returns ``default`` when the directive is absent. With ``as_bool`` the
    value is true for ``1``/``true``/``yes``/``on`` (case-insensitive); with
    ``as_int`` it is converted to ``int``, falling back to ``default`` if
    that fails. ``as_bool`` takes precedence over ``as_int``. Otherwise the
    stripped string is returned.
    """
    found = re.search(rf'{re.escape(key)}\s+"?([^";\n]+)"?;', strip_comments(text))
    if found is None:
        return default

    raw = found.group(1).strip()
    if as_bool:
        return raw.lower() in ("1", "true", "yes", "on")
    if not as_int:
        return raw
    try:
        return int(raw)
    except ValueError:
        return default
|
|
|
|
|
|
def _parse_origins_patterns(text: str):
    """Extract the entries of the ``Unattended-Upgrade::Origins-Pattern`` block.

    The text is passed through ``strip_comments`` and the first
    ``Unattended-Upgrade::Origins-Pattern { ... }`` block is located. Each
    non-empty line of its body becomes one entry, with surrounding
    whitespace, double quotes and semicolons removed.

    Returns a list of pattern strings; empty when no block is found.
    """
    text = strip_comments(text)
    # Entries routinely contain ``${distro_codename}``-style substitutions.
    # Matching "everything up to the first }" would end the block inside the
    # first such entry, so ``${...}`` is consumed as a single unit while
    # scanning for the closing brace of the block.
    m = re.search(
        r"Unattended-Upgrade::Origins-Pattern\s*\{((?:\$\{[^}]*\}|[^}])*)\}",
        text,
    )
    if not m:
        return []

    patterns = []
    for line in m.group(1).splitlines():
        entry = line.strip().strip('";')
        if entry:
            patterns.append(entry)
    return patterns
|
|
|
|
|
|
def _read_timer_time(timer: str):
|
|
try:
|
|
out = subprocess.check_output(
|
|
["systemctl", "show", "--property=TimersCalendar", timer], text=True
|
|
)
|
|
m = re.search(r"OnCalendar=[^0-9]*([0-9]{1,2}):([0-9]{2})", out)
|
|
if m:
|
|
return f"{int(m.group(1)):02d}:{m.group(2)}"
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def read_updates_config(state=None) -> Dict[str, Any]:
    """
    Return a normalized unattended-upgrades configuration snapshot.

    The Pi-Kit override file and the base file are concatenated in that
    order, so for every directive the override wins when it is present.
    Files that are missing or unreadable are skipped.

    ``state`` may be a precomputed ``auto_updates_state()`` result; when it is
    omitted (or falsy) the state is queried here.

    The result contains ``enabled``, ``scope`` ("all" or "security", or
    whatever a ``PIKIT_SCOPE:`` marker in the files says), ``cleanup``,
    ``bandwidth_limit_kbps``, ``auto_reboot``, ``reboot_time``,
    ``reboot_with_users``, ``update_time``, ``upgrade_time`` and the raw
    ``state`` dict.
    """
    text = ""
    for path in (APT_UA_OVERRIDE, APT_UA_BASE):
        if not path.exists():
            continue
        try:
            text += path.read_text() + "\n"
        except (OSError, UnicodeError):
            # Best-effort: an unreadable or undecodable file is treated as
            # absent rather than failing the whole snapshot.
            continue

    # The marker lives in a comment, so look for it before comments are stripped.
    scope_hint = None
    m_scope = re.search(r"PIKIT_SCOPE:\s*(\w+)", text)
    if m_scope:
        scope_hint = m_scope.group(1).lower()

    cleaned = strip_comments(text)
    patterns = _parse_origins_patterns(cleaned)
    # Without a marker, infer the scope from the patterns: any plain Debian
    # label that is not a security origin means "all". The security check is
    # case-insensitive because the label is spelled "Debian-Security" while
    # the codename suffix is spelled "-security".
    scope = scope_hint or (
        "all"
        if any("label=Debian" in p and "-security" not in p.lower() for p in patterns)
        else "security"
    )

    cleanup = _parse_directive(text, "Unattended-Upgrade::Remove-Unused-Dependencies", False, as_bool=True)
    auto_reboot = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot", False, as_bool=True)
    reboot_time = validate_time(
        _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-Time", DEFAULT_UPGRADE_TIME),
        DEFAULT_UPGRADE_TIME,
    )
    reboot_with_users = _parse_directive(text, "Unattended-Upgrade::Automatic-Reboot-WithUsers", False, as_bool=True)
    bandwidth = _parse_directive(text, "Acquire::http::Dl-Limit", None, as_int=True)

    # Schedules come from systemd, not from the apt config files.
    update_time = _read_timer_time("apt-daily.timer") or DEFAULT_UPDATE_TIME
    upgrade_time = _read_timer_time("apt-daily-upgrade.timer") or DEFAULT_UPGRADE_TIME

    state = state or auto_updates_state()
    return {
        "enabled": bool(state.get("enabled", False)),
        "scope": scope,
        "cleanup": bool(cleanup),
        "bandwidth_limit_kbps": bandwidth,
        "auto_reboot": bool(auto_reboot),
        "reboot_time": reboot_time,
        "reboot_with_users": bool(reboot_with_users),
        "update_time": update_time,
        "upgrade_time": upgrade_time,
        "state": state,
    }
|
|
|
|
|
|
def _write_timer_override(timer: str, time_str: str):
    """Install a systemd drop-in that schedules ``timer`` daily at ``time_str``.

    ``time_str`` is normalized with ``validate_time`` (falling back to
    ``DEFAULT_UPDATE_TIME``). The drop-in is written to
    ``/etc/systemd/system/<timer>.d/pikit.conf``; afterwards systemd is
    reloaded and the timer restarted. Both ``systemctl`` calls are
    best-effort (``check=False``).
    """
    time_norm = validate_time(time_str, DEFAULT_UPDATE_TIME)
    override_dir = pathlib.Path(f"/etc/systemd/system/{timer}.d")
    override_dir.mkdir(parents=True, exist_ok=True)
    override_file = override_dir / "pikit.conf"
    override_file.write_text(
        "[Timer]\n"
        # OnCalendar= accumulates across unit file and drop-ins. Assigning the
        # empty string first clears the schedule from the shipped unit so the
        # timer fires only at the requested time, not at both.
        "OnCalendar=\n"
        f"OnCalendar=*-*-* {time_norm}\n"
        "Persistent=true\n"
        "RandomizedDelaySec=30min\n"
    )
    subprocess.run(["systemctl", "daemon-reload"], check=False)
    subprocess.run(["systemctl", "restart", timer], check=False)
|
|
|
|
|
|
def set_updates_config(opts: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply unattended-upgrades configuration from dashboard inputs.

    Recognized keys in ``opts`` (all optional):

    * ``enable`` (default true): passed to ``set_auto_updates``.
    * ``scope``: ``"all"`` selects ``ALL_PATTERNS``; any other non-empty
      value selects ``SECURITY_PATTERNS``. Missing/empty means ``"all"``.
    * ``cleanup``, ``auto_reboot``, ``reboot_with_users``: booleans.
    * ``bandwidth_limit_kbps``: written as ``Acquire::http::Dl-Limit`` when
      not ``None``; must be convertible with ``int()``.
    * ``reboot_time``, ``update_time``, ``upgrade_time``: normalized with
      ``validate_time``; ``upgrade_time`` falls back to ``update_time``.

    Writes the override file, installs both timer drop-ins and returns a
    fresh ``read_updates_config()`` snapshot.

    Raises ``ValueError``/``TypeError`` if ``bandwidth_limit_kbps`` is not
    integer-like; this happens before any file or systemd state is changed.
    """
    enable = bool(opts.get("enable", True))
    # Collapse the input to one of the two known scopes. The value is written
    # into the config file below, so this keeps the PIKIT_SCOPE marker in
    # agreement with the patterns actually applied and prevents arbitrary
    # text (including newlines) from reaching the file.
    scope = "all" if (opts.get("scope") or "all") == "all" else "security"
    patterns = ALL_PATTERNS if scope == "all" else SECURITY_PATTERNS
    cleanup = bool(opts.get("cleanup", False))
    bandwidth = opts.get("bandwidth_limit_kbps")
    # Convert up front so a bad value fails before anything is modified.
    bandwidth_kbps = None if bandwidth is None else int(bandwidth)
    auto_reboot = bool(opts.get("auto_reboot", False))
    reboot_time = validate_time(opts.get("reboot_time"), DEFAULT_UPGRADE_TIME)
    reboot_with_users = bool(opts.get("reboot_with_users", False))
    update_time = validate_time(opts.get("update_time"), DEFAULT_UPDATE_TIME)
    upgrade_time = validate_time(opts.get("upgrade_time") or opts.get("update_time"), DEFAULT_UPGRADE_TIME)

    APT_AUTO_CFG.parent.mkdir(parents=True, exist_ok=True)
    set_auto_updates(enable)

    lines = [
        "// Managed by Pi-Kit dashboard",
        f"// PIKIT_SCOPE: {scope}",
        "Unattended-Upgrade::Origins-Pattern {",
    ]
    lines.extend(f' "{p}";' for p in patterns)
    lines.append("};")
    lines.append(f'Unattended-Upgrade::Remove-Unused-Dependencies {"true" if cleanup else "false"};')
    lines.append(f'Unattended-Upgrade::Automatic-Reboot {"true" if auto_reboot else "false"};')
    lines.append(f'Unattended-Upgrade::Automatic-Reboot-Time "{reboot_time}";')
    lines.append(
        f'Unattended-Upgrade::Automatic-Reboot-WithUsers {"true" if reboot_with_users else "false"};'
    )
    if bandwidth_kbps is not None:
        lines.append(f'Acquire::http::Dl-Limit "{bandwidth_kbps}";')
    APT_UA_OVERRIDE.parent.mkdir(parents=True, exist_ok=True)
    APT_UA_OVERRIDE.write_text("\n".join(lines) + "\n")

    _write_timer_override("apt-daily.timer", update_time)
    _write_timer_override("apt-daily-upgrade.timer", upgrade_time)
    return read_updates_config()
|