import logging import traceback from datetime import timedelta from alerting import alerts from alerting.enum import AlertType, Severity from misc import cvars, docker_registry, sensors, vuln from misc.disks import LVAttr, WearoutIndicator, get_wearout_reading IS_TESTING = False def temp_check() -> list[alerts.Alert]: alert_list = [] temps = sensors.Sensors.get_temperatures() for _, sensor_list in temps.items(): for sensor in sensor_list: if sensor.sensor_type == "nct6687": continue # little valuable info and too low limits there, might as well ignore it if sensor.critical_temp is not None and (IS_TESTING or sensor.current_temp > sensor.critical_temp): alert = alerts.Alert( alert_type=AlertType("TEMP"), message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.critical_temp}°C", severity=Severity.CRITICAL, ) elif sensor.highest_temp is not None and (IS_TESTING or sensor.current_temp > sensor.highest_temp): alert = alerts.Alert( alert_type=AlertType("TEMP"), message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {sensor.highest_temp}°C", severity=Severity.WARNING, ) else: continue alert_list.append(alert) return alert_list def cpu_check() -> list[alerts.Alert]: sensor = sensors.Sensors.get_cpu() if IS_TESTING or sensor.current_load > sensor.critical_load: alert = alerts.Alert( alert_type=AlertType("CPU"), message=f"{sensor.current_load}% > {sensor.critical_load}%", severity=Severity.CRITICAL, ) elif IS_TESTING or sensor.current_load > sensor.highest_load: alert = alerts.Alert( alert_type=AlertType("CPU"), message=f"{sensor.current_load}% > {sensor.highest_load}%", severity=Severity.WARNING, ) else: return [] return [alert] def ram_check() -> list[alerts.Alert]: sensor = sensors.Sensors.get_ram() if IS_TESTING or sensor.current_avail < sensor.critical_avail: alert = alerts.Alert( alert_type=AlertType("RAM"), message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.critical_avail / 1024**3):.2f} GiB", severity=Severity.CRITICAL, ) elif IS_TESTING or sensor.current_avail < sensor.warning_avail: alert = alerts.Alert( alert_type=AlertType("RAM"), message=f"{(sensor.current_avail / 1024**3):.2f} GiB < {(sensor.warning_avail / 1024**3):.2f} GiB", severity=Severity.WARNING, ) else: return [] return [alert] async def vuln_check() -> list[alerts.Alert]: vulns = await vuln.get_vulns() alert_list = [] for v in vulns: if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL): match v.severity: case vuln.Severity.LOW: severity = Severity.INFO case vuln.Severity.MEDIUM: severity = Severity.WARNING case vuln.Severity.HIGH | vuln.Severity.CRITICAL: severity = Severity.CRITICAL message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" html_message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" if v.fixed: message.append(f" -- update to {v.fixed} ASAP") html_message.append(f" -- update to {v.fixed} ASAP") alert = alerts.Alert( alert_type=AlertType.VULN, message=message, html_message=html_message, severity=severity, ) alert_list.append(alert) return alert_list async def ups_check() -> list[alerts.Alert]: sensor = await sensors.Sensors.get_ups() if not sensor: return alert_list = [] if IS_TESTING or sensor.battery_charge_percentage < sensor.battery_critical_percentage: alert_list.append( alerts.Alert( alert_type=AlertType.UPS, message=f"Battery is under {sensor.battery_critical_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=Severity.CRITICAL, ) ) elif IS_TESTING or sensor.battery_charge_percentage < sensor.battery_warning_percentage: alert_list.append( alerts.Alert( alert_type=AlertType.UPS, message=f"Battery is under {sensor.battery_warning_percentage}%\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=Severity.WARNING, ) ) for status in sensor.ups_status: if IS_TESTING or status == sensors.UPSStatus.UPS_OVERLOAD: alert_list.append( alerts.Alert(alert_type=AlertType.UPS, message=f"UPS is overloaded!", severity=Severity.CRITICAL) ) elif IS_TESTING or status == sensors.UPSStatus.ON_BATTERY: alert_list.append( alerts.Alert( alert_type=AlertType.UPS, message=f"UPS is on battery.\n{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining.", severity=Severity.INFO, ) ) elif IS_TESTING or status == sensors.UPSStatus.UPS_TRIM: alert_list.append( alerts.Alert( alert_type=AlertType.UPS, message=f"Overvoltage detected: trimming voltage to nominal.", severity=Severity.INFO, ) ) elif IS_TESTING or status == sensors.UPSStatus.UPS_BOOST: alert_list.append( alerts.Alert( alert_type=AlertType.UPS, message=f"Undervoltage detected: boosting voltage to nominal.", severity=Severity.INFO, ) ) return alert_list async def docker_registry_check() -> list[alerts.Alert]: updated_images = await docker_registry.get_updated_images() alert_list = [] for image in updated_images: alert_list.append( alerts.Alert( alert_type=AlertType.UPDATE, message=f"{image} docker image: new version available", severity=Severity.INFO, ) ) return alert_list def raid_check() -> list[alerts.Alert]: check_config = cvars.config.get().checks.raid alert_list = [] for lv in check_config.lvs: try: lv_attr = LVAttr.from_cli(lv) except Exception as exc: alert_list.append( alerts.Alert( alert_type=AlertType.ERROR, message=f"Could not check RAID LV {lv}: {repr(exc)}, see logs", severity=Severity.CRITICAL, ) ) logging.error(traceback.format_exc()) continue # sanity check if lv_attr.vol_type not in [LVAttr.VolType.RAID, LVAttr.VolType.RAID_NOSYNC]: alert_list.append( alerts.Alert( alert_type=AlertType.ERROR, message=f"LV {lv} is not of RAID type", severity=Severity.CRITICAL, ) ) continue if IS_TESTING: alert_list.append( alerts.Alert( alert_type=AlertType.RAID, message=f"Test alert: LV {lv} health is {lv_attr.health}", severity=Severity.INFO, ) ) match lv_attr.health: case LVAttr.Health.PARTIAL: alert_list.append( alerts.Alert( alert_type=AlertType.RAID, message=f"LV {lv} operating in partial mode; one of PVs has failed", severity=Severity.CRITICAL, ) ) case LVAttr.Health.UNKNOWN: alert_list.append( alerts.Alert( alert_type=AlertType.RAID, message=f"LV {lv}'s state is unknown", severity=Severity.CRITICAL, ) ) case LVAttr.Health.REFRESH_NEEDED: alert_list.append( alerts.Alert( alert_type=AlertType.RAID, message=f"LV {lv} has suffered a write error; run a refresh or replace the failing PV", severity=Severity.WARNING, ) ) case LVAttr.Health.MISMATCHES: alert_list.append( alerts.Alert( alert_type=AlertType.RAID, message=f"LV {lv} is partially incoherent; run a repairing scrub operation", severity=Severity.WARNING, ) ) return alert_list def disk_wearout_check() -> list[alerts.Alert]: check_config = cvars.config.get().checks.wearout alert_list = [] for disk in check_config.disks: try: wearout_reading = get_wearout_reading(disk.name) except Exception as exc: alert_list.append( alerts.Alert( alert_type=AlertType.ERROR, message=f"Could not check wearout for disk {disk.name}: {repr(exc)}, see logs", severity=Severity.CRITICAL, ) ) logging.error(traceback.format_exc()) continue if IS_TESTING or wearout_reading.current_reading < wearout_reading.threshold_reading: match wearout_reading.indicator: case WearoutIndicator.REALLOCATED_SECTORS: message = f"Disk {disk.name} has reallocated sectors (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})" case WearoutIndicator.SPARE_BLOCKS: message = f"Disk {disk.name} has too few spare blocks (curr {wearout_reading.current_reading}, thresh {wearout_reading.threshold_reading})" alert_list.append( alerts.Alert(alert_type=AlertType.DISKS, message=message, severity=Severity[disk.severity]) ) return alert_list