import logging import traceback from datetime import timedelta from alerting import alerts from misc import cvars, docker_registry, sensors, vuln from misc.disks import LVAttr, WearoutIndicator, get_wearout_reading IS_TESTING = False def temp_check() -> list[alerts.Alert]: alert_list = [] temps = sensors.Sensors.get_temperatures() for _, sensor_list in temps.items(): for sensor in sensor_list: if sensor.sensor_type == "nct6687": continue # little valuable info and too low limits there, might as well ignore it if sensor.critical_temp is not None and (IS_TESTING or sensor.current_temp > sensor.critical_temp): alert = alerts.Alert( alert_type=alerts.AlertType("TEMP"), message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.critical_temp}°C", severity=alerts.Severity.CRITICAL, ) elif sensor.highest_temp is not None and (IS_TESTING or sensor.current_temp > sensor.highest_temp): alert = alerts.Alert( alert_type=alerts.AlertType("TEMP"), message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.highest_temp}°C", severity=alerts.Severity.WARNING, ) else: continue alert_list.append(alert) return alert_list def cpu_check() -> list[alerts.Alert]: sensor = sensors.Sensors.get_cpu() if IS_TESTING or sensor.current_load > sensor.critical_load: alert = alerts.Alert( alert_type=alerts.AlertType("CPU"), message=f"{sensor.current_load}% > {sensor.critical_load}%", severity=alerts.Severity.CRITICAL, ) elif IS_TESTING or sensor.current_load > sensor.highest_load: alert = alerts.Alert( alert_type=alerts.AlertType("CPU"), message=f"{sensor.current_load}% > {sensor.highest_load}%", severity=alerts.Severity.WARNING, ) else: return [] return [alert] def ram_check() -> list[alerts.Alert]: sensor = sensors.Sensors.get_ram() if IS_TESTING or sensor.current_avail < sensor.critical_avail: alert = alerts.Alert( alert_type=alerts.AlertType("RAM"), message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.critical_avail / 1024**3):.2f} GiB", severity=alerts.Severity.CRITICAL, ) elif IS_TESTING or sensor.current_avail < sensor.warning_avail: alert = alerts.Alert( alert_type=alerts.AlertType("RAM"), message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.warning_avail / 1024**3):.2f} GiB", severity=alerts.Severity.WARNING, ) else: return [] return [alert] async def vuln_check() -> list[alerts.Alert]: vulns = await vuln.get_vulns() alert_list = [] for v in vulns: if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL): match v.severity: case vuln.Severity.LOW: severity = alerts.Severity.INFO case vuln.Severity.MEDIUM: severity = alerts.Severity.WARNING case vuln.Severity.HIGH | vuln.Severity.CRITICAL: severity = alerts.Severity.CRITICAL message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" html_message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" if v.fixed: message.append(f" -- update to {v.fixed} ASAP") html_message.append(f" -- update to {v.fixed} ASAP") alert = alerts.Alert( alert_type=alerts.AlertType.VULN, message=message, html_message=html_message, severity=severity, ) alert_list.append(alert) return alert_list async def ups_check() -> list[alerts.Alert]: sensor = await sensors.Sensors.get_ups() if not sensor: return alert_list = [] if IS_TESTING or sensor.battery_charge_percentage < sensor.battery_critical_percentage: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"Battery is under {sensor.battery_critical_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=alerts.Severity.CRITICAL, ) ) elif IS_TESTING or sensor.battery_charge_percentage < sensor.battery_warning_percentage: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"Battery is under {sensor.battery_warning_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=alerts.Severity.WARNING, ) ) for status in sensor.ups_status: if IS_TESTING or status == sensors.UPSStatus.UPS_OVERLOAD: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"UPS is overloaded!", severity=alerts.Severity.CRITICAL ) ) elif IS_TESTING or status == sensors.UPSStatus.ON_BATTERY: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"UPS is on battery.\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=alerts.Severity.INFO, ) ) elif IS_TESTING or status == sensors.UPSStatus.UPS_TRIM: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"Overvoltage detected: trimming voltage to nominal.", severity=alerts.Severity.INFO, ) ) elif IS_TESTING or status == sensors.UPSStatus.UPS_BOOST: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPS, message=f"Undervoltage detected: boosting voltage to nominal.", severity=alerts.Severity.INFO, ) ) return alert_list async def docker_registry_check() -> list[alerts.Alert]: updated_images = await docker_registry.get_updated_images() alert_list = [] for image in updated_images: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.UPDATE, message=f"{image} docker image: new version available", severity=alerts.Severity.INFO, ) ) return alert_list def raid_check() -> list[alerts.Alert]: check_config = cvars.config.get()["checks"]["raid"] alert_list = [] for lv in check_config["lvs"]: try: lv_attr = LVAttr.from_cli(lv) except Exception as exc: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.ERROR, message=f"Could not check RAID LV {lv}: {repr(exc)}, see logs", severity=alerts.Severity.CRITICAL, ) ) logging.error(traceback.format_exc()) continue # sanity check if lv_attr.vol_type not in [LVAttr.VolType.RAID, LVAttr.VolType.RAID_NOSYNC]: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.ERROR, message=f"LV {lv} is not of RAID type", severity=alerts.Severity.CRITICAL, ) ) continue if IS_TESTING: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.RAID, message=f"Test alert: LV {lv} health is {lv_attr.health}", severity=alerts.Severity.INFO, ) ) match lv_attr.health: case LVAttr.Health.PARTIAL: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.RAID, message=f"LV {lv} operating in partial mode; one of PVs has failed", severity=alerts.Severity.CRITICAL, ) ) case LVAttr.Health.UNKNOWN: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.RAID, message=f"LV {lv}'s state is unknown", severity=alerts.Severity.CRITICAL, ) ) case LVAttr.Health.REFRESH_NEEDED: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.RAID, message=f"LV {lv} has suffered a write error; run a refresh or replace the failing PV", severity=alerts.Severity.WARNING, ) ) case LVAttr.Health.MISMATCHES: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.RAID, message=f"LV {lv} is partially incoherent; run a repairing scrub operation", severity=alerts.Severity.WARNING, ) ) return alert_list def disk_wearout_check() -> list[alerts.Alert]: check_config = cvars.config.get()["checks"]["wearout"] alert_list = [] for disk in check_config["disks"]: try: wearout_reading = get_wearout_reading(disk["name"]) except Exception as exc: alert_list.append( alerts.Alert( alert_type=alerts.AlertType.ERROR, message=f"Could not check wearout for disk {disk['name']}: {repr(exc)}, see logs", severity=alerts.Severity.CRITICAL, ) ) logging.error(traceback.format_exc()) continue if IS_TESTING or wearout_reading.current_reading < wearout_reading.threshold_reading: match wearout_reading.indicator: case WearoutIndicator.REALLOCATED_SECTORS: message = f"Disk {disk['name']} has reallocated sectors (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})" case WearoutIndicator.SPARE_BLOCKS: message = f"Disk {disk['name']} has too few spare blocks (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})" alert_list.append( alerts.Alert( alert_type=alerts.AlertType.DISKS, message=message, severity=alerts.Severity[disk["severity"]] ) ) return alert_list