handle events sent to ups pipe

This commit is contained in:
Alex Tau 2025-12-19 15:48:01 +03:00
parent 58e47ae584
commit 40e30529eb
10 changed files with 343 additions and 134 deletions

View file

@ -12,7 +12,13 @@ from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import BaseChecker, IntervalChecker, ScheduledChecker
from .core.checkers import (
BaseChecker,
IntervalChecker,
PipeIntervalChecker,
ScheduledChecker,
)
from .core.const import UPS_PIPE_NAME
stopping = False
@ -86,7 +92,14 @@ async def async_main():
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
check_sets.UPS: [IntervalChecker(checks.ups_check, interval=datetime.timedelta(minutes=5), persistent=True)],
check_sets.UPS: [
PipeIntervalChecker(
checks.UPSTracker().ups_check,
interval=datetime.timedelta(minutes=5),
persistent=True,
pipe=UPS_PIPE_NAME,
)
],
}
checkers = []
@ -131,7 +144,13 @@ async def async_main():
alert = checks.generate_stop_alert()
await sender.send_alert(alert)
await sender.send_healthchecks_status(alert)
await tg_client.disconnect()
for c in checkers:
try:
await c.graceful_stop()
except AttributeError:
continue
if tg_client:
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)

View file

@ -9,7 +9,7 @@ from ..checks.utils import format_for_healthchecks_slug
from ..core import cvars
from .alert import Alert
from .clients.healthchecks import HealthchecksClient
from .enum import SEVERITY_TO_EMOJI, AlertType, Severity
from .enum import SEVERITY_TO_EMOJI, Severity
async def get_tg_client() -> TelegramClient:
@ -23,9 +23,7 @@ async def get_tg_client() -> TelegramClient:
def get_healthchecks_client() -> HealthchecksClient:
    """Build a Healthchecks client from the active configuration.

    The base URL is the ``pinging_api_endpoint`` found under
    ``alert_channels.healthchecks`` of the config held in ``cvars.config``.
    Requests are made through a new ``AiohttpClient``.

    Returns:
        A ``HealthchecksClient`` bound to the configured endpoint.
    """
    config = cvars.config.get()
    base_url = config.alert_channels.healthchecks.pinging_api_endpoint
    # Build exactly one client (and one AiohttpClient) per call.
    return HealthchecksClient(base_url=base_url, client=AiohttpClient())

View file

@ -4,5 +4,5 @@ from .ram import ram_check
from .remind import remind_check
from .self import generate_start_alert, generate_stop_alert, self_check
from .temp import temp_check
from .ups import ups_check
from .ups.check import UPSTracker
from .vulnix import vulnix_check

View file

@ -1,120 +0,0 @@
import subprocess
from dataclasses import dataclass
from datetime import timedelta
from enum import StrEnum
from socket import gethostname
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
from lego_monitoring.core.const import UPSC_PATH
from .utils import format_for_healthchecks_slug
class UPSStatus(StrEnum):
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
ON_LINE = "OL"
ON_BATTERY = "OB"
BATTERY_LOW = "LB"
BATTERY_HIGH = "HB"
BATTERY_REPLACE = "RB"
BATTERY_CHARGING = "CHRG"
BATTERY_DISCHARGING = "DISCHRG"
UPS_BYPASS = "BYPASS"
UPS_OFFLINE = "OFF"
UPS_OVERLOAD = "OVER"
UPS_CALIBRATION = "CAL"
UPS_TRIM = "TRIM"
UPS_BOOST = "BOOST"
UPS_FSD = "FSD"
ALARM = "ALARM"
WAIT = "WAIT"
@dataclass
class UPSStats:
    """Readings collected for one UPS.

    ``ups_status`` and ``battery_charge_percentage`` stay ``None`` until a
    value has been parsed for them; the remaining fields carry fallback
    defaults that are used when the UPS does not report its own values.
    """

    ups_status: list[UPSStatus] | None = None
    battery_charge_percentage: int | None = None
    battery_warning_percentage: int = 20
    battery_critical_percentage: int = 10
    battery_runtime: int = 1000  # passed to timedelta(seconds=...) when rendered

    def __str__(self):
        """Render the readings as a multi-line report; unknown values show as ``?``."""
        # A freshly created instance has no status/charge yet: joining None
        # would raise TypeError, so show a placeholder instead.
        status = " ".join(self.ups_status) if self.ups_status is not None else "?"
        charge = self.battery_charge_percentage if self.battery_charge_percentage is not None else "?"
        return f"""Status: {status}
Battery: {charge}%
Remaining runtime: {timedelta(seconds=self.battery_runtime)}
Will warn at {self.battery_warning_percentage}%
Will shut down at {self.battery_critical_percentage}%
"""
def get_ups_list() -> list[str]:
    """Run ``UPSC_PATH -l`` and return its standard output split into lines.

    The exit status of the command is not inspected.
    """
    completed = subprocess.run(
        [UPSC_PATH, "-l"],
        stdout=subprocess.PIPE,
        encoding="utf-8",
    )
    listed = completed.stdout.splitlines()
    return listed
