Compare commits

...

14 commits

Author SHA1 Message Date
Alex Tau
ad1d956cc8 update version in pyproject 2026-01-18 16:03:38 +03:00
Alex Tau
8aa4c1d4da add ups event monitoring to description 2026-01-18 16:00:10 +03:00
Alex Tau
1c73e88564 Merge branch 'dev' 2026-01-18 15:36:22 +03:00
Alex Tau
5cd3f47d65 add docs for ups 2026-01-18 15:33:56 +03:00
Alex Tau
6c8ae03b6a stop if alert cannot be sent 2026-01-01 16:49:13 +03:00
Alex Tau
191839d30f add script for writing to the pipe 2025-12-19 18:26:11 +03:00
Alex Tau
10e79d6827 account for alert sending failing 2025-12-19 16:34:10 +03:00
Alex Tau
40e30529eb handle events sent to ups pipe 2025-12-19 15:48:01 +03:00
Alex Tau
58e47ae584 remove unused imports 2025-12-07 20:50:12 +03:00
Alex Tau
8c59bb2f31 update all flake references 2025-12-01 19:10:06 +03:00
Alex Tau
0a3923fbdd upgrade to nixos 25.11 2025-12-01 17:54:49 +03:00
Alex Tau
da480a7c4e add ups periodic checks 2025-09-13 14:40:02 +03:00
Alex Tau
2c6e804959 update nixpkgs for vulnix 1.12.0 2025-09-12 18:14:25 +03:00
Alex Tau
ce363c60ca clean up a bit 2025-09-02 14:14:06 +03:00
25 changed files with 1184 additions and 596 deletions

View file

