add script for writing to the pipe

This commit is contained in:
Alex Tau 2025-12-19 18:26:11 +03:00
parent 10e79d6827
commit 191839d30f
12 changed files with 223 additions and 158 deletions

View file

@ -79,6 +79,7 @@ package:
ups = with cfg.checks.ups; {
ups_to_check = upsToCheck;
upsmon_group = upsmonGroup;
};
};
};
@ -97,5 +98,8 @@ package:
StartLimitBurst = 3;
};
};
power.ups.upsmon.settings = lib.mkIf (builtins.elem "ups" cfg.enabledCheckSets) {
NOTIFYCMD = "${package}/bin/write-ups-status";
};
};
}

View file

@ -1,5 +1,6 @@
{
lib,
config,
...
}:
@ -179,6 +180,11 @@ in
default = null;
description = "List of UPS's to monitor, in `upsc`-compatible format. If null, all UPS's connected to localhost are checked.";
};
upsmonGroup = lib.mkOption {
type = lib.types.str;
default = config.power.ups.upsmon.user;
description = "Group to allow to send UPS status updates. This should usually include the user upsmon runs as.";
};
};
};
};

View file

@ -19,6 +19,7 @@ dependencies = [
[project.scripts]
lego-monitoring = "lego_monitoring:main"
write-ups-status = "lego_monitoring:write_ups_status"
[build-system]
requires = ["hatchling"]

View file

@ -1,157 +1,2 @@
import argparse
import asyncio
import datetime
import logging
import signal
from typing import Coroutine
from . import checks
from .alerting import sender
from .alerting.commands import CommandHandlerManager
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import (
BaseChecker,
IntervalChecker,
PipeIntervalChecker,
ScheduledChecker,
)
from .core.const import UPS_PIPE_NAME
stopping = False
def stop_gracefully(signum, frame):
global stopping
stopping = True
def main() -> None:
asyncio.run(async_main())
async def async_main():
parser = argparse.ArgumentParser(
prog="lego-monitoring",
description="Lego-monitoring service",
)
parser.add_argument("-c", "--config", help="config file")
parser.add_argument("--print-temp", help="print temp sensor readings and exit", action="store_true")
args = parser.parse_args()
if args.config:
config_path = parser.parse_args().config
config = load_config(config_path)
cvars.config.set(config)
if args.print_temp:
print_readings()
raise SystemExit
if not args.config:
raise RuntimeError("--config must be specified in standard operating mode")
logging.basicConfig(level=config.log_level)
check_sets = config_enums.CheckSet
checker_sets: dict[config_enums.CheckSet, list[Coroutine | BaseChecker]] = {
check_sets.SELF: [
sender.send_alert(checks.generate_start_alert()),
IntervalChecker(checks.self_check, interval=datetime.timedelta(minutes=5), persistent=False),
],
check_sets.CPU: [
IntervalChecker(
checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True, ignore_first_run=True
)
],
check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)],
check_sets.TEMP: [IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True)],
check_sets.VULNIX: [
IntervalChecker(
checks.vulnix_check,
interval=datetime.timedelta(days=3),
persistent=True,
send_any_state=True,
# As those are checked less often than daily, reminds could lead to awkward situations
# when the vuln is fixed but you still get reminders about it for 2 more days.
remind=False,
)
],
check_sets.REMIND: [
ScheduledChecker(
checks.remind_check,
period=datetime.timedelta(days=1),
when=datetime.time(hour=0, minute=0),
persistent=False,
is_reminder=True,
)
],
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
check_sets.UPS: [
PipeIntervalChecker(
checks.UPSTracker().ups_check,
interval=datetime.timedelta(minutes=5),
persistent=True,
pipe=UPS_PIPE_NAME,
)
],
}
checkers = []
for enabled_set in config.enabled_check_sets:
for checker in checker_sets[enabled_set]:
checkers.append(checker)
checker_sets[check_sets.REMIND][0].check_args = [checkers]
if config.alert_channels.telegram is not None:
tg_client = await sender.get_tg_client()
my_username = (await tg_client.get_me()).username
logging.info(f"Logged in as @{my_username}")
command_manager = CommandHandlerManager(checkers)
await command_manager.attach_handlers(tg_client)
else:
logging.info("Telegram integration is disabled")
tg_client = None
cvars.tg_client.set(tg_client)
if config.alert_channels.healthchecks is not None:
healthchecks_client = sender.get_healthchecks_client()
logging.info("Ready to send pings to healthchecks")
cvars.healthchecks_client.set(healthchecks_client)
else:
logging.info("Healthchecks integration is disabled")
signal.signal(signal.SIGTERM, stop_gracefully)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
for c in checkers:
if isinstance(c, BaseChecker):
c = c.run_checker()
task = tg.create_task(c)
checker_tasks.add(task)
while True:
if stopping:
if "self" in config.enabled_check_sets:
alert = checks.generate_stop_alert()
async with asyncio.TaskGroup() as tg:
tg.create_task(sender.send_alert(alert))
tg.create_task(sender.send_healthchecks_status(alert))
for c in checkers:
try:
await c.graceful_stop()
except AttributeError:
continue
if tg_client:
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)
from .checks import write_ups_status
from .main import main

