From 40e30529eb0ff0d082f9b33a1649fb9bd20799fd Mon Sep 17 00:00:00 2001 From: Alex Tau Date: Fri, 19 Dec 2025 15:48:01 +0300 Subject: [PATCH] handle events sent to ups pipe --- pyproject.toml | 1 + src/lego_monitoring/__init__.py | 25 ++- src/lego_monitoring/alerting/sender.py | 6 +- src/lego_monitoring/checks/__init__.py | 2 +- src/lego_monitoring/checks/ups.py | 120 --------------- src/lego_monitoring/checks/ups/check.py | 184 +++++++++++++++++++++++ src/lego_monitoring/checks/ups/events.py | 44 ++++++ src/lego_monitoring/core/checkers.py | 83 +++++++++- src/lego_monitoring/core/const.py | 1 + uv.lock | 11 ++ 10 files changed, 343 insertions(+), 134 deletions(-) delete mode 100644 src/lego_monitoring/checks/ups.py create mode 100644 src/lego_monitoring/checks/ups/check.py create mode 100644 src/lego_monitoring/checks/ups/events.py diff --git a/pyproject.toml b/pyproject.toml index d9c6434..aefda49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,6 +6,7 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "aiodns>=3.5.0", + "aiofiles>=25.1.0", "aiohttp>=3.12.15", "alt-utils>=0.0.8", "humanize>=4.12.3", diff --git a/src/lego_monitoring/__init__.py b/src/lego_monitoring/__init__.py index 63536dd..29e5e56 100644 --- a/src/lego_monitoring/__init__.py +++ b/src/lego_monitoring/__init__.py @@ -12,7 +12,13 @@ from .checks.temp.sensors import print_readings from .config import enums as config_enums from .config import load_config from .core import cvars -from .core.checkers import BaseChecker, IntervalChecker, ScheduledChecker +from .core.checkers import ( + BaseChecker, + IntervalChecker, + PipeIntervalChecker, + ScheduledChecker, +) +from .core.const import UPS_PIPE_NAME stopping = False @@ -86,7 +92,14 @@ async def async_main(): check_sets.NET: [ IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True) ], - check_sets.UPS: [IntervalChecker(checks.ups_check, interval=datetime.timedelta(minutes=5), 
persistent=True)], + check_sets.UPS: [ + PipeIntervalChecker( + checks.UPSTracker().ups_check, + interval=datetime.timedelta(minutes=5), + persistent=True, + pipe=UPS_PIPE_NAME, + ) + ], } checkers = [] @@ -131,7 +144,13 @@ async def async_main(): alert = checks.generate_stop_alert() await sender.send_alert(alert) await sender.send_healthchecks_status(alert) - await tg_client.disconnect() + for c in checkers: + try: + await c.graceful_stop() + except AttributeError: + continue + if tg_client: + await tg_client.disconnect() raise SystemExit else: await asyncio.sleep(3) diff --git a/src/lego_monitoring/alerting/sender.py b/src/lego_monitoring/alerting/sender.py index 8601172..e06907f 100644 --- a/src/lego_monitoring/alerting/sender.py +++ b/src/lego_monitoring/alerting/sender.py @@ -9,7 +9,7 @@ from ..checks.utils import format_for_healthchecks_slug from ..core import cvars from .alert import Alert from .clients.healthchecks import HealthchecksClient -from .enum import SEVERITY_TO_EMOJI, AlertType, Severity +from .enum import SEVERITY_TO_EMOJI, Severity async def get_tg_client() -> TelegramClient: @@ -23,9 +23,7 @@ async def get_tg_client() -> TelegramClient: def get_healthchecks_client() -> HealthchecksClient: config = cvars.config.get() base_url = config.alert_channels.healthchecks.pinging_api_endpoint - client = HealthchecksClient( - base_url=config.alert_channels.healthchecks.pinging_api_endpoint, client=AiohttpClient() - ) + client = HealthchecksClient(base_url=base_url, client=AiohttpClient()) return client diff --git a/src/lego_monitoring/checks/__init__.py b/src/lego_monitoring/checks/__init__.py index c5518ee..5be0d7f 100644 --- a/src/lego_monitoring/checks/__init__.py +++ b/src/lego_monitoring/checks/__init__.py @@ -4,5 +4,5 @@ from .ram import ram_check from .remind import remind_check from .self import generate_start_alert, generate_stop_alert, self_check from .temp import temp_check -from .ups import ups_check +from .ups.check import UPSTracker from 
.vulnix import vulnix_check diff --git a/src/lego_monitoring/checks/ups.py b/src/lego_monitoring/checks/ups.py deleted file mode 100644 index 447bbc8..0000000 --- a/src/lego_monitoring/checks/ups.py +++ /dev/null @@ -1,120 +0,0 @@ -import subprocess -from dataclasses import dataclass -from datetime import timedelta -from enum import StrEnum -from socket import gethostname - -from lego_monitoring.alerting.alert import Alert -from lego_monitoring.alerting.enum import AlertType, Severity -from lego_monitoring.core import cvars -from lego_monitoring.core.const import UPSC_PATH - -from .utils import format_for_healthchecks_slug - - -class UPSStatus(StrEnum): - """https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data""" - - ON_LINE = "OL" - ON_BATTERY = "OB" - BATTERY_LOW = "LB" - BATTERY_HIGH = "HB" - BATTERY_REPLACE = "RB" - BATTERY_CHARGING = "CHRG" - BATTERY_DISCHARGING = "DISCHRG" - UPS_BYPASS = "BYPASS" - UPS_OFFLINE = "OFF" - UPS_OVERLOAD = "OVER" - UPS_CALIBRATION = "CAL" - UPS_TRIM = "TRIM" - UPS_BOOST = "BOOST" - UPS_FSD = "FSD" - ALARM = "ALARM" - WAIT = "WAIT" - - -@dataclass -class UPSStats: - ups_status: list[UPSStatus] = None - battery_charge_percentage: int = None - battery_warning_percentage: int = 20 - battery_critical_percentage: int = 10 - battery_runtime: int = 1000 - - def __str__(self): - return f"""Status: {' '.join(self.ups_status)} -Battery: {self.battery_charge_percentage}% -Remaining runtime: {timedelta(seconds=self.battery_runtime)} -Will warn at {self.battery_warning_percentage}% -Will shut down at {self.battery_critical_percentage}% -""" - - -def get_ups_list() -> list[str]: - run_results = subprocess.run([UPSC_PATH, "-l"], stdout=subprocess.PIPE, encoding="utf-8") - return run_results.stdout.splitlines() - - -def get_ups_stats(ups: str) -> UPSStats: - stats = UPSStats() - - run_results = subprocess.run([UPSC_PATH, ups], stdout=subprocess.PIPE, encoding="utf-8") - for line in run_results.stdout.splitlines(): 
- variable, value = line.split(": ")[:2] - match variable: - case "battery.charge": - stats.battery_charge_percentage = int(value) - case "battery.charge.low": - stats.battery_critical_percentage = int(value) - case "battery.charge.warning": - stats.battery_warning_percentage = int(value) - case "battery.runtime": - stats.battery_runtime = int(value) - case "ups.status": - stats.ups_status = [UPSStatus(status) for status in value.split()] - case _: - ... - return stats - - -def ups_check() -> list[Alert]: - config = cvars.config.get().checks.ups - if config.ups_to_check is None: - ups_list = get_ups_list() - else: - ups_list = config.ups_to_check - alerts = [] - for ups in ups_list: - stats = get_ups_stats(ups) - slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups)}-periodic" - severity = Severity.OK - reasons_for_severity = [] - - if stats.battery_charge_percentage < stats.battery_critical_percentage: - severity = Severity.CRITICAL - reasons_for_severity.append("Critical percentage reached") - elif stats.battery_charge_percentage < stats.battery_critical_percentage: - severity = Severity.WARNING - reasons_for_severity.append("Warning percentage reached") - - for status in stats.ups_status: - if status == UPSStatus.UPS_OVERLOAD: - severity = Severity.CRITICAL - reasons_for_severity.append("UPS is overloaded") - elif status == UPSStatus.ON_BATTERY: - severity = max(Severity.WARNING, severity) - reasons_for_severity.append("UPS is on battery") - elif status == UPSStatus.ALARM: - severity = max(Severity.WARNING, severity) - reasons_for_severity.append("Alarm triggered") - elif status == UPSStatus.WAIT: - severity = max(Severity.INFO, severity) - reasons_for_severity.append("Waiting for info from UPS driver") - - if len(reasons_for_severity) > 0: - message = f"NOTE: {', '.join(reasons_for_severity)}\n{stats}" - else: - message = str(stats) - alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, 
healthchecks_slug=slug)) - - return alerts diff --git a/src/lego_monitoring/checks/ups/check.py b/src/lego_monitoring/checks/ups/check.py new file mode 100644 index 0000000..deb78ee --- /dev/null +++ b/src/lego_monitoring/checks/ups/check.py @@ -0,0 +1,184 @@ +import logging +import subprocess +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from enum import StrEnum +from socket import gethostname +from typing import Optional + +from lego_monitoring.alerting.alert import Alert +from lego_monitoring.alerting.enum import AlertType, Severity +from lego_monitoring.config.checks.ups import UPSCheckConfig +from lego_monitoring.core import cvars +from lego_monitoring.core.const import UPSC_PATH + +from ..utils import format_for_healthchecks_slug +from .events import UPSEvent, UPSEventType + + +class UPSStatus(StrEnum): + """https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data""" + + ON_LINE = "OL" + ON_BATTERY = "OB" + BATTERY_LOW = "LB" + BATTERY_HIGH = "HB" + BATTERY_REPLACE = "RB" + BATTERY_CHARGING = "CHRG" + BATTERY_DISCHARGING = "DISCHRG" + UPS_BYPASS = "BYPASS" + UPS_OFFLINE = "OFF" + UPS_OVERLOAD = "OVER" + UPS_CALIBRATION = "CAL" + UPS_TRIM = "TRIM" + UPS_BOOST = "BOOST" + UPS_FSD = "FSD" + ALARM = "ALARM" + WAIT = "WAIT" + + +@dataclass +class UPS: + name: str + ups_status: Optional[list[UPSStatus]] = None + latest_events: list[UPSEventType] = field(default_factory=list) + latest_event_time: Optional[datetime] = None + battery_charge_percentage: Optional[int] = None + battery_warning_percentage: Optional[int] = None + battery_critical_percentage: Optional[int] = None + battery_runtime: Optional[int] = None + + def __str__(self): + return f"""Name: {self.name} +Latest events: {f"{', '.join(self.latest_events)} @ {self.latest_event_time.isoformat()}" if len(self.latest_events) > 0 else 'no events recorded'} +Status: {' '.join(self.ups_status) if self.ups_status is not None else '?'} +Battery: 
{self.battery_charge_percentage if self.battery_charge_percentage is not None else '?'}% +Remaining runtime: {timedelta(seconds=self.battery_runtime) if self.battery_runtime is not None else '?'} +Will warn at {self.battery_warning_percentage if self.battery_warning_percentage is not None else '?'}% +Will shut down at {self.battery_critical_percentage if self.battery_critical_percentage is not None else '?'}% +""" + + +def get_ups_list() -> list[str]: + run_results = subprocess.run([UPSC_PATH, "-l"], stdout=subprocess.PIPE, encoding="utf-8") + return run_results.stdout.splitlines() + + +@dataclass +class UPSTracker: + upses: dict[str, UPS] = field(default_factory=dict) + config: UPSCheckConfig = None + + def __post_init__(self): + self.config = cvars.config.get().checks.ups + + def ups_check(self, ups_events_raw: list[dict]) -> list[Alert]: + ups_events: dict[str, list[UPSEvent]] = {} + for d in ups_events_raw: + event = UPSEvent(**d) + if event.ups_name not in ups_events: + ups_events[event.ups_name] = [event] + else: + ups_events[event.ups_name].append(event) + + if self.config.ups_to_check is None: + ups_list = get_ups_list() + else: + ups_list = self.config.ups_to_check + + alerts = [] + for ups_name in ups_list: + if ups_name not in self.upses: + ups = get_ups_stats(ups_name) + else: + ups = get_ups_stats(self.upses[ups_name]) + + self.upses[ups_name] = ups + + slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups_name)}" + severity = Severity.OK + reasons_for_severity = set() + + if ups_name in ups_events: + ups.latest_event_time = datetime.now() + ups.latest_events = [] + for event in ups_events[ups_name]: + ups.latest_events.append(event.type_) + + match event.type_: + case UPSEventType.FSD: + severity = Severity.CRITICAL + reasons_for_severity.add("Forced shutdown") + case UPSEventType.ALARM: + severity = max(severity, Severity.WARNING) + reasons_for_severity.add("Alarm triggered") + + for event in ups.latest_events: 
+                match event:
+                    case UPSEventType.COMMBAD:
+                        severity = Severity.CRITICAL
+                        reasons_for_severity.add("Communication lost")
+                    case UPSEventType.SHUTDOWN:
+                        severity = Severity.CRITICAL
+                        reasons_for_severity.add("Shutting down now")
+                    case UPSEventType.NOCOMM:
+                        severity = Severity.CRITICAL
+                        reasons_for_severity.add("Cannot establish communication")
+
+            if (charge := ups.battery_charge_percentage) is not None and charge < (ups.battery_critical_percentage or 0):
+                severity = Severity.CRITICAL  # an unreported (None) charge or threshold never triggers
+                reasons_for_severity.add("Critical percentage reached")
+            elif charge is not None and charge < (ups.battery_warning_percentage or 0):
+                severity = max(severity, Severity.WARNING)  # warning threshold, distinct from the critical one
+                reasons_for_severity.add("Warning percentage reached")
+
+            for status in ups.ups_status or ():  # ups_status stays None until upsc prints an ups.status line
+                match status:
+                    case UPSStatus.UPS_OVERLOAD:
+                        severity = Severity.CRITICAL
+                        reasons_for_severity.add("UPS is overloaded")
+                    case UPSStatus.ON_BATTERY:
+                        severity = max(Severity.WARNING, severity)
+                        reasons_for_severity.add("UPS is on battery")
+                    case UPSStatus.WAIT:
+                        severity = max(Severity.INFO, severity)
+                        reasons_for_severity.add("Waiting for info from UPS driver")
+                    case UPSStatus.UPS_FSD:
+                        severity = Severity.CRITICAL
+                        reasons_for_severity.add("Forced shutdown")
+                    case UPSStatus.ALARM:
+                        severity = max(severity, Severity.WARNING)
+                        reasons_for_severity.add("Alarm triggered")
+
+            if len(reasons_for_severity) > 0:
+                message = f"NOTE: {', '.join(reasons_for_severity)}\n{ups}"
+            else:
+                message = str(ups)
+            alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug))
+
+        return alerts
+
+
+def get_ups_stats(ups_or_name: str | UPS) -> UPS:
+    if isinstance(ups_or_name, UPS):
+        ups = ups_or_name
+    else:
+        ups = UPS(name=ups_or_name)
+
+    run_results = subprocess.run([UPSC_PATH, ups.name], stdout=subprocess.PIPE, encoding="utf-8")
+    for line in run_results.stdout.splitlines():
+        variable, value = line.split(": ")[:2]
+        match variable:
+            case "battery.charge":
+                ups.battery_charge_percentage = int(value)
+
case "battery.charge.low": + ups.battery_critical_percentage = int(value) + case "battery.charge.warning": + ups.battery_warning_percentage = int(value) + case "battery.runtime": + ups.battery_runtime = int(value) + case "ups.status": + ups.ups_status = [UPSStatus(status) for status in value.split()] + case _: + ... + return ups diff --git a/src/lego_monitoring/checks/ups/events.py b/src/lego_monitoring/checks/ups/events.py new file mode 100644 index 0000000..de3f132 --- /dev/null +++ b/src/lego_monitoring/checks/ups/events.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass +from enum import StrEnum, auto + + +class UPSEventType(StrEnum): + """https://networkupstools.org/docs/man/upsmon.html#_notify_events""" + + ONLINE = "ONLINE" + ONBATT = "ONBATT" + LOWBATT = "LOWBATT" + FSD = "FSD" + COMMOK = "COMMOK" + COMMBAD = "COMMBAD" + SHUTDOWN = "SHUTDOWN" + REPLBATT = "REPLBATT" + NOCOMM = "NOCOMM" + NOPARENT = "NOPARENT" + CAL = "CAL" + NOTCAL = "NOTCAL" + OFF = "OFF" + NOTOFF = "NOTOFF" + BYPASS = "BYPASS" + NOTBYPASS = "NOTBYPASS" + ECO = "ECO" + NOTECO = "NOTECO" + ALARM = "ALARM" + NOTALARM = "NOTALARM" + OVER = "OVER" + NOTOVER = "NOTOVER" + TRIM = "TRIM" + NOTTRIM = "NOTTRIM" + BOOST = "BOOST" + NOTBOOST = "NOTBOOST" + OTHER = "OTHER" + NOTOTHER = "NOTOTHER" + SUSPEND_STARTING = "SUSPEND_STARTING" + SUSPEND_FINISHED = "SUSPEND_FINISHED" + + +@dataclass +class UPSEvent: + type_: UPSEventType + message: str + ups_name: str diff --git a/src/lego_monitoring/core/checkers.py b/src/lego_monitoring/core/checkers.py index d3e4a0f..29e041c 100644 --- a/src/lego_monitoring/core/checkers.py +++ b/src/lego_monitoring/core/checkers.py @@ -1,8 +1,12 @@ import asyncio import datetime +import json import logging +import os from dataclasses import KW_ONLY, dataclass, field -from typing import Any, Callable, Coroutine +from typing import Any, Callable, Coroutine, Optional + +import aiofiles from ..alerting.alert import Alert from ..alerting.current import CurrentAlerts @@ 
-10,7 +14,7 @@ from ..alerting.enum import Severity from ..alerting.sender import send_alert, send_healthchecks_status -@dataclass +@dataclass(repr=False) class BaseChecker: check: Callable | Coroutine @@ -51,9 +55,12 @@ class BaseChecker: check_kwargs: dict[str, Any] = field(default_factory=dict) current_alerts: CurrentAlerts = field(default_factory=CurrentAlerts, init=False) - async def _call_check(self) -> list[Alert]: + def __repr__(self): + return f"<{type(self).__name__}(check={self.check})>" + + async def _call_check(self, *extra_args, **extra_kwargs) -> list[Alert]: if isinstance(self.check, Callable): - result = self.check(*self.check_args, **self.check_kwargs) + result = self.check(*self.check_args, *extra_args, **self.check_kwargs, **extra_kwargs) if isinstance(result, Coroutine): result = await result elif isinstance(self.check, Coroutine): @@ -83,8 +90,10 @@ class BaseChecker: raise NotImplementedError -@dataclass +@dataclass(repr=False) class IntervalChecker(BaseChecker): + "Checker that calls the check each interval" + _: KW_ONLY interval: datetime.timedelta ignore_first_run: bool = False @@ -103,8 +112,10 @@ class IntervalChecker(BaseChecker): await asyncio.sleep(interval_secs) -@dataclass +@dataclass(repr=False) class ScheduledChecker(BaseChecker): + "Checker that calls the check each period (usually a day) at the specified time" + _: KW_ONLY period: datetime.timedelta when: datetime.time @@ -128,3 +139,63 @@ class ScheduledChecker(BaseChecker): await self._handle_alerts(result) case _: raise NotImplementedError + + +@dataclass(repr=False) +class PipeIntervalChecker(IntervalChecker): + """ + Checker that watches the specified pipe and calls the check if something arrives. 
+ The check is guaranteed to be called at least once per interval, with empty argument list if nothing arrives + """ + + _: KW_ONLY + pipe: str + read_task: Optional[asyncio.Task] = None + + async def _read_status(self) -> list: + async with aiofiles.open(self.pipe, "r") as p: + return [json.loads(line.rstrip()) async for line in p] + # await asyncio.sleep(60) + # return [] + + async def run_checker(self) -> None: + interval_secs = self.interval.total_seconds() + ignore_first_run = self.ignore_first_run + try: + os.remove(self.pipe) + except FileNotFoundError: + pass + os.mkfifo(self.pipe) + + while True: + logging.info(f"Waiting on pipe {self.pipe}") + self.read_task = asyncio.create_task(self._read_status()) + try: + status = await asyncio.wait_for(self.read_task, interval_secs) + logging.info(f"Got {len(status)} arguments from pipe {self.pipe}") + except asyncio.TimeoutError: + status = [] + logging.info(f"No arguments from {self.pipe}, timeout exceeded") + self.read_task = None + + logging.info(f"Calling {self.check.__name__}") + result = await self._call_check(status) + logging.info(f"Got {len(result)} alerts") + if ignore_first_run: + ignore_first_run = False + else: + await self._handle_alerts(result) + + async def graceful_stop(self) -> None: + logging.info("Cancelling pipe read task") + if self.read_task: + self.read_task.cancel() + async with aiofiles.open(self.pipe, "w") as p: + await p.write("") + try: + await self.read_task + except asyncio.CancelledError: + pass + logging.info("Removing pipe") + os.remove(self.pipe) + logging.info("Done!") diff --git a/src/lego_monitoring/core/const.py b/src/lego_monitoring/core/const.py index 0a84522..99724e4 100644 --- a/src/lego_monitoring/core/const.py +++ b/src/lego_monitoring/core/const.py @@ -1,2 +1,3 @@ VULNIX_PATH: str = ... 
# path to vulnix executable UPSC_PATH = "/usr/bin/upsc" +UPS_PIPE_NAME = "/tmp/lego-monitoring-ups-status" diff --git a/uv.lock b/uv.lock index 66dc199..f9daed7 100644 --- a/uv.lock +++ b/uv.lock @@ -14,6 +14,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f6/2c/711076e5f5d0707b8ec55a233c8bfb193e0981a800cd1b3b123e8ff61ca1/aiodns-3.5.0-py3-none-any.whl", hash = "sha256:6d0404f7d5215849233f6ee44854f2bb2481adf71b336b2279016ea5990ca5c5", size = 8068, upload-time = "2025-06-13T16:21:52.45Z" }, ] +[[package]] +name = "aiofiles" +version = "25.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/41/c3/534eac40372d8ee36ef40df62ec129bee4fdb5ad9706e58a29be53b2c970/aiofiles-25.1.0.tar.gz", hash = "sha256:a8d728f0a29de45dc521f18f07297428d56992a742f0cd2701ba86e44d23d5b2", size = 46354, upload-time = "2025-10-09T20:51:04.358Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bc/8a/340a1555ae33d7354dbca4faa54948d76d89a27ceef032c8c3bc661d003e/aiofiles-25.1.0-py3-none-any.whl", hash = "sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695", size = 14668, upload-time = "2025-10-09T20:51:03.174Z" }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -273,6 +282,7 @@ version = "1.1.1" source = { editable = "." } dependencies = [ { name = "aiodns" }, + { name = "aiofiles" }, { name = "aiohttp" }, { name = "alt-utils" }, { name = "humanize" }, @@ -284,6 +294,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "aiodns", specifier = ">=3.5.0" }, + { name = "aiofiles", specifier = ">=25.1.0" }, { name = "aiohttp", specifier = ">=3.12.15" }, { name = "alt-utils", specifier = ">=0.0.8" }, { name = "humanize", specifier = ">=4.12.3" },