@ -3,6 +3,7 @@
Simple system monitoring service. Sends alerts in Telegram and/or reports status to [Healthchecks](https://healthchecks.io/). Currently supports monitoring:
* CPU/RAM/network usage
* temperature readings
* UPS events
* [vulnix](https://github.com/nix-community/vulnix) readings (NixOS only)
## Setup
@ -65,3 +66,7 @@ Then enable and start the service:
ln -s /opt/lego-monitoring/lego-monitoring.service /etc/systemd/system/lego-monitoring.service
systemctl enable --now lego-monitoring
```
### UPS monitoring
See [docs/ups.md](docs/ups.md) for instructions.

View file

@ -36,12 +36,13 @@ List of enabled check sets\. Each check set is a module which checks something a
- ram alerts when RAM usage is above threshold
- temp alerts when temperature readings are above thresholds
- net alerts when network usage is above threshold
- ups alerts on UPS events
- vulnix periodically scans system for known CVEs, alerts if any are found (NixOS only)
*Type:*
list of (one of “self”, “remind”, “cpu”, “ram”, “temp”, “net”, “vulnix”)
list of (one of “self”, “remind”, “cpu”, “ram”, “temp”, “net”, “ups”, “vulnix”)
@ -635,6 +636,48 @@ null or (positive integer or floating point number, meaning >0)
## services\.lego-monitoring\.checks\.ups\.upsToCheck
List of UPSs to monitor, in ` upsc `-compatible format\. If null, all UPSs connected to localhost are checked\.
*Type:*
null or (list of string)
*Default:*
` null `
*Declared by:*
- [modules/options\.nix](../modules/options.nix)
## services\.lego-monitoring\.checks\.ups\.upsmonGroup
Group to allow to send UPS status updates\. This should usually include the user upsmon runs as\.
*Type:*
string
*Default:*
` config.power.ups.upsmon.user `
*Declared by:*
- [modules/options\.nix](../modules/options.nix)
## services\.lego-monitoring\.checks\.vulnix\.whitelist

85
docs/ups.md Normal file
View file

@ -0,0 +1,85 @@
# UPS monitoring
Both steps require configuring upsmon at least to the point of outputting UPS updates to upsmon's logs.
## NixOS
NOTIFYCMD is set automatically. Make sure to set NOTIFYFLAGs to include EXEC for events that are to be reported.
The following snippet enables all events to be reported to wall, system's log and lego-monitoring:
```nix
{
power.ups.upsmon.settings.NOTIFYFLAG = (map (ntype: [ntype "SYSLOG+WALL+EXEC"]) [
"ONLINE"
"ONBATT"
"LOWBATT"
"FSD"
"COMMOK"
"COMMBAD"
"SHUTDOWN"
"SHUTDOWN_HOSTSYNC"
"REPLBATT"
"NOCOMM"
"NOPARENT"
"CAL"
"NOTCAL"
"OFF"
"NOTOFF"
"BYPASS"
"NOTBYPASS"
"ECO"
"NOTECO"
"ALARM"
"NOTALARM"
"OVER"
"NOTOVER"
"TRIM"
"NOTTRIM"
"BOOST"
"NOTBOOST"
"OTHER"
"NOTOTHER"
"SUSPEND_STARTING"
"SUSPEND_FINISHED"
]);
}
```
## Non-NixOS
* NOTIFYCMD should be set to `/opt/lego-monitoring/.venv/bin/write-ups-status`.
* As above, NOTIFYFLAGs should include EXEC. Example for all events:
```
NOTIFYFLAG ONLINE SYSLOG+WALL+EXEC
NOTIFYFLAG ONBATT SYSLOG+WALL+EXEC
NOTIFYFLAG LOWBATT SYSLOG+WALL+EXEC
NOTIFYFLAG FSD SYSLOG+WALL+EXEC
NOTIFYFLAG COMMOK SYSLOG+WALL+EXEC
NOTIFYFLAG COMMBAD SYSLOG+WALL+EXEC
NOTIFYFLAG SHUTDOWN SYSLOG+WALL+EXEC
NOTIFYFLAG SHUTDOWN_HOSTSYNC SYSLOG+WALL+EXEC
NOTIFYFLAG REPLBATT SYSLOG+WALL+EXEC
NOTIFYFLAG NOCOMM SYSLOG+WALL+EXEC
NOTIFYFLAG NOPARENT SYSLOG+WALL+EXEC
NOTIFYFLAG CAL SYSLOG+WALL+EXEC
NOTIFYFLAG NOTCAL SYSLOG+WALL+EXEC
NOTIFYFLAG OFF SYSLOG+WALL+EXEC
NOTIFYFLAG NOTOFF SYSLOG+WALL+EXEC
NOTIFYFLAG BYPASS SYSLOG+WALL+EXEC
NOTIFYFLAG NOTBYPASS SYSLOG+WALL+EXEC
NOTIFYFLAG ECO SYSLOG+WALL+EXEC
NOTIFYFLAG NOTECO SYSLOG+WALL+EXEC
NOTIFYFLAG ALARM SYSLOG+WALL+EXEC
NOTIFYFLAG NOTALARM SYSLOG+WALL+EXEC
NOTIFYFLAG OVER SYSLOG+WALL+EXEC
NOTIFYFLAG NOTOVER SYSLOG+WALL+EXEC
NOTIFYFLAG TRIM SYSLOG+WALL+EXEC
NOTIFYFLAG NOTTRIM SYSLOG+WALL+EXEC
NOTIFYFLAG BOOST SYSLOG+WALL+EXEC
NOTIFYFLAG NOTBOOST SYSLOG+WALL+EXEC
NOTIFYFLAG OTHER SYSLOG+WALL+EXEC
NOTIFYFLAG NOTOTHER SYSLOG+WALL+EXEC
NOTIFYFLAG SUSPEND_STARTING SYSLOG+WALL+EXEC
NOTIFYFLAG SUSPEND_FINISHED SYSLOG+WALL+EXEC
```

43
flake.lock generated
View file

@ -2,36 +2,20 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1749086602,
"narHash": "sha256-DJcgJMekoxVesl9kKjfLPix2Nbr42i7cpEHJiTnBUwU=",
"lastModified": 1764522689,
"narHash": "sha256-SqUuBFjhl/kpDiVaKLQBoD8TLD+/cTUzzgVFoaHrkqY=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4792576cb003c994bd7cc1edada3129def20b27d",
"rev": "8bb5646e0bed5dbd3ab08c7a7cc15b75ab4e1d0f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-25.05",
"ref": "nixos-25.11",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs-29335f": {
"locked": {
"lastModified": 1745804731,
"narHash": "sha256-v/sK3AS0QKu/Tu5sHIfddiEHCvrbNYPv8X10Fpux68g=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "29335f23bea5e34228349ea739f31ee79e267b88",
"type": "github"
},
"original": {
"owner": "nixos",
"repo": "nixpkgs",
"rev": "29335f23bea5e34228349ea739f31ee79e267b88",
"type": "github"
}
},
"pyproject-build-systems": {
"inputs": {
"nixpkgs": [
@ -45,11 +29,11 @@
]
},
"locked": {
"lastModified": 1744599653,
"narHash": "sha256-nysSwVVjG4hKoOjhjvE6U5lIKA8sEr1d1QzEfZsannU=",
"lastModified": 1763662255,
"narHash": "sha256-4bocaOyLa3AfiS8KrWjZQYu+IAta05u3gYZzZ6zXbT0=",
"owner": "pyproject-nix",
"repo": "build-system-pkgs",
"rev": "7dba6dbc73120e15b558754c26024f6c93015dd7",
"rev": "042904167604c681a090c07eb6967b4dd4dae88c",
"type": "github"
},
"original": {
@ -65,11 +49,11 @@
]
},
"locked": {
"lastModified": 1743438845,
"narHash": "sha256-1GSaoubGtvsLRwoYwHjeKYq40tLwvuFFVhGrG8J9Oek=",
"lastModified": 1764134915,
"narHash": "sha256-xaKvtPx6YAnA3HQVp5LwyYG1MaN4LLehpQI8xEdBvBY=",
"owner": "pyproject-nix",
"repo": "pyproject.nix",
"rev": "8063ec98edc459571d042a640b1c5e334ecfca1e",
"rev": "2c8df1383b32e5443c921f61224b198a2282a657",
"type": "github"
},
"original": {
@ -81,7 +65,6 @@
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"nixpkgs-29335f": "nixpkgs-29335f",
"pyproject-build-systems": "pyproject-build-systems",
"pyproject-nix": "pyproject-nix",
"uv2nix": "uv2nix"
@ -97,11 +80,11 @@
]
},
"locked": {
"lastModified": 1749170547,
"narHash": "sha256-zOptuFhTr9P0A+unFaOBFx5E5T6yx0qE8VrUGVrM96U=",
"lastModified": 1764546642,
"narHash": "sha256-pCzgOjGEZyH7xKmpckdJzWyO0kvTIlaTK+ed/wguv5Y=",
"owner": "pyproject-nix",
"repo": "uv2nix",
"rev": "7ae60727d4fc2e41aefd30da665e4f92ba8298f1",
"rev": "0c56de7543459a23d0ebb7977fd555ced5d842ae",
"type": "github"
},
"original": {

View file

@ -1,8 +1,6 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.05";
# this is for uv 0.6.17, 0.7.0 has a change uv2nix doesn't yet support: https://github.com/astral-sh/uv/pull/13176
nixpkgs-29335f.url = "github:nixos/nixpkgs/29335f23bea5e34228349ea739f31ee79e267b88";
nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.11";
pyproject-nix = {
url = "github:pyproject-nix/pyproject.nix";
@ -27,7 +25,6 @@
{
self,
nixpkgs,
nixpkgs-29335f,
uv2nix,
pyproject-nix,
pyproject-build-systems,
@ -73,7 +70,8 @@
old: {
postPatch = ''
substituteInPlace src/lego_monitoring/core/const.py \
--replace-fail 'VULNIX_PATH: str = ...' 'VULNIX_PATH = "${lib.getExe pkgs.vulnix}"'
--replace-fail 'VULNIX_PATH: str = ...' 'VULNIX_PATH = "${lib.getExe pkgs.vulnix}"' \
--replace-fail 'UPSC_PATH = "/usr/bin/upsc"' 'UPSC_PATH = "${pkgs.nut}/bin/upsc"'
'';
}
);
@ -81,7 +79,6 @@
# This example is only using x86_64-linux
pkgs = nixpkgs.legacyPackages.x86_64-linux;
pkgs-29335f = nixpkgs-29335f.legacyPackages.x86_64-linux;
# Use Python 3.12 from nixpkgs
python = pkgs.python312;
@ -129,7 +126,7 @@
impure = pkgs.mkShell {
packages = [
python
pkgs-29335f.uv
pkgs.uv
];
env =
{
@ -209,7 +206,7 @@
pkgs.mkShell {
packages = [
virtualenv
pkgs-29335f.uv
pkgs.uv
];
env = {

View file

@ -76,6 +76,11 @@ package:
warning_threshold_comb_bytes = interfaceCfg.warningThresholdCombBytes;
critical_threshold_comb_bytes = interfaceCfg.criticalThresholdCombBytes;
}) cfg.checks.net.interfaces;
ups = with cfg.checks.ups; {
ups_to_check = upsToCheck;
upsmon_group = upsmonGroup;
};
};
};
in lib.mkIf cfg.enable {
@ -93,5 +98,8 @@ package:
StartLimitBurst = 3;
};
};
power.ups.upsmon.settings = lib.mkIf (builtins.elem "ups" cfg.enabledCheckSets) {
NOTIFYCMD = "${package}/bin/write-ups-status";
};
};
}

View file

@ -1,5 +1,6 @@
{
lib,
config,
...
}:
@ -33,6 +34,7 @@ in
"ram"
"temp"
"net"
"ups"
"vulnix"
]);
@ -45,6 +47,7 @@ in
* ram -- alerts when RAM usage is above threshold
* temp -- alerts when temperature readings are above thresholds
* net -- alerts when network usage is above threshold
* ups -- alerts on UPS events
* vulnix -- periodically scans system for known CVEs, alerts if any are found (NixOS only)'';
};
@ -170,6 +173,20 @@ in
}'';
};
};
ups = {
upsToCheck = lib.mkOption {
type = with lib.types; nullOr (listOf str);
default = null;
description = "List of UPS's to monitor, in `upsc`-compatible format. If null, all UPS's connected to localhost are checked.";
};
upsmonGroup = lib.mkOption {
type = lib.types.str;
default = config.power.ups.upsmon.user;
defaultText = lib.literalExpression "config.power.ups.upsmon.user";
description = "Group to allow to send UPS status updates. This should usually include the user upsmon runs as.";
};
};
};
};
}

View file

@ -1,21 +1,25 @@
[project]
name = "lego-monitoring"
version = "1.1.1"
version = "1.2.0"
description = "Monitoring software for the lego server"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"aiodns>=3.5.0",
"aiofiles>=25.1.0",
"aiohttp>=3.12.15",
"alt-utils>=0.0.8",
"humanize>=4.12.3",
"psutil>=7.0.0",
"returns>=0.26.0",
"telethon>=1.40.0",
"tenacity>=9.1.2",
"uplink[aiohttp]>=0.10.0",
]
[project.scripts]
lego-monitoring = "lego_monitoring:main"
write-ups-status = "lego_monitoring:write_ups_status"
[build-system]
requires = ["hatchling"]

View file

@ -1,136 +1,2 @@
import argparse
import asyncio
import datetime
import logging
import signal
from typing import Coroutine
from . import checks
from .alerting import sender
from .alerting.commands import CommandHandlerManager
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import BaseChecker, IntervalChecker, ScheduledChecker
stopping = False
def stop_gracefully(signum, frame):
global stopping
stopping = True
def main() -> None:
asyncio.run(async_main())
async def async_main():
parser = argparse.ArgumentParser(
prog="lego-monitoring",
description="Lego-monitoring service",
)
parser.add_argument("-c", "--config", help="config file")
parser.add_argument("--print-temp", help="print temp sensor readings and exit", action="store_true")
args = parser.parse_args()
if args.config:
config_path = parser.parse_args().config
config = load_config(config_path)
cvars.config.set(config)
if args.print_temp:
print_readings()
raise SystemExit
if not args.config:
raise RuntimeError("--config must be specified in standard operating mode")
logging.basicConfig(level=config.log_level)
check_sets = config_enums.CheckSet
checker_sets: dict[config_enums.CheckSet, list[Coroutine | BaseChecker]] = {
check_sets.SELF: [
sender.send_alert(checks.generate_start_alert()),
IntervalChecker(checks.self_check, interval=datetime.timedelta(minutes=5), persistent=False),
],
check_sets.CPU: [
IntervalChecker(
checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True, ignore_first_run=True
)
],
check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)],
check_sets.TEMP: [IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True)],
check_sets.VULNIX: [
IntervalChecker(
checks.vulnix_check,
interval=datetime.timedelta(days=3),
persistent=True,
send_any_state=True,
# As those are checked less often than daily, reminds could lead to awkward situations
# when the vuln is fixed but you still get reminders about it for 2 more days.
remind=False,
)
],
check_sets.REMIND: [
ScheduledChecker(
checks.remind_check,
period=datetime.timedelta(days=1),
when=datetime.time(hour=0, minute=0),
persistent=False,
is_reminder=True,
)
],
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
}
checkers = []
for enabled_set in config.enabled_check_sets:
for checker in checker_sets[enabled_set]:
checkers.append(checker)
checker_sets[check_sets.REMIND][0].check_args = [checkers]
if config.alert_channels.telegram is not None:
tg_client = await sender.get_tg_client()
my_username = (await tg_client.get_me()).username
logging.info(f"Logged in as @{my_username}")
command_manager = CommandHandlerManager(checkers)
await command_manager.attach_handlers(tg_client)
else:
logging.info("Telegram integration is disabled")
tg_client = None
cvars.tg_client.set(tg_client)
if config.alert_channels.healthchecks is not None:
healthchecks_client = sender.get_healthchecks_client()
logging.info("Ready to send pings to healthchecks")
cvars.healthchecks_client.set(healthchecks_client)
else:
logging.info("Healthchecks integration is disabled")
signal.signal(signal.SIGTERM, stop_gracefully)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
for c in checkers:
if isinstance(c, BaseChecker):
c = c.run_checker()
task = tg.create_task(c)
checker_tasks.add(task)
while True:
if stopping:
if "self" in config.enabled_check_sets:
alert = checks.generate_stop_alert()
await sender.send_alert(alert)
await sender.send_healthchecks_status(alert)
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)
from .checks import write_ups_status
from .main import main

View file

@ -11,13 +11,13 @@ class AlertType(StrEnum):
NET = "NET"
RAM = "RAM"
TEMP = "TEMP"
UPS = "UPS"
VULN = "VULN"
# LOGIN = "LOGIN"
# SMART = "SMART" # TODO
# RAID = "RAID"
# DISKS = "DISKS"
# UPS = "UPS"
# UPDATE = "UPDATE"

View file

@ -1,15 +1,16 @@
import logging
from socket import gethostname
import tenacity
from returns.result import Failure, Success
from telethon import TelegramClient
from telethon.sessions import MemorySession
from uplink import AiohttpClient
from ..checks.utils import format_for_healthchecks_slug
from ..core import cvars
from ..core.error_handling import log_errors_async
from .alert import Alert
from .clients.healthchecks import HealthchecksClient
from .enum import SEVERITY_TO_EMOJI, AlertType, Severity
from .enum import SEVERITY_TO_EMOJI, Severity
async def get_tg_client() -> TelegramClient:
@ -23,9 +24,7 @@ async def get_tg_client() -> TelegramClient:
def get_healthchecks_client() -> HealthchecksClient:
config = cvars.config.get()
base_url = config.alert_channels.healthchecks.pinging_api_endpoint
client = HealthchecksClient(
base_url=config.alert_channels.healthchecks.pinging_api_endpoint, client=AiohttpClient()
)
client = HealthchecksClient(base_url=base_url, client=AiohttpClient())
return client
@ -40,27 +39,26 @@ def format_message(alert: Alert, note: str) -> str:
return message
async def send_alert(alert: Alert, note: str = "") -> None:
async def send_alert(alert: Alert, note: str = "") -> Success[None] | Failure[tenacity.RetryError]:
return await log_errors_async(_send_alert(alert, note))
async def send_healthchecks_status(alert: Alert) -> Success[None] | Failure[tenacity.RetryError]:
return await log_errors_async(_send_healthchecks_status(alert))
@tenacity.retry(wait=tenacity.wait_random_exponential(multiplier=1, max=60), stop=tenacity.stop_after_attempt(3))
async def _send_alert(alert: Alert, note: str = "") -> None:
logging.debug(f"Sending {alert.alert_type} alert to Telegram")
try:
tg_client = cvars.tg_client.get()
except LookupError: # being called standalone
# cvars.config.set(get_config())
# temp_client = True
# client = await get_tg_client()
# cvars.matrix_client.set(client)
raise NotImplementedError # TODO
else:
... # temp_client = False
if tg_client is not None:
room_id = cvars.config.get().alert_channels.telegram.room_id
message = format_message(alert, note)
await tg_client.send_message(entity=room_id, message=message)
# if temp_client:
# await client.close()
async def send_healthchecks_status(alert: Alert) -> None:
@tenacity.retry(wait=tenacity.wait_random_exponential(multiplier=1, max=60), stop=tenacity.stop_after_attempt(3))
async def _send_healthchecks_status(alert: Alert) -> None:
def get_pinging_key(keys: dict[str, str]):
if alert.healthchecks_slug in keys:
return keys[alert.healthchecks_slug]

View file

@ -4,4 +4,6 @@ from .ram import ram_check
from .remind import remind_check
from .self import generate_start_alert, generate_stop_alert, self_check
from .temp import temp_check
from .ups.check import UPSTracker
from .ups.notifycmd import write_ups_status
from .vulnix import vulnix_check

View file

@ -0,0 +1,187 @@
import logging
import subprocess
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import StrEnum
from socket import gethostname
from typing import Optional
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.config.checks.ups import UPSCheckConfig
from lego_monitoring.core import cvars
from lego_monitoring.core.const import UPSC_PATH
from ..utils import format_for_healthchecks_slug
from .events import UPSEvent, UPSEventType
class UPSStatus(StrEnum):
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
ON_LINE = "OL"
ON_BATTERY = "OB"
BATTERY_LOW = "LB"
BATTERY_HIGH = "HB"
BATTERY_REPLACE = "RB"
BATTERY_CHARGING = "CHRG"
BATTERY_DISCHARGING = "DISCHRG"
UPS_BYPASS = "BYPASS"
UPS_OFFLINE = "OFF"
UPS_OVERLOAD = "OVER"
UPS_CALIBRATION = "CAL"
UPS_TRIM = "TRIM"
UPS_BOOST = "BOOST"
UPS_FSD = "FSD"
ALARM = "ALARM"
WAIT = "WAIT"
@dataclass
class UPS:
name: str
ups_status: Optional[list[UPSStatus]] = None
latest_events: list[UPSEventType] = field(default_factory=list)
latest_event_time: Optional[datetime] = None
battery_charge_percentage: Optional[int] = None
battery_warning_percentage: Optional[int] = None
battery_critical_percentage: Optional[int] = None
battery_runtime: Optional[int] = None
def __str__(self):
return f"""Name: {self.name}
Latest events: {f"{', '.join(self.latest_events)} @ {self.latest_event_time.isoformat()}" if len(self.latest_events) > 0 else 'no events recorded'}
Status: {' '.join(self.ups_status) if self.ups_status is not None else '?'}
Battery: {self.battery_charge_percentage if self.battery_charge_percentage is not None else '?'}%
Remaining runtime: {timedelta(seconds=self.battery_runtime) if self.battery_runtime is not None else '?'}
Will warn at {self.battery_warning_percentage if self.battery_warning_percentage is not None else '?'}%
Will shut down at {self.battery_critical_percentage if self.battery_critical_percentage is not None else '?'}%
"""
def get_ups_list() -> list[str]:
run_results = subprocess.run([UPSC_PATH, "-l"], stdout=subprocess.PIPE, encoding="utf-8")
return run_results.stdout.splitlines()
@dataclass
class UPSTracker:
upses: dict[str, UPS] = field(default_factory=dict)
config: UPSCheckConfig = None
def __post_init__(self):
self.config = cvars.config.get().checks.ups
def ups_check(self, ups_events_raw: list[dict]) -> list[Alert]:
ups_events: dict[str, list[UPSEvent]] = {}
for d in ups_events_raw:
event = UPSEvent(**d)
if event.ups_name not in ups_events:
ups_events[event.ups_name] = [event]
else:
ups_events[event.ups_name].append(event)
if self.config.ups_to_check is None:
ups_list = get_ups_list()
else:
ups_list = self.config.ups_to_check
alerts = []
for ups_name in ups_list:
if ups_name not in self.upses:
ups = get_ups_stats(ups_name)
else:
ups = get_ups_stats(self.upses[ups_name])
self.upses[ups_name] = ups
slug = f"{format_for_healthchecks_slug(gethostname())}-ups-{format_for_healthchecks_slug(ups_name)}"
severity = Severity.OK
reasons_for_severity = set()
if ups_name in ups_events:
ups.latest_event_time = datetime.now()
ups.latest_events = []
for event in ups_events[ups_name]:
ups.latest_events.append(event.type_)
match event.type_:
case UPSEventType.FSD:
severity = Severity.CRITICAL
reasons_for_severity.add("Forced shutdown")
case UPSEventType.ALARM:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Alarm triggered")
for event in ups.latest_events:
match event:
case UPSEventType.COMMBAD:
severity = Severity.CRITICAL
reasons_for_severity.add("Communication lost")
case UPSEventType.SHUTDOWN:
severity = Severity.CRITICAL
reasons_for_severity.add("Shutting down now")
case UPSEventType.SHUTDOWN_HOSTSYNC:
severity = Severity.CRITICAL
reasons_for_severity.add("Shutdown initiated (waiting for secondaries)")
case UPSEventType.NOCOMM:
severity = Severity.CRITICAL
reasons_for_severity.add("Cannot establish communication")
if ups.battery_charge_percentage < ups.battery_critical_percentage:
severity = Severity.CRITICAL
reasons_for_severity.add("Critical percentage reached")
elif ups.battery_charge_percentage < ups.battery_critical_percentage:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Warning percentage reached")
for status in ups.ups_status:
match status:
case UPSStatus.UPS_OVERLOAD:
severity = Severity.CRITICAL
reasons_for_severity.add("UPS is overloaded")
case UPSStatus.ON_BATTERY:
severity = max(Severity.WARNING, severity)
reasons_for_severity.add("UPS is on battery")
case UPSStatus.WAIT:
severity = max(Severity.INFO, severity)
reasons_for_severity.add("Waiting for info from UPS driver")
case UPSStatus.UPS_FSD:
severity = Severity.CRITICAL
reasons_for_severity.add("Forced shutdown")
case UPSStatus.ALARM:
severity = max(severity, Severity.WARNING)
reasons_for_severity.add("Alarm triggered")
if len(reasons_for_severity) > 0:
message = f"NOTE: {', '.join(reasons_for_severity)}\n{ups}"
else:
message = str(ups)
alerts.append(Alert(alert_type=AlertType.UPS, message=message, severity=severity, healthchecks_slug=slug))
return alerts
def get_ups_stats(ups_or_name: str | UPS) -> UPS:
if isinstance(ups_or_name, UPS):
ups = ups_or_name
else:
ups = UPS(name=ups_or_name)
run_results = subprocess.run([UPSC_PATH, ups.name], stdout=subprocess.PIPE, encoding="utf-8")
for line in run_results.stdout.splitlines():
variable, value = line.split(": ")[:2]
match variable:
case "battery.charge":
ups.battery_charge_percentage = int(value)
case "battery.charge.low":
ups.battery_critical_percentage = int(value)
case "battery.charge.warning":
ups.battery_warning_percentage = int(value)
case "battery.runtime":
ups.battery_runtime = int(value)
case "ups.status":
ups.ups_status = [UPSStatus(status) for status in value.split()]
case _:
...
return ups

View file

@ -0,0 +1,45 @@
from dataclasses import dataclass
from enum import StrEnum
class UPSEventType(StrEnum):
"""https://networkupstools.org/docs/man/upsmon.html#_notify_events"""
ONLINE = "ONLINE"
ONBATT = "ONBATT"
LOWBATT = "LOWBATT"
FSD = "FSD"
COMMOK = "COMMOK"
COMMBAD = "COMMBAD"
SHUTDOWN = "SHUTDOWN"
SHUTDOWN_HOSTSYNC = "SHUTDOWN_HOSTSYNC"
REPLBATT = "REPLBATT"
NOCOMM = "NOCOMM"
NOPARENT = "NOPARENT"
CAL = "CAL"
NOTCAL = "NOTCAL"
OFF = "OFF"
NOTOFF = "NOTOFF"
BYPASS = "BYPASS"
NOTBYPASS = "NOTBYPASS"
ECO = "ECO"
NOTECO = "NOTECO"
ALARM = "ALARM"
NOTALARM = "NOTALARM"
OVER = "OVER"
NOTOVER = "NOTOVER"
TRIM = "TRIM"
NOTTRIM = "NOTTRIM"
BOOST = "BOOST"
NOTBOOST = "NOTBOOST"
OTHER = "OTHER"
NOTOTHER = "NOTOTHER"
SUSPEND_STARTING = "SUSPEND_STARTING"
SUSPEND_FINISHED = "SUSPEND_FINISHED"
@dataclass
class UPSEvent:
type_: UPSEventType
message: str
ups_name: str

View file

@ -0,0 +1,27 @@
import json
import os
import sys
from dataclasses import asdict
from lego_monitoring.core.const import UPS_PIPE_NAME
from lego_monitoring.core.fifo import pipe_exists
from .events import UPSEvent, UPSEventType
def write_ups_status():
if not pipe_exists(UPS_PIPE_NAME):
raise Exception("lego-monitoring not running!")
notifytype = os.environ["NOTIFYTYPE"]
if notifytype not in UPSEventType:
notifytype = UPSEventType.OTHER
upsname = os.environ["UPSNAME"]
message = sys.argv[1]
event = UPSEvent(type_=notifytype, message=message, ups_name=upsname)
event_s = json.dumps(asdict(event)) + "\n"
with open(UPS_PIPE_NAME, "a") as p:
p.write(event_s)

View file

@ -10,6 +10,7 @@ from .checks.cpu import CpuCheckConfig
from .checks.net import NetCheckConfig
from .checks.ram import RamCheckConfig
from .checks.temp import TempCheckConfig
from .checks.ups import UPSCheckConfig
from .checks.vulnix import VulnixCheckConfig
@ -20,6 +21,7 @@ class ChecksConfig(NestedDeserializableDataclass):
temp: TempCheckConfig = field(default_factory=TempCheckConfig)
vulnix: Optional[VulnixCheckConfig] = None # vulnix check WILL raise if this config section is None
net: NetCheckConfig = field(default_factory=NetCheckConfig)
ups: UPSCheckConfig = field(default_factory=UPSCheckConfig)
@dataclass

View file

@ -1,4 +1,4 @@
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Optional
from alt_utils import NestedDeserializableDataclass

View file

@ -0,0 +1,8 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class UPSCheckConfig:
upsmon_group: str = "nutmon"
ups_to_check: Optional[list] = None

View file

@ -9,6 +9,7 @@ class CheckSet(StrEnum):
RAM = "ram"
TEMP = "temp"
NET = "net"
UPS = "ups"
VULNIX = "vulnix"

View file

@ -1,8 +1,13 @@
import asyncio
import datetime
import json
import logging
import os
import shutil
from dataclasses import KW_ONLY, dataclass, field
from typing import Any, Callable, Coroutine
from typing import Any, Callable, Coroutine, Optional
import aiofiles
from ..alerting.alert import Alert
from ..alerting.current import CurrentAlerts
@ -10,7 +15,7 @@ from ..alerting.enum import Severity
from ..alerting.sender import send_alert, send_healthchecks_status
@dataclass
@dataclass(repr=False)
class BaseChecker:
check: Callable | Coroutine
@ -51,9 +56,12 @@ class BaseChecker:
check_kwargs: dict[str, Any] = field(default_factory=dict)
current_alerts: CurrentAlerts = field(default_factory=CurrentAlerts, init=False)
async def _call_check(self) -> list[Alert]:
def __repr__(self):
return f"<{type(self).__name__}(check={self.check})>"
async def _call_check(self, *extra_args, **extra_kwargs) -> list[Alert]:
if isinstance(self.check, Callable):
result = self.check(*self.check_args, **self.check_kwargs)
result = self.check(*self.check_args, *extra_args, **self.check_kwargs, **extra_kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(self.check, Coroutine):
@ -63,28 +71,31 @@ class BaseChecker:
return result
async def _handle_alerts(self, alerts: list[Alert]) -> None:
async with asyncio.TaskGroup() as tg:
if not self.is_reminder:
for alert in alerts:
await send_healthchecks_status(alert)
tg.create_task(send_healthchecks_status(alert))
if not self.persistent:
for alert in alerts:
if alert.severity != Severity.OK:
await send_alert(alert, "ongoing" if self.is_reminder else "")
tg.create_task(send_alert(alert, "ongoing" if self.is_reminder else ""))
return
old_severity, new_severity = self.current_alerts.update(alerts)
if (old_severity != new_severity or self.send_any_state) and not (
old_severity == None and new_severity == Severity.OK
):
for alert in alerts:
await send_alert(alert, note="ongoing")
tg.create_task(send_alert(alert, note="ongoing"))
async def run_checker(self) -> None:
raise NotImplementedError
@dataclass
@dataclass(repr=False)
class IntervalChecker(BaseChecker):
"Checker that calls the check each interval"
_: KW_ONLY
interval: datetime.timedelta
ignore_first_run: bool = False
@ -103,8 +114,10 @@ class IntervalChecker(BaseChecker):
await asyncio.sleep(interval_secs)
@dataclass
@dataclass(repr=False)
class ScheduledChecker(BaseChecker):
"Checker that calls the check each period (usually a day) at the specified time"
_: KW_ONLY
period: datetime.timedelta
when: datetime.time
@ -128,3 +141,69 @@ class ScheduledChecker(BaseChecker):
await self._handle_alerts(result)
case _:
raise NotImplementedError
@dataclass(repr=False)
class PipeIntervalChecker(IntervalChecker):
"""
Checker that watches the specified pipe and calls the check if something arrives.
The check is guaranteed to be called at least once per interval, with empty argument list if nothing arrives
"""
_: KW_ONLY
pipe: str
owner_user: Optional[str] = None
owner_group: Optional[str] = None
read_task: Optional[asyncio.Task] = None
async def _read_status(self) -> list:
async with aiofiles.open(self.pipe, "r") as p:
return [json.loads(line.rstrip()) async for line in p]
# await asyncio.sleep(60)
# return []
async def run_checker(self) -> None:
interval_secs = self.interval.total_seconds()
ignore_first_run = self.ignore_first_run
try:
os.remove(self.pipe)
except FileNotFoundError:
pass
os.mkfifo(self.pipe)
if self.owner_user is not None or self.owner_group is not None:
shutil.chown(self.pipe, self.owner_user, self.owner_group)
os.chmod(self.pipe, 0o660)
while True:
logging.info(f"Waiting on pipe {self.pipe}")
self.read_task = asyncio.create_task(self._read_status())
try:
status = await asyncio.wait_for(self.read_task, interval_secs)
logging.info(f"Got {len(status)} arguments from pipe {self.pipe}")
except asyncio.TimeoutError:
status = []
logging.info(f"No arguments from {self.pipe}, timeout exceeded")
self.read_task = None
logging.info(f"Calling {self.check.__name__}")
result = await self._call_check(status)
logging.info(f"Got {len(result)} alerts")
if ignore_first_run:
ignore_first_run = False
else:
await self._handle_alerts(result)
async def graceful_stop(self) -> None:
logging.info("Cancelling pipe read task")
if self.read_task:
self.read_task.cancel()
async with aiofiles.open(self.pipe, "w") as p:
await p.write("")
try:
await self.read_task
except asyncio.CancelledError:
pass
logging.info("Removing pipe")
os.remove(self.pipe)
logging.info("Done!")

View file

@ -1 +1,3 @@
VULNIX_PATH: str = ... # path to vulnix executable
UPSC_PATH = "/usr/bin/upsc"
UPS_PIPE_NAME = "/tmp/lego-monitoring-ups-status"

View file

@ -0,0 +1,23 @@
import logging
import traceback
from typing import Awaitable, Callable, TypeVar
from returns.result import Failure, Success
T = TypeVar("T")
def log_errors(function: Callable[..., T], *args, **kwargs) -> Success[T] | Failure[Exception]:
try:
return Success(function(args, kwargs))
except Exception as e:
logging.error(traceback.format_exc())
return Failure(e)
async def log_errors_async(awaitable: Awaitable[T]) -> Success[T] | Failure[Exception]:
try:
return Success(await awaitable)
except Exception as e:
logging.error(traceback.format_exc())
return Failure(e)

View file

@ -0,0 +1,11 @@
import os
import stat
def pipe_exists(path: str) -> bool:
try:
if stat.S_ISFIFO(os.stat(path).st_mode) == 0:
return False
return True
except FileNotFoundError:
return False

158
src/lego_monitoring/main.py Normal file
View file

@ -0,0 +1,158 @@
import argparse
import asyncio
import datetime
import logging
import signal
from typing import Coroutine
from . import checks
from .alerting import sender
from .alerting.commands import CommandHandlerManager
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import (
BaseChecker,
IntervalChecker,
PipeIntervalChecker,
ScheduledChecker,
)
from .core.const import UPS_PIPE_NAME
stopping = False
def stop_gracefully(signum, frame):
global stopping
stopping = True
def main() -> None:
asyncio.run(async_main())
async def async_main():
parser = argparse.ArgumentParser(
prog="lego-monitoring",
description="Lego-monitoring service",
)
parser.add_argument("-c", "--config", help="config file")
parser.add_argument("--print-temp", help="print temp sensor readings and exit", action="store_true")
args = parser.parse_args()
if args.config:
config_path = parser.parse_args().config
config = load_config(config_path)
cvars.config.set(config)
if args.print_temp:
print_readings()
raise SystemExit
if not args.config:
raise RuntimeError("--config must be specified in standard operating mode")
logging.basicConfig(level=config.log_level)
check_sets = config_enums.CheckSet
checker_sets: dict[config_enums.CheckSet, list[Coroutine | BaseChecker]] = {
check_sets.SELF: [
sender.send_alert(checks.generate_start_alert()),
IntervalChecker(checks.self_check, interval=datetime.timedelta(minutes=5), persistent=False),
],
check_sets.CPU: [
IntervalChecker(
checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True, ignore_first_run=True
)
],
check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)],
check_sets.TEMP: [IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True)],
check_sets.VULNIX: [
IntervalChecker(
checks.vulnix_check,
interval=datetime.timedelta(days=3),
persistent=True,
send_any_state=True,
# As those are checked less often than daily, reminds could lead to awkward situations
# when the vuln is fixed but you still get reminders about it for 2 more days.
remind=False,
)
],
check_sets.REMIND: [
ScheduledChecker(
checks.remind_check,
period=datetime.timedelta(days=1),
when=datetime.time(hour=0, minute=0),
persistent=False,
is_reminder=True,
)
],
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
check_sets.UPS: [
PipeIntervalChecker(
checks.UPSTracker().ups_check,
interval=datetime.timedelta(minutes=5),
persistent=True,
pipe=UPS_PIPE_NAME,
owner_group=config.checks.ups.upsmon_group,
)
],
}
checkers = []
for enabled_set in config.enabled_check_sets:
for checker in checker_sets[enabled_set]:
checkers.append(checker)
checker_sets[check_sets.REMIND][0].check_args = [checkers]
if config.alert_channels.telegram is not None:
tg_client = await sender.get_tg_client()
my_username = (await tg_client.get_me()).username
logging.info(f"Logged in as @{my_username}")
command_manager = CommandHandlerManager(checkers)
await command_manager.attach_handlers(tg_client)
else:
logging.info("Telegram integration is disabled")
tg_client = None
cvars.tg_client.set(tg_client)
if config.alert_channels.healthchecks is not None:
healthchecks_client = sender.get_healthchecks_client()
logging.info("Ready to send pings to healthchecks")
cvars.healthchecks_client.set(healthchecks_client)
else:
logging.info("Healthchecks integration is disabled")
signal.signal(signal.SIGTERM, stop_gracefully)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
for c in checkers:
if isinstance(c, BaseChecker):
c = c.run_checker()
task = tg.create_task(c)
checker_tasks.add(task)
while True:
if stopping:
if "self" in config.enabled_check_sets:
alert = checks.generate_stop_alert()
async with asyncio.TaskGroup() as tg:
tg.create_task(sender.send_alert(alert))
tg.create_task(sender.send_healthchecks_status(alert))
for c in checkers:
try:
await c.graceful_stop()
except AttributeError:
continue
if tg_client:
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)

793
uv.lock generated

File diff suppressed because it is too large Load diff