def get_ups_stats(ups: str) -> UPSStats:
stats = UPSStats()
run_results = subprocess.run([UPSC_PATH, ups], stdout=subprocess.PIPE, encoding="utf-8")
for line in run_results.stdout.splitlines():
variable, value = line.split(": ")[:2]
match variable:
case "battery.charge":
stats.battery_charge_percentage = int(value)
case "battery.charge.low":
stats.battery_critical_percentage = int(value)
case "battery.charge.warning":
stats.battery_warning_percentage = int(value)
case "battery.runtime":
stats.battery_runtime = int(value)
case "ups.status":
stats.ups_status = [UPSStatus(status) for status in value.split()]
case _:
...
return stats
def ups_check() -> list[Alert]:
    """Build one alert per monitored UPS describing its current state.

    The UPSes come from ``checks.ups.ups_to_check`` of the active config, or
    from ``get_ups_list()`` when that option is ``None``. For each UPS the
    severity starts at ``Severity.OK`` and is raised by the battery thresholds
    and by the status tokens reported for it.

    Returns:
        A list with exactly one ``AlertType.UPS`` alert per UPS. The message is
        the rendered stats, prefixed with a ``NOTE:`` line when anything
        raised the severity.
    """
    config = cvars.config.get().checks.ups
    if config.ups_to_check is None:
        ups_list = get_ups_list()
    else:
        ups_list = config.ups_to_check

    alerts = []
    for ups in ups_list:
        stats = get_ups_stats(ups)
        slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups)}-periodic"
        severity = Severity.OK
        reasons_for_severity = []

        if stats.battery_charge_percentage < stats.battery_critical_percentage:
            severity = Severity.CRITICAL
            reasons_for_severity.append("Critical percentage reached")
        # The warning branch must test the *warning* threshold; testing the
        # critical one again would make this branch unreachable.
        elif stats.battery_charge_percentage < stats.battery_warning_percentage:
            severity = Severity.WARNING
            reasons_for_severity.append("Warning percentage reached")

        for status in stats.ups_status:
            if status == UPSStatus.UPS_OVERLOAD:
                severity = Severity.CRITICAL
                reasons_for_severity.append("UPS is overloaded")
            elif status == UPSStatus.ON_BATTERY:
                # max() keeps a higher severity that was set earlier
                severity = max(Severity.WARNING, severity)
                reasons_for_severity.append("UPS is on battery")
            elif status == UPSStatus.ALARM:
                severity = max(Severity.WARNING, severity)
                reasons_for_severity.append("Alarm triggered")
            elif status == UPSStatus.WAIT:
                severity = max(Severity.INFO, severity)
                reasons_for_severity.append("Waiting for info from UPS driver")

        if len(reasons_for_severity) > 0:
            message = f"NOTE: {', '.join(reasons_for_severity)}\n{stats}"
        else:
            message = str(stats)

        alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug))
    return alerts

View file

