lego-monitoring/misc/checks.py
2024-08-17 14:25:02 +03:00

195 lines
7.9 KiB
Python

from datetime import timedelta
from alerting import alerts
from misc import sensors, vuln
from misc.enums import UPSStatus
# Debug switch read by the checks in this module: while it is True, every
# `IS_TESTING or <comparison>` condition below is satisfied regardless of the
# actual readings, so the corresponding alerts are emitted unconditionally.
IS_TESTING = False
def temp_check() -> list[alerts.Alert]:
    """Build TEMP alerts for every temperature sensor that exceeds one of its limits.

    A sensor above its critical limit yields a CRITICAL alert; otherwise a sensor
    above its "highest" limit yields a WARNING alert. A limit that is None is
    never checked. With IS_TESTING set, any non-None limit counts as exceeded.

    Returns:
        The alerts collected over all sensors (possibly empty).
    """
    collected = []
    for readings in sensors.Sensors.get_temperatures().values():
        for reading in readings:
            # nct6687 reports little of value and its limits are too low, so skip it
            if reading.sensor_type == "nct6687":
                continue

            # Decide which limit (if any) was crossed; the critical one takes precedence.
            if reading.critical_temp is not None and (IS_TESTING or reading.current_temp > reading.critical_temp):
                limit, level = reading.critical_temp, alerts.Severity.CRITICAL
            elif reading.highest_temp is not None and (IS_TESTING or reading.current_temp > reading.highest_temp):
                limit, level = reading.highest_temp, alerts.Severity.WARNING
            else:
                continue

            collected.append(
                alerts.Alert(
                    alert_type=alerts.AlertType("TEMP"),
                    message=f"{reading.sensor_type} {reading.sensor_label}: {reading.current_temp}°C > {limit}°C",
                    severity=level,
                )
            )
    return collected
def cpu_check() -> list[alerts.Alert]:
    """Build a CPU alert when the current load exceeds a configured limit.

    Load above the critical limit (or IS_TESTING being set) gives a CRITICAL
    alert; otherwise load above the "highest" limit gives a WARNING alert.

    Returns:
        A one-element list with the alert, or an empty list when the load is
        within both limits.
    """
    cpu = sensors.Sensors.get_cpu()

    if IS_TESTING or cpu.current_load > cpu.critical_load:
        limit, level = cpu.critical_load, alerts.Severity.CRITICAL
    elif cpu.current_load > cpu.highest_load:
        # IS_TESTING is necessarily False here: the branch above would have caught it.
        limit, level = cpu.highest_load, alerts.Severity.WARNING
    else:
        return []

    return [
        alerts.Alert(
            alert_type=alerts.AlertType("CPU"),
            message=f"{cpu.current_load}% > {limit}%",
            severity=level,
        )
    ]
def ram_check() -> list[alerts.Alert]:
    """Build a RAM alert when available memory drops below a configured limit.

    Available memory under the critical limit (or IS_TESTING being set) gives a
    CRITICAL alert; otherwise memory under the warning limit gives a WARNING
    alert. The message divides both values by 1024**3 and labels them GiB.

    Returns:
        A one-element list with the alert, or an empty list when available
        memory is at or above both limits.
    """
    ram = sensors.Sensors.get_ram()
    gib = 1024**3

    if IS_TESTING or ram.current_avail < ram.critical_avail:
        limit, level = ram.critical_avail, alerts.Severity.CRITICAL
    elif ram.current_avail < ram.warning_avail:
        # IS_TESTING is necessarily False here: the branch above would have caught it.
        limit, level = ram.warning_avail, alerts.Severity.WARNING
    else:
        return []

    return [
        alerts.Alert(
            alert_type=alerts.AlertType("RAM"),
            message=f"{(ram.current_avail / gib):.2f} GiB < {(limit / gib):.2f} GiB",
            severity=level,
        )
    ]
