use NestedDeserializableDataclass for config

Alex 2025-01-07 01:43:39 +03:00
parent 96664684f8
commit 3eb358d618
13 changed files with 188 additions and 130 deletions
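This replaces raw nested-dict access like cvars.config.get()["checks"]["raid"] with typed attribute access (cvars.config.get().checks.raid). The NestedDeserializableDataclass base class itself is defined in one of the other changed files, not shown in this file's diff; the sketch below is only a minimal illustration of the pattern, assuming a from_dict() entry point, and every name in it other than NestedDeserializableDataclass is a hypothetical stand-in.

# Minimal sketch of the pattern, assuming a from_dict() classmethod;
# the real base class in this commit may differ in detail.
from dataclasses import dataclass, fields
from typing import Any, get_args, get_origin, get_type_hints

@dataclass
class NestedDeserializableDataclass:
    @classmethod
    def from_dict(cls, data: dict[str, Any]):
        hints = get_type_hints(cls)
        kwargs = {}
        for f in fields(cls):
            value = data[f.name]
            hint = hints[f.name]
            if get_origin(hint) is list:
                # Recurse into lists of nested dataclasses (e.g. wearout disks).
                (item_type,) = get_args(hint)
                if isinstance(item_type, type) and issubclass(item_type, NestedDeserializableDataclass):
                    value = [item_type.from_dict(v) for v in value]
            elif isinstance(hint, type) and issubclass(hint, NestedDeserializableDataclass) and isinstance(value, dict):
                # Recurse into singly nested dataclass fields.
                value = hint.from_dict(value)
            kwargs[f.name] = value
        return cls(**kwargs)

The payoff at the call sites below: a missing or misspelled config key fails once, at deserialization time, instead of as a KeyError deep inside a check, and type checkers can verify the attribute access.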

@@ -3,6 +3,7 @@ import traceback
 from datetime import timedelta
 from alerting import alerts
+from alerting.enum import AlertType, Severity
 from misc import cvars, docker_registry, sensors, vuln
 from misc.disks import LVAttr, WearoutIndicator, get_wearout_reading
@@ -18,15 +19,15 @@ def temp_check() -> list[alerts.Alert]:
             continue  # little valuable info and too low limits there, might as well ignore it
         if sensor.critical_temp is not None and (IS_TESTING or sensor.current_temp > sensor.critical_temp):
             alert = alerts.Alert(
-                alert_type=alerts.AlertType("TEMP"),
+                alert_type=AlertType("TEMP"),
                 message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.critical_temp}°C",
-                severity=alerts.Severity.CRITICAL,
+                severity=Severity.CRITICAL,
             )
         elif sensor.highest_temp is not None and (IS_TESTING or sensor.current_temp > sensor.highest_temp):
             alert = alerts.Alert(
-                alert_type=alerts.AlertType("TEMP"),
+                alert_type=AlertType("TEMP"),
                 message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.highest_temp}°C",
-                severity=alerts.Severity.WARNING,
+                severity=Severity.WARNING,
             )
         else:
             continue
@@ -38,15 +39,15 @@ def cpu_check() -> list[alerts.Alert]:
     sensor = sensors.Sensors.get_cpu()
     if IS_TESTING or sensor.current_load > sensor.critical_load:
         alert = alerts.Alert(
-            alert_type=alerts.AlertType("CPU"),
+            alert_type=AlertType("CPU"),
             message=f"{sensor.current_load}% > {sensor.critical_load}%",
-            severity=alerts.Severity.CRITICAL,
+            severity=Severity.CRITICAL,
         )
     elif IS_TESTING or sensor.current_load > sensor.highest_load:
         alert = alerts.Alert(
-            alert_type=alerts.AlertType("CPU"),
+            alert_type=AlertType("CPU"),
             message=f"{sensor.current_load}% > {sensor.highest_load}%",
-            severity=alerts.Severity.WARNING,
+            severity=Severity.WARNING,
         )
     else:
         return []
@@ -57,15 +58,15 @@ def ram_check() -> list[alerts.Alert]:
     sensor = sensors.Sensors.get_ram()
     if IS_TESTING or sensor.current_avail < sensor.critical_avail:
         alert = alerts.Alert(
-            alert_type=alerts.AlertType("RAM"),
+            alert_type=AlertType("RAM"),
             message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.critical_avail / 1024**3):.2f} GiB",
-            severity=alerts.Severity.CRITICAL,
+            severity=Severity.CRITICAL,
         )
     elif IS_TESTING or sensor.current_avail < sensor.warning_avail:
         alert = alerts.Alert(
-            alert_type=alerts.AlertType("RAM"),
+            alert_type=AlertType("RAM"),
             message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.warning_avail / 1024**3):.2f} GiB",
-            severity=alerts.Severity.WARNING,
+            severity=Severity.WARNING,
         )
     else:
         return []
@@ -79,18 +80,18 @@ async def vuln_check() -> list[alerts.Alert]:
         if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL):
             match v.severity:
                 case vuln.Severity.LOW:
-                    severity = alerts.Severity.INFO
+                    severity = Severity.INFO
                 case vuln.Severity.MEDIUM:
-                    severity = alerts.Severity.WARNING
+                    severity = Severity.WARNING
                 case vuln.Severity.HIGH | vuln.Severity.CRITICAL:
-                    severity = alerts.Severity.CRITICAL
+                    severity = Severity.CRITICAL
             message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
             html_message = f"<a href='{v.link}'>{v.id}</a>: {v.vuln_type} in {','.join(v.packages)}"
             if v.fixed:
                 message += f" -- update to {v.fixed} ASAP"
                 html_message += f" -- update to {v.fixed} ASAP"
             alert = alerts.Alert(
-                alert_type=alerts.AlertType.VULN,
+                alert_type=AlertType.VULN,
                 message=message,
                 html_message=html_message,
                 severity=severity,
@@ -110,49 +111,47 @@ async def ups_check() -> list[alerts.Alert]:
     if IS_TESTING or sensor.battery_charge_percentage < sensor.battery_critical_percentage:
         alert_list.append(
             alerts.Alert(
-                alert_type=alerts.AlertType.UPS,
+                alert_type=AlertType.UPS,
                 message=f"Battery is under {sensor.battery_critical_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.",
-                severity=alerts.Severity.CRITICAL,
+                severity=Severity.CRITICAL,
             )
         )
     elif IS_TESTING or sensor.battery_charge_percentage < sensor.battery_warning_percentage:
         alert_list.append(
             alerts.Alert(
-                alert_type=alerts.AlertType.UPS,
+                alert_type=AlertType.UPS,
                 message=f"Battery is under {sensor.battery_warning_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.",
-                severity=alerts.Severity.WARNING,
+                severity=Severity.WARNING,
             )
         )
     for status in sensor.ups_status:
         if IS_TESTING or status == sensors.UPSStatus.UPS_OVERLOAD:
             alert_list.append(
-                alerts.Alert(
-                    alert_type=alerts.AlertType.UPS, message=f"UPS is overloaded!", severity=alerts.Severity.CRITICAL
-                )
+                alerts.Alert(alert_type=AlertType.UPS, message=f"UPS is overloaded!", severity=Severity.CRITICAL)
             )
         elif IS_TESTING or status == sensors.UPSStatus.ON_BATTERY:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.UPS,
+                    alert_type=AlertType.UPS,
                     message=f"UPS is on battery.\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.",
-                    severity=alerts.Severity.INFO,
+                    severity=Severity.INFO,
                 )
             )
         elif IS_TESTING or status == sensors.UPSStatus.UPS_TRIM:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.UPS,
+                    alert_type=AlertType.UPS,
                     message=f"Overvoltage detected: trimming voltage to nominal.",
-                    severity=alerts.Severity.INFO,
+                    severity=Severity.INFO,
                 )
             )
         elif IS_TESTING or status == sensors.UPSStatus.UPS_BOOST:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.UPS,
+                    alert_type=AlertType.UPS,
                     message=f"Undervoltage detected: boosting voltage to nominal.",
-                    severity=alerts.Severity.INFO,
+                    severity=Severity.INFO,
                 )
             )
@@ -165,26 +164,26 @@ async def docker_registry_check() -> list[alerts.Alert]:
     for image in updated_images:
         alert_list.append(
             alerts.Alert(
-                alert_type=alerts.AlertType.UPDATE,
+                alert_type=AlertType.UPDATE,
                 message=f"{image} docker image: new version available",
-                severity=alerts.Severity.INFO,
+                severity=Severity.INFO,
             )
         )
     return alert_list
 
 
 def raid_check() -> list[alerts.Alert]:
-    check_config = cvars.config.get()["checks"]["raid"]
+    check_config = cvars.config.get().checks.raid
     alert_list = []
-    for lv in check_config["lvs"]:
+    for lv in check_config.lvs:
         try:
             lv_attr = LVAttr.from_cli(lv)
         except Exception as exc:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.ERROR,
+                    alert_type=AlertType.ERROR,
                     message=f"Could not check RAID LV {lv}: {repr(exc)}, see logs",
-                    severity=alerts.Severity.CRITICAL,
+                    severity=Severity.CRITICAL,
                 )
             )
             logging.error(traceback.format_exc())
@@ -194,9 +193,9 @@ def raid_check() -> list[alerts.Alert]:
         if lv_attr.vol_type not in [LVAttr.VolType.RAID, LVAttr.VolType.RAID_NOSYNC]:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.ERROR,
+                    alert_type=AlertType.ERROR,
                     message=f"LV {lv} is not of RAID type",
-                    severity=alerts.Severity.CRITICAL,
+                    severity=Severity.CRITICAL,
                 )
             )
             continue
@@ -204,9 +203,9 @@ def raid_check() -> list[alerts.Alert]:
         if IS_TESTING:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.RAID,
+                    alert_type=AlertType.RAID,
                     message=f"Test alert: LV {lv} health is {lv_attr.health}",
-                    severity=alerts.Severity.INFO,
+                    severity=Severity.INFO,
                 )
             )
@@ -214,33 +213,33 @@ def raid_check() -> list[alerts.Alert]:
             case LVAttr.Health.PARTIAL:
                 alert_list.append(
                     alerts.Alert(
-                        alert_type=alerts.AlertType.RAID,
+                        alert_type=AlertType.RAID,
                         message=f"LV {lv} operating in partial mode; one of PVs has failed",
-                        severity=alerts.Severity.CRITICAL,
+                        severity=Severity.CRITICAL,
                     )
                 )
             case LVAttr.Health.UNKNOWN:
                 alert_list.append(
                     alerts.Alert(
-                        alert_type=alerts.AlertType.RAID,
+                        alert_type=AlertType.RAID,
                         message=f"LV {lv}'s state is unknown",
-                        severity=alerts.Severity.CRITICAL,
+                        severity=Severity.CRITICAL,
                     )
                 )
             case LVAttr.Health.REFRESH_NEEDED:
                 alert_list.append(
                     alerts.Alert(
-                        alert_type=alerts.AlertType.RAID,
+                        alert_type=AlertType.RAID,
                         message=f"LV {lv} has suffered a write error; run a refresh or replace the failing PV",
-                        severity=alerts.Severity.WARNING,
+                        severity=Severity.WARNING,
                     )
                 )
             case LVAttr.Health.MISMATCHES:
                 alert_list.append(
                     alerts.Alert(
-                        alert_type=alerts.AlertType.RAID,
+                        alert_type=AlertType.RAID,
                         message=f"LV {lv} is partially incoherent; run a repairing scrub operation",
-                        severity=alerts.Severity.WARNING,
+                        severity=Severity.WARNING,
                     )
                 )
@@ -248,17 +247,17 @@ def raid_check() -> list[alerts.Alert]:
 def disk_wearout_check() -> list[alerts.Alert]:
-    check_config = cvars.config.get()["checks"]["wearout"]
+    check_config = cvars.config.get().checks.wearout
     alert_list = []
-    for disk in check_config["disks"]:
+    for disk in check_config.disks:
         try:
-            wearout_reading = get_wearout_reading(disk["name"])
+            wearout_reading = get_wearout_reading(disk.name)
         except Exception as exc:
             alert_list.append(
                 alerts.Alert(
-                    alert_type=alerts.AlertType.ERROR,
-                    message=f"Could not check wearout for disk {disk['name']}: {repr(exc)}, see logs",
-                    severity=alerts.Severity.CRITICAL,
+                    alert_type=AlertType.ERROR,
+                    message=f"Could not check wearout for disk {disk.name}: {repr(exc)}, see logs",
+                    severity=Severity.CRITICAL,
                 )
             )
             logging.error(traceback.format_exc())
@@ -267,13 +266,11 @@ def disk_wearout_check() -> list[alerts.Alert]:
         if IS_TESTING or wearout_reading.current_reading < wearout_reading.threshold_reading:
             match wearout_reading.indicator:
                 case WearoutIndicator.REALLOCATED_SECTORS:
-                    message = f"Disk {disk['name']} has reallocated sectors (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})"
+                    message = f"Disk {disk.name} has reallocated sectors (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})"
                 case WearoutIndicator.SPARE_BLOCKS:
-                    message = f"Disk {disk['name']} has too few spare blocks (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})"
+                    message = f"Disk {disk.name} has too few spare blocks (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})"
             alert_list.append(
-                alerts.Alert(
-                    alert_type=alerts.AlertType.DISKS, message=message, severity=alerts.Severity[disk["severity"]]
-                )
+                alerts.Alert(alert_type=AlertType.DISKS, message=message, severity=Severity[disk.severity])
             )
     return alert_list
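
For reference, continuing the sketch above: a hypothetical schema mirroring the keys these checks read. The real class names live in files outside this diff, so treat them as stand-ins; only the field names (checks.raid.lvs, checks.wearout.disks, disk.name, disk.severity) are taken from the call sites.

from dataclasses import dataclass

@dataclass
class DiskConfig(NestedDeserializableDataclass):
    name: str
    severity: str  # looked up via Severity[disk.severity] above

@dataclass
class WearoutCheckConfig(NestedDeserializableDataclass):
    disks: list[DiskConfig]

@dataclass
class RaidCheckConfig(NestedDeserializableDataclass):
    lvs: list[str]

@dataclass
class ChecksConfig(NestedDeserializableDataclass):
    raid: RaidCheckConfig
    wearout: WearoutCheckConfig

config = ChecksConfig.from_dict(
    {
        "raid": {"lvs": ["vg0/data"]},
        "wearout": {"disks": [{"name": "/dev/sda", "severity": "WARNING"}]},
    }
)
assert config.raid.lvs == ["vg0/data"]
assert config.wearout.disks[0].name == "/dev/sda"  # was disk["name"] before this commit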