add ups periodic checks

This commit is contained in:
Alex Tau 2025-09-13 14:40:02 +03:00
parent 09eabcc6b2
commit da480a7c4e
13 changed files with 533 additions and 403 deletions

View file

@ -86,6 +86,7 @@ async def async_main():
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
check_sets.UPS: [IntervalChecker(checks.ups_check, interval=datetime.timedelta(minutes=5), persistent=True)],
}
checkers = []

View file

@ -11,13 +11,13 @@ class AlertType(StrEnum):
NET = "NET"
RAM = "RAM"
TEMP = "TEMP"
UPS = "UPS"
VULN = "VULN"
# LOGIN = "LOGIN"
# SMART = "SMART" # TODO
# RAID = "RAID"
# DISKS = "DISKS"
# UPS = "UPS"
# UPDATE = "UPDATE"

View file

@ -4,4 +4,5 @@ from .ram import ram_check
from .remind import remind_check
from .self import generate_start_alert, generate_stop_alert, self_check
from .temp import temp_check
from .ups import ups_check
from .vulnix import vulnix_check

View file

@ -0,0 +1,120 @@
import subprocess
from dataclasses import dataclass
from datetime import timedelta
from enum import StrEnum
from socket import gethostname
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
from lego_monitoring.core.const import UPSC_PATH
from .utils import format_for_healthchecks_slug
class UPSStatus(StrEnum):
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
ON_LINE = "OL"
ON_BATTERY = "OB"
BATTERY_LOW = "LB"
BATTERY_HIGH = "HB"
BATTERY_REPLACE = "RB"
BATTERY_CHARGING = "CHRG"
BATTERY_DISCHARGING = "DISCHRG"
UPS_BYPASS = "BYPASS"
UPS_OFFLINE = "OFF"
UPS_OVERLOAD = "OVER"
UPS_CALIBRATION = "CAL"
UPS_TRIM = "TRIM"
UPS_BOOST = "BOOST"
UPS_FSD = "FSD"
ALARM = "ALARM"
WAIT = "WAIT"
@dataclass
class UPSStats:
    """Values parsed from the ``upsc`` output of a single UPS.

    ``ups_status`` and ``battery_charge_percentage`` stay ``None`` until the
    corresponding variable has been seen in the output; the remaining fields
    fall back to the defaults declared here.
    """

    # Parsed ``ups.status`` tokens; None when the variable was not reported.
    ups_status: list[UPSStatus] | None = None
    # Parsed ``battery.charge``; None when the variable was not reported.
    battery_charge_percentage: int | None = None
    # Parsed ``battery.charge.warning``.
    battery_warning_percentage: int = 20
    # Parsed ``battery.charge.low``.
    battery_critical_percentage: int = 10
    # Parsed ``battery.runtime``; passed to ``timedelta(seconds=...)`` when rendered.
    battery_runtime: int = 1000

    def __str__(self):
        """Render the stats as a multi-line, human-readable report."""
        # Joining None would raise TypeError, so render missing values explicitly.
        status = "unknown" if self.ups_status is None else " ".join(self.ups_status)
        if self.battery_charge_percentage is None:
            charge = "unknown"
        else:
            charge = f"{self.battery_charge_percentage}%"
        return f"""Status: {status}
Battery: {charge}
Remaining runtime: {timedelta(seconds=self.battery_runtime)}
Will warn at {self.battery_warning_percentage}%
Will shut down at {self.battery_critical_percentage}%
"""
def get_ups_list() -> list[str]:
    """Return the lines printed by ``upsc -l``, one list entry per output line."""
    listing = subprocess.run([UPSC_PATH, "-l"], stdout=subprocess.PIPE, encoding="utf-8").stdout
    return listing.splitlines()
