import logging import subprocess from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import StrEnum from socket import gethostname from typing import Optional from lego_monitoring.alerting.alert import Alert from lego_monitoring.alerting.enum import AlertType, Severity from lego_monitoring.config.checks.ups import UPSCheckConfig from lego_monitoring.core import cvars from lego_monitoring.core.const import UPSC_PATH from ..utils import format_for_healthchecks_slug from .events import UPSEvent, UPSEventType class UPSStatus(StrEnum): """https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data""" ON_LINE = "OL" ON_BATTERY = "OB" BATTERY_LOW = "LB" BATTERY_HIGH = "HB" BATTERY_REPLACE = "RB" BATTERY_CHARGING = "CHRG" BATTERY_DISCHARGING = "DISCHRG" UPS_BYPASS = "BYPASS" UPS_OFFLINE = "OFF" UPS_OVERLOAD = "OVER" UPS_CALIBRATION = "CAL" UPS_TRIM = "TRIM" UPS_BOOST = "BOOST" UPS_FSD = "FSD" ALARM = "ALARM" WAIT = "WAIT" @dataclass class UPS: name: str ups_status: Optional[list[UPSStatus]] = None latest_events: list[UPSEventType] = field(default_factory=list) latest_event_time: Optional[datetime] = None battery_charge_percentage: Optional[int] = None battery_warning_percentage: Optional[int] = None battery_critical_percentage: Optional[int] = None battery_runtime: Optional[int] = None def __str__(self): return f"""Name: {self.name} Latest events: {f"{', '.join(self.latest_events)} @ {self.latest_event_time.isoformat()}" if len(self.latest_events) > 0 else 'no events recorded'} Status: {' '.join(self.ups_status) if self.ups_status is not None else '?'} Battery: {self.battery_charge_percentage if self.battery_charge_percentage is not None else '?'}% Remaining runtime: {timedelta(seconds=self.battery_runtime) if self.battery_runtime is not None else '?'} Will warn at {self.battery_warning_percentage if self.battery_warning_percentage is not None else '?'}% Will shut down at {self.battery_critical_percentage if self.battery_critical_percentage is not None else '?'}% """ def get_ups_list() -> list[str]: run_results = subprocess.run([UPSC_PATH, "-l"], stdout=subprocess.PIPE, encoding="utf-8") return run_results.stdout.splitlines() @dataclass class UPSTracker: upses: dict[str, UPS] = field(default_factory=dict) config: UPSCheckConfig = None def __post_init__(self): self.config = cvars.config.get().checks.ups def ups_check(self, ups_events_raw: list[dict]) -> list[Alert]: ups_events: dict[str, list[UPSEvent]] = {} for d in ups_events_raw: event = UPSEvent(**d) if event.ups_name not in ups_events: ups_events[event.ups_name] = [event] else: ups_events[event.ups_name].append(event) if self.config.ups_to_check is None: ups_list = get_ups_list() else: ups_list = self.config.ups_to_check alerts = [] for ups_name in ups_list: if ups_name not in self.upses: ups = get_ups_stats(ups_name) else: ups = get_ups_stats(self.upses[ups_name]) self.upses[ups_name] = ups slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups_name)}" severity = Severity.OK reasons_for_severity = set() if ups_name in ups_events: ups.latest_event_time = datetime.now() ups.latest_events = [] for event in ups_events[ups_name]: ups.latest_events.append(event.type_) match event.type_: case UPSEventType.FSD: severity = Severity.CRITICAL reasons_for_severity.add("Forced shutdown") case UPSEventType.ALARM: severity = max(severity, Severity.WARNING) reasons_for_severity.add("Alarm triggered") for event in ups.latest_events: match event: case UPSEventType.COMMBAD: severity = Severity.CRITICAL reasons_for_severity.add("Communication lost") case UPSEventType.SHUTDOWN: severity = Severity.CRITICAL reasons_for_severity.add("Shutting down now") case UPSEventType.SHUTDOWN_HOSTSYNC: severity = Severity.CRITICAL reasons_for_severity.add("Shutdown initiated (waiting for secondaries)") case UPSEventType.NOCOMM: severity = Severity.CRITICAL reasons_for_severity.add("Cannot establish communication") if ups.battery_charge_percentage < ups.battery_critical_percentage: severity = Severity.CRITICAL reasons_for_severity.add("Critical percentage reached") elif ups.battery_charge_percentage < ups.battery_critical_percentage: severity = max(severity, Severity.WARNING) reasons_for_severity.add("Warning percentage reached") for status in ups.ups_status: match status: case UPSStatus.UPS_OVERLOAD: severity = Severity.CRITICAL reasons_for_severity.add("UPS is overloaded") case UPSStatus.ON_BATTERY: severity = max(Severity.WARNING, severity) reasons_for_severity.add("UPS is on battery") case UPSStatus.WAIT: severity = max(Severity.INFO, severity) reasons_for_severity.add("Waiting for info from UPS driver") case UPSStatus.UPS_FSD: severity = Severity.CRITICAL reasons_for_severity.add("Forced shutdown") case UPSStatus.ALARM: severity = max(severity, Severity.WARNING) reasons_for_severity.add("Alarm triggered") if len(reasons_for_severity) > 0: message = f"NOTE: {', '.join(reasons_for_severity)}\n{ups}" else: message = str(ups) alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug)) return alerts def get_ups_stats(ups_or_name: str | UPS) -> UPS: if isinstance(ups_or_name, UPS): ups = ups_or_name else: ups = UPS(name=ups_or_name) run_results = subprocess.run([UPSC_PATH, ups.name], stdout=subprocess.PIPE, encoding="utf-8") for line in run_results.stdout.splitlines(): variable, value = line.split(": ")[:2] match variable: case "battery.charge": ups.battery_charge_percentage = int(value) case "battery.charge.low": ups.battery_critical_percentage = int(value) case "battery.charge.warning": ups.battery_warning_percentage = int(value) case "battery.runtime": ups.battery_runtime = int(value) case "ups.status": ups.ups_status = [UPSStatus(status) for status in value.split()] case _: ... return ups