async def vuln_check() -> list[alerts.Alert]:
vulns = await vuln.get_vulns()
alert_list = []
for v in vulns:
if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL):
match v.severity:
case vuln.Severity.LOW:
severity = alerts.Severity.INFO
case vuln.Severity.MEDIUM:
severity = alerts.Severity.WARNING
case vuln.Severity.HIGH | vuln.Severity.CRITICAL:
severity = alerts.Severity.CRITICAL
message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
html_message = f"<a href='{v.link}'>{v.id}</a>: {v.vuln_type} in {','.join(v.packages)}"
if v.fixed:
message.append(f" -- update to {v.fixed} ASAP")
html_message.append(f" -- update to {v.fixed} ASAP")
alert = alerts.Alert(
alert_type=alerts.AlertType.VULN,
message=message,
html_message=html_message,
severity=severity,
)
alert_list.append(alert)
return alert_list
async def ups_check() -> list[alerts.Alert]:
    """Build UPS alerts from the battery charge level and the reported UPS statuses.

    Two independent groups of alerts are produced:

    * Battery level: CRITICAL when the charge is below the critical percentage,
      otherwise WARNING when it is below the warning percentage.
    * Status flags: one alert per recognised entry of ``sensor.ups_status``;
      unrecognised statuses are ignored.

    While IS_TESTING is set, the critical battery alert is always emitted and
    every reported status produces the "battery needs to be replaced" alert,
    because those are the first branches of their respective chains.

    Returns:
        The collected alerts; an empty list when ``Sensors.get_ups()`` returns
        a falsy value.
    """
    sensor = await sensors.Sensors.get_ups()
    if not sensor:
        # Return an empty list rather than None so the result always matches the
        # annotated return type, like the other checks in this module.
        return []

    def remaining() -> str:
        # Evaluated lazily: only the battery-related messages need the runtime value.
        return f"{sensor.battery_charge_percentage}% ({timedelta(seconds=sensor.battery_runtime)}) remaining."

    alert_list = []

    # --- battery charge level ---
    if IS_TESTING or sensor.battery_charge_percentage < sensor.battery_critical_percentage:
        alert_list.append(
            alerts.Alert(
                alert_type=alerts.AlertType.UPS,
                message=f"Battery is under {sensor.battery_critical_percentage}%\n{remaining()}",
                severity=alerts.Severity.CRITICAL,
            )
        )
    elif sensor.battery_charge_percentage < sensor.battery_warning_percentage:
        alert_list.append(
            alerts.Alert(
                alert_type=alerts.AlertType.UPS,
                message=f"Battery is under {sensor.battery_warning_percentage}%\n{remaining()}",
                severity=alerts.Severity.WARNING,
            )
        )

    # --- status flags ---
    for status in sensor.ups_status:
        # Only the first branch checks IS_TESTING: once it is False there, repeating
        # it in the later branches could never change the outcome.
        if IS_TESTING or status == UPSStatus.BATTERY_REPLACE:
            message, severity = "UPS battery needs to be replaced ASAP!", alerts.Severity.CRITICAL
        elif status == UPSStatus.UPS_OVERLOAD:
            message, severity = "UPS is overloaded!", alerts.Severity.CRITICAL
        elif status == UPSStatus.UPS_BYPASS:
            message = "BYPASS MODE: Battery and connected devices are not protected from power outage!"
            severity = alerts.Severity.WARNING
        elif status == UPSStatus.UPS_CALIBRATION:
            message, severity = "UPS is currently performing runtime calibration.", alerts.Severity.INFO
        elif status == UPSStatus.ON_BATTERY:
            message, severity = f"UPS is on battery.\n{remaining()}", alerts.Severity.INFO
        elif status == UPSStatus.UPS_OFFLINE:
            message, severity = "UPS seems to be offline.", alerts.Severity.INFO
        elif status == UPSStatus.UPS_TRIM:
            message, severity = "Overvoltage detected: trimming voltage to nominal.", alerts.Severity.INFO
        elif status == UPSStatus.UPS_BOOST:
            message, severity = "Undervoltage detected: boosting voltage to nominal.", alerts.Severity.INFO
        elif status == UPSStatus.UPS_FSD:
            message, severity = "Shutdown imminent!", alerts.Severity.CRITICAL
        else:
            # Status without a dedicated alert.
            continue

        alert_list.append(alerts.Alert(alert_type=alerts.AlertType.UPS, message=message, severity=severity))

    return alert_list