@ -0,0 +1,184 @@
import logging
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import StrEnum
from socket import gethostname
from typing import Optional
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.config.checks.ups import UPSCheckConfig
from lego_monitoring.core import cvars
from lego_monitoring.core.const import UPSC_PATH
from ..utils import format_for_healthchecks_slug
from .events import UPSEvent, UPSEventType
class UPSStatus(StrEnum):
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
ON_LINE = "OL"
ON_BATTERY = "OB"
BATTERY_LOW = "LB"
BATTERY_HIGH = "HB"
BATTERY_REPLACE = "RB"
BATTERY_CHARGING = "CHRG"
BATTERY_DISCHARGING = "DISCHRG"
UPS_BYPASS = "BYPASS"
UPS_OFFLINE = "OFF"
UPS_OVERLOAD = "OVER"
UPS_CALIBRATION = "CAL"
UPS_TRIM = "TRIM"
UPS_BOOST = "BOOST"
UPS_FSD = "FSD"
ALARM = "ALARM"
WAIT = "WAIT"
@dataclass
class UPS:
    """Tracked state of a single UPS: its latest events and parsed readings.

    Every reading is ``None`` until a value has been stored for it; the string
    form shows ``?`` for such readings.
    """

    name: str
    ups_status: Optional[list[UPSStatus]] = None
    latest_events: list[UPSEventType] = field(default_factory=list)
    latest_event_time: Optional[datetime] = None
    battery_charge_percentage: Optional[int] = None
    battery_warning_percentage: Optional[int] = None
    battery_critical_percentage: Optional[int] = None
    battery_runtime: Optional[int] = None

    def __str__(self):
        """Render a multi-line, newline-terminated report of this UPS."""

        def shown(value):
            # Placeholder for readings that have not been filled in yet
            return "?" if value is None else value

        if len(self.latest_events) > 0:
            events = f"{', '.join(self.latest_events)} @ {self.latest_event_time.isoformat()}"
        else:
            events = "no events recorded"

        status = "?" if self.ups_status is None else " ".join(self.ups_status)
        runtime = "?" if self.battery_runtime is None else timedelta(seconds=self.battery_runtime)

        report_lines = [
            f"Name: {self.name}",
            f"Latest events: {events}",
            f"Status: {status}",
            f"Battery: {shown(self.battery_charge_percentage)}%",
            f"Remaining runtime: {runtime}",
            f"Will warn at {shown(self.battery_warning_percentage)}%",
            f"Will shut down at {shown(self.battery_critical_percentage)}%",
        ]
        return "\n".join(report_lines) + "\n"
def get_ups_list() -> list[str]:
    """Run ``UPSC_PATH -l`` and return its standard output split into lines.

    The exit status of the command is not inspected.
    """
    completed = subprocess.run(
        [UPSC_PATH, "-l"],
        stdout=subprocess.PIPE,
        encoding="utf-8",
    )
    listed = completed.stdout.splitlines()
    return listed
@dataclass
class UPSTracker:
upses: dict[str, UPS] = field(default_factory=dict)
config: UPSCheckConfig = None
def __post_init__(self):
self.config = cvars.config.get().checks.ups
def ups_check(self, ups_events_raw: list[dict]) -> list[Alert]:
ups_events: dict[str, list[UPSEvent]] = {}
for d in ups_events_raw:
event = UPSEvent(**d)
if event.ups_name not in ups_events:
ups_events[event.ups_name] = [event]
else:
ups_events[event.ups_name].append(event)
if self.config.ups_to_check is None:
ups_list = get_ups_list()
else:
ups_list = self.config.ups_to_check
alerts = []
for ups_name in ups_list:
if ups_name not in self.upses:
ups = get_ups_stats(ups_name)
else:
ups = get_ups_stats(self.upses[ups_name])
self.upses[ups_name] = ups
slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups_name)}"
severity = Severity.OK
reasons_for_severity = set()
if ups_name in ups_events:
ups.latest_event_time = datetime.now()
ups.latest_events = []
for event in ups_events[ups_name]:
ups.latest_events.append(event.type_)
match event.type_:
case UPSEventType.FSD:
severity = Severity.CRITICAL
reasons_for_severity.add("Forced shutdown")
case UPSEventType.ALARM:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Alarm triggered")
for event in ups.latest_events:
match event:
case UPSEventType.COMMBAD:
severity = Severity.CRITICAL
reasons_for_severity.add("Communication lost")
case UPSEventType.SHUTDOWN:
severity = Severity.CRITICAL
reasons_for_severity.add("Shutting down now")
case UPSEventType.NOCOMM:
severity = Severity.CRITICAL
reasons_for_severity.add("Cannot establish communication")
if ups.battery_charge_percentage < ups.battery_critical_percentage:
severity = Severity.CRITICAL
reasons_for_severity.add("Critical percentage reached")
elif ups.battery_charge_percentage < ups.battery_critical_percentage:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Warning percentage reached")
for status in ups.ups_status:
match status:
case UPSStatus.UPS_OVERLOAD:
severity = Severity.CRITICAL
reasons_for_severity.add("UPS is overloaded")
case UPSStatus.ON_BATTERY:
severity = max(Severity.WARNING, severity)
reasons_for_severity.add("UPS is on battery")
case UPSStatus.WAIT:
severity = max(Severity.INFO, severity)
reasons_for_severity.add("Waiting for info from UPS driver")
case UPSStatus.UPS_FSD:
severity = Severity.CRITICAL
reasons_for_severity.add("Forced shutdown")
case UPSStatus.ALARM:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Alarm triggered")
if len(reasons_for_severity) > 0:
message = f"NOTE: {', '.join(reasons_for_severity)}\n{ups}"
else:
message = str(ups)
alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug))
return alerts
def get_ups_stats(ups_or_name: str | UPS) -> UPS:
if isinstance(ups_or_name, UPS):
ups = ups_or_name
else:
ups = UPS(name=ups_or_name)
run_results = subprocess.run([UPSC_PATH, ups.name], stdout=subprocess.PIPE, encoding="utf-8")
for line in run_results.stdout.splitlines():
variable, value = line.split(": ")[:2]
match variable:
case "battery.charge":
ups.battery_charge_percentage = int(value)
case "battery.charge.low":
ups.battery_critical_percentage = int(value)
case "battery.charge.warning":
ups.battery_warning_percentage = int(value)
case "battery.runtime":
ups.battery_runtime = int(value)
case "ups.status":
ups.ups_status = [UPSStatus(status) for status in value.split()]
case _:
...
return ups