View file

@ -5,4 +5,5 @@ from .remind import remind_check
from .self import generate_start_alert, generate_stop_alert, self_check
from .temp import temp_check
from .ups.check import UPSTracker
from .ups.notifycmd import write_ups_status
from .vulnix import vulnix_check

View file

@ -121,6 +121,9 @@ class UPSTracker:
case UPSEventType.SHUTDOWN:
severity = Severity.CRITICAL
reasons_for_severity.add("Shutting down now")
case UPSEventType.SHUTDOWN_HOSTSYNC:
severity = Severity.CRITICAL
reasons_for_severity.add("Shutdown initiated (waiting for secondaries)")
case UPSEventType.NOCOMM:
severity = Severity.CRITICAL
reasons_for_severity.add("Cannot establish communication")

View file

@ -1,5 +1,5 @@
from dataclasses import dataclass
from enum import StrEnum, auto
from enum import StrEnum
class UPSEventType(StrEnum):
@ -12,6 +12,7 @@ class UPSEventType(StrEnum):
COMMOK = "COMMOK"
COMMBAD = "COMMBAD"
SHUTDOWN = "SHUTDOWN"
SHUTDOWN_HOSTSYNC = "SHUTDOWN_HOSTSYNC"
REPLBATT = "REPLBATT"
NOCOMM = "NOCOMM"
NOPARENT = "NOPARENT"

View file

@ -0,0 +1,27 @@
import json
import os
import sys
from dataclasses import asdict
from lego_monitoring.core.const import UPS_PIPE_NAME
from lego_monitoring.core.fifo import pipe_exists
from .events import UPSEvent, UPSEventType
def write_ups_status():
if not pipe_exists(UPS_PIPE_NAME):
raise Exception("lego-monitoring not running!")
notifytype = os.environ["NOTIFYTYPE"]
if notifytype not in UPSEventType:
notifytype = UPSEventType.OTHER
upsname = os.environ["UPSNAME"]
message = sys.argv[1]
event = UPSEvent(type_=notifytype, message=message, ups_name=upsname)
event_s = json.dumps(asdict(event)) + "\n"
with open(UPS_PIPE_NAME, "a") as p:
p.write(event_s)

View file

@ -4,4 +4,5 @@ from typing import Optional
@dataclass
class UPSCheckConfig:
upsmon_group: str = "nutmon"
ups_to_check: Optional[list] = None

View file

@ -3,6 +3,7 @@ import datetime
import json
import logging
import os
import shutil
from dataclasses import KW_ONLY, dataclass, field
from typing import Any, Callable, Coroutine, Optional
@ -151,6 +152,8 @@ class PipeIntervalChecker(IntervalChecker):
_: KW_ONLY
pipe: str
owner_user: Optional[str] = None
owner_group: Optional[str] = None
read_task: Optional[asyncio.Task] = None
async def _read_status(self) -> list:
@ -162,11 +165,15 @@ class PipeIntervalChecker(IntervalChecker):
async def run_checker(self) -> None:
interval_secs = self.interval.total_seconds()
ignore_first_run = self.ignore_first_run
try:
os.remove(self.pipe)
except FileNotFoundError:
pass
os.mkfifo(self.pipe)
if self.owner_user is not None or self.owner_group is not None:
shutil.chown(self.pipe, self.owner_user, self.owner_group)
os.chmod(self.pipe, 0o660)
while True:
logging.info(f"Waiting on pipe {self.pipe}")