def get_ups_stats(ups: str) -> UPSStats:
stats = UPSStats()
run_results = subprocess.run([UPSC_PATH, ups], stdout=subprocess.PIPE, encoding="utf-8")
for line in run_results.stdout.splitlines():
variable, value = line.split(": ")[:2]
match variable:
case "battery.charge":
stats.battery_charge_percentage = int(value)
case "battery.charge.low":
stats.battery_critical_percentage = int(value)
case "battery.charge.warning":
stats.battery_warning_percentage = int(value)
case "battery.runtime":
stats.battery_runtime = int(value)
case "ups.status":
stats.ups_status = [UPSStatus(status) for status in value.split()]
case _:
...
return stats
def ups_check() -> list[Alert]:
    """Build one periodic alert per monitored UPS.

    The UPS names are taken from ``checks.ups.ups_to_check`` in the current
    config; when that is ``None``, every entry returned by ``get_ups_list()``
    is checked.

    Severity rules, applied per UPS:
        * charge below the critical percentage -> CRITICAL
        * charge below the warning percentage  -> WARNING
        * charge or status not reported        -> at least WARNING
        * status ``OVER``                      -> CRITICAL
        * status ``OB`` or ``ALARM``           -> at least WARNING
        * status ``WAIT``                      -> at least INFO

    Returns:
        A list with one ``AlertType.UPS`` alert per UPS. The message is the
        rendered stats, prefixed with a ``NOTE:`` line when anything raised
        the severity.
    """
    config = cvars.config.get().checks.ups
    ups_list = get_ups_list() if config.ups_to_check is None else config.ups_to_check
    host_slug = format_for_healthchecks_slug(gethostname())

    alerts = []
    for ups in ups_list:
        stats = get_ups_stats(ups)
        slug = f"{host_slug}-ups-{format_for_healthchecks_slug(ups)}-periodic"
        severity = Severity.OK
        reasons_for_severity = []

        charge = stats.battery_charge_percentage
        if charge is None:
            # Comparing None with an int would raise TypeError; report it instead.
            severity = Severity.WARNING
            reasons_for_severity.append("Battery charge is unknown")
        elif charge < stats.battery_critical_percentage:
            severity = Severity.CRITICAL
            reasons_for_severity.append("Critical percentage reached")
        elif charge < stats.battery_warning_percentage:
            # Must use the warning threshold: testing the critical threshold
            # again here would make this branch unreachable.
            severity = Severity.WARNING
            reasons_for_severity.append("Warning percentage reached")

        if stats.ups_status is None:
            severity = max(Severity.WARNING, severity)
            reasons_for_severity.append("UPS status is unknown")

        for status in stats.ups_status or ():
            if status == UPSStatus.UPS_OVERLOAD:
                severity = Severity.CRITICAL
                reasons_for_severity.append("UPS is overloaded")
            elif status == UPSStatus.ON_BATTERY:
                severity = max(Severity.WARNING, severity)
                reasons_for_severity.append("UPS is on battery")
            elif status == UPSStatus.ALARM:
                severity = max(Severity.WARNING, severity)
                reasons_for_severity.append("Alarm triggered")
            elif status == UPSStatus.WAIT:
                severity = max(Severity.INFO, severity)
                reasons_for_severity.append("Waiting for info from UPS driver")

        if reasons_for_severity:
            message = f"NOTE: {', '.join(reasons_for_severity)}\n{stats}"
        else:
            message = str(stats)
        alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug))
    return alerts

View file

@ -10,6 +10,7 @@ from .checks.cpu import CpuCheckConfig
from .checks.net import NetCheckConfig
from .checks.ram import RamCheckConfig
from .checks.temp import TempCheckConfig
from .checks.ups import UPSCheckConfig
from .checks.vulnix import VulnixCheckConfig
@ -20,6 +21,7 @@ class ChecksConfig(NestedDeserializableDataclass):
temp: TempCheckConfig = field(default_factory=TempCheckConfig)
vulnix: Optional[VulnixCheckConfig] = None # vulnix check WILL raise if this config section is None
net: NetCheckConfig = field(default_factory=NetCheckConfig)
ups: UPSCheckConfig = field(default_factory=UPSCheckConfig)
@dataclass

View file

@ -0,0 +1,9 @@
from dataclasses import dataclass
from typing import Optional
from alt_utils import NestedDeserializableDataclass
@dataclass
class UPSCheckConfig:
    """Configuration section for the UPS check.

    ``ups_to_check`` lists the UPS names to check; it defaults to ``None``.
    """

    ups_to_check: Optional[list] = None

View file

@ -9,6 +9,7 @@ class CheckSet(StrEnum):
RAM = "ram"
TEMP = "temp"
NET = "net"
UPS = "ups"
VULNIX = "vulnix"

View file

@ -1 +1,2 @@
# Ellipsis placeholder, not a usable path — presumably replaced with the real
# location before the package runs; confirm against the build/packaging setup.
VULNIX_PATH: str = ...  # path to vulnix executable
# Executable that the UPS check runs to list UPS devices and read their variables.
UPSC_PATH: str = "/usr/bin/upsc"  # path to upsc executable