View file

@ -0,0 +1,44 @@
from dataclasses import dataclass
from enum import StrEnum, auto
class UPSEventType(StrEnum):
"""https://networkupstools.org/docs/man/upsmon.html#_notify_events"""
ONLINE = "ONLINE"
ONBATT = "ONBATT"
LOWBATT = "LOWBATT"
FSD = "FSD"
COMMOK = "COMMOK"
COMMBAD = "COMMBAD"
SHUTDOWN = "SHUTDOWN"
REPLBATT = "REPLBATT"
NOCOMM = "NOCOMM"
NOPARENT = "NOPARENT"
CAL = "CAL"
NOTCAL = "NOTCAL"
OFF = "OFF"
NOTOFF = "NOTOFF"
BYPASS = "BYPASS"
NOTBYPASS = "NOTBYPASS"
ECO = "ECO"
NOTECO = "NOTECO"
ALARM = "ALARM"
NOTALARM = "NOTALARM"
OVER = "OVER"
NOTOVER = "NOTOVER"
TRIM = "TRIM"
NOTTRIM = "NOTTRIM"
BOOST = "BOOST"
NOTBOOST = "NOTBOOST"
OTHER = "OTHER"
NOTOTHER = "NOTOTHER"
SUSPEND_STARTING = "SUSPEND_STARTING"
SUSPEND_FINISHED = "SUSPEND_FINISHED"
@dataclass
class UPSEvent:
    """One event reported for a UPS."""

    # Kind of event. The dataclass performs no conversion, so this holds
    # whatever value the constructor was given.
    type_: UPSEventType
    # Text that accompanied the event
    message: str
    # Name of the UPS the event refers to
    ups_name: str

View file