View file

@ -0,0 +1,11 @@
import os
import stat
def pipe_exists(path: str) -> bool:
try:
if stat.S_ISFIFO(os.stat(path).st_mode) == 0:
return False
return True
except FileNotFoundError:
return False

158
src/lego_monitoring/main.py Normal file
View file

@ -0,0 +1,158 @@
import argparse
import asyncio
import datetime
import logging
import signal
from typing import Coroutine
from . import checks
from .alerting import sender
from .alerting.commands import CommandHandlerManager
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import (
BaseChecker,
IntervalChecker,
PipeIntervalChecker,
ScheduledChecker,
)
from .core.const import UPS_PIPE_NAME
stopping = False
def stop_gracefully(signum, frame):
global stopping
stopping = True
def main() -> None:
asyncio.run(async_main())
async def async_main():
parser = argparse.ArgumentParser(
prog="lego-monitoring",
description="Lego-monitoring service",
)
parser.add_argument("-c", "--config", help="config file")
parser.add_argument("--print-temp", help="print temp sensor readings and exit", action="store_true")
args = parser.parse_args()
if args.config:
config_path = parser.parse_args().config
config = load_config(config_path)
cvars.config.set(config)
if args.print_temp:
print_readings()
raise SystemExit
if not args.config:
raise RuntimeError("--config must be specified in standard operating mode")
logging.basicConfig(level=config.log_level)
check_sets = config_enums.CheckSet
checker_sets: dict[config_enums.CheckSet, list[Coroutine | BaseChecker]] = {
check_sets.SELF: [
sender.send_alert(checks.generate_start_alert()),
IntervalChecker(checks.self_check, interval=datetime.timedelta(minutes=5), persistent=False),
],
check_sets.CPU: [
IntervalChecker(
checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True, ignore_first_run=True
)
],
check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)],
check_sets.TEMP: [IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True)],
check_sets.VULNIX: [
IntervalChecker(
checks.vulnix_check,
interval=datetime.timedelta(days=3),
persistent=True,
send_any_state=True,
# As those are checked less often than daily, reminds could lead to awkward situations
# when the vuln is fixed but you still get reminders about it for 2 more days.
remind=False,
)
],
check_sets.REMIND: [
ScheduledChecker(
checks.remind_check,
period=datetime.timedelta(days=1),
when=datetime.time(hour=0, minute=0),
persistent=False,
is_reminder=True,
)
],
check_sets.NET: [
IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
],
check_sets.UPS: [
PipeIntervalChecker(
checks.UPSTracker().ups_check,
interval=datetime.timedelta(minutes=5),
persistent=True,
pipe=UPS_PIPE_NAME,
owner_group=config.checks.ups.upsmon_group,
)
],
}
checkers = []
for enabled_set in config.enabled_check_sets:
for checker in checker_sets[enabled_set]:
checkers.append(checker)
checker_sets[check_sets.REMIND][0].check_args = [checkers]
if config.alert_channels.telegram is not None:
tg_client = await sender.get_tg_client()
my_username = (await tg_client.get_me()).username
logging.info(f"Logged in as @{my_username}")
command_manager = CommandHandlerManager(checkers)
await command_manager.attach_handlers(tg_client)
else:
logging.info("Telegram integration is disabled")
tg_client = None
cvars.tg_client.set(tg_client)
if config.alert_channels.healthchecks is not None:
healthchecks_client = sender.get_healthchecks_client()
logging.info("Ready to send pings to healthchecks")
cvars.healthchecks_client.set(healthchecks_client)
else:
logging.info("Healthchecks integration is disabled")
signal.signal(signal.SIGTERM, stop_gracefully)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
for c in checkers:
if isinstance(c, BaseChecker):
c = c.run_checker()
task = tg.create_task(c)
checker_tasks.add(task)
while True:
if stopping:
if "self" in config.enabled_check_sets:
alert = checks.generate_stop_alert()
async with asyncio.TaskGroup() as tg:
tg.create_task(sender.send_alert(alert))
tg.create_task(sender.send_healthchecks_status(alert))
for c in checkers:
try:
await c.graceful_stop()
except AttributeError:
continue
if tg_client:
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)