@ -1,8 +1,12 @@
import asyncio
import datetime
import json
import logging
import os
from dataclasses import KW_ONLY, dataclass, field
from typing import Any, Callable, Coroutine
from typing import Any, Callable, Coroutine, Optional
import aiofiles
from ..alerting.alert import Alert
from ..alerting.current import CurrentAlerts
@ -10,7 +14,7 @@ from ..alerting.enum import Severity
from ..alerting.sender import send_alert, send_healthchecks_status
@dataclass
@dataclass(repr=False)
class BaseChecker:
check: Callable | Coroutine
@ -51,9 +55,12 @@ class BaseChecker:
check_kwargs: dict[str, Any] = field(default_factory=dict)
current_alerts: CurrentAlerts = field(default_factory=CurrentAlerts, init=False)
async def _call_check(self) -> list[Alert]:
def __repr__(self):
return f"<{type(self).__name__}(check={self.check})>"
async def _call_check(self, *extra_args, **extra_kwargs) -> list[Alert]:
if isinstance(self.check, Callable):
result = self.check(*self.check_args, **self.check_kwargs)
result = self.check(*self.check_args, *extra_args, **self.check_kwargs, **extra_kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(self.check, Coroutine):
@ -83,8 +90,10 @@ class BaseChecker:
raise NotImplementedError
@dataclass
@dataclass(repr=False)
class IntervalChecker(BaseChecker):
"Checker that calls the check each interval"
_: KW_ONLY
interval: datetime.timedelta
ignore_first_run: bool = False
@ -103,8 +112,10 @@ class IntervalChecker(BaseChecker):
await asyncio.sleep(interval_secs)
@dataclass
@dataclass(repr=False)
class ScheduledChecker(BaseChecker):
"Checker that calls the check each period (usually a day) at the specified time"
_: KW_ONLY
period: datetime.timedelta
when: datetime.time
@ -128,3 +139,63 @@ class ScheduledChecker(BaseChecker):
await self._handle_alerts(result)
case _:
raise NotImplementedError
@dataclass(repr=False)
class PipeIntervalChecker(IntervalChecker):
    """
    Checker that watches the specified pipe and calls the check if something arrives.
    The check is guaranteed to be called at least once per interval, with an empty list
    as its extra argument if nothing arrives.

    Each non-blank line written to the pipe must be a JSON document; the decoded
    values are handed to the check as one list.
    """

    _: KW_ONLY
    pipe: str  # filesystem path of the FIFO; (re)created by run_checker
    read_task: Optional[asyncio.Task] = None  # in-flight pipe read, if any

    async def _read_status(self) -> list:
        """Read the pipe until EOF and return the decoded JSON value of every line.

        Blank lines are skipped and lines that are not valid JSON are logged and
        skipped, so a stray write cannot terminate the checker loop.
        """
        status = []
        async with aiofiles.open(self.pipe, "r") as p:
            async for line in p:
                payload = line.strip()
                if not payload:
                    continue
                try:
                    status.append(json.loads(payload))
                except json.JSONDecodeError:
                    logging.exception(f"Ignoring malformed line from pipe {self.pipe}")
        return status

    async def run_checker(self) -> None:
        """Create the pipe, then loop forever: wait for input or timeout, run the check."""
        interval_secs = self.interval.total_seconds()
        ignore_first_run = self.ignore_first_run

        # Always start from a freshly created FIFO.
        try:
            os.remove(self.pipe)
        except FileNotFoundError:
            pass
        os.mkfifo(self.pipe)

        while True:
            logging.info(f"Waiting on pipe {self.pipe}")
            self.read_task = asyncio.create_task(self._read_status())
            try:
                status = await asyncio.wait_for(self.read_task, interval_secs)
                logging.info(f"Got {len(status)} arguments from pipe {self.pipe}")
            except asyncio.TimeoutError:
                # NOTE(review): the timeout cancels the task, but an open() that is
                # still waiting for a writer may keep its worker thread blocked;
                # confirm that such threads do not pile up across timeouts.
                status = []
                logging.info(f"No arguments from {self.pipe}, timeout exceeded")
            self.read_task = None

            logging.info(f"Calling {self.check.__name__}")
            result = await self._call_check(status)
            logging.info(f"Got {len(result)} alerts")
            if ignore_first_run:
                ignore_first_run = False
            else:
                await self._handle_alerts(result)

    async def graceful_stop(self) -> None:
        """Cancel a pending pipe read, wait for it to finish and remove the pipe."""
        logging.info("Cancelling pipe read task")
        # Work on a snapshot: run_checker may reset self.read_task to None while
        # this coroutine is suspended in one of the awaits below.
        task = self.read_task
        if task:
            task.cancel()
            # Opening the write end (and writing nothing) gives a reader that is
            # waiting on the FIFO a writer, so it can return.
            async with aiofiles.open(self.pipe, "w") as p:
                await p.write("")
            try:
                await task
            except asyncio.CancelledError:
                pass

        logging.info("Removing pipe")
        try:
            os.remove(self.pipe)
        except FileNotFoundError:
            # Nothing to clean up (e.g. the pipe was never created); do not let
            # this interrupt the caller's shutdown sequence.
            pass
        logging.info("Done!")

View file

@ -1,2 +1,3 @@
VULNIX_PATH: str = ... # path to vulnix executable
# Path to the upsc executable that the UPS check runs to list UPSes and read their variables
UPSC_PATH = "/usr/bin/upsc"
# Filesystem path of the named pipe (FIFO) that the UPS checker creates and reads events from
UPS_PIPE_NAME = "/tmp/lego-monitoring-ups-status"