From eef6ec59b0baf808ca41a078cc0eecaa4eb8aefa Mon Sep 17 00:00:00 2001 From: Alex Tau Date: Thu, 5 Jun 2025 21:45:01 +0300 Subject: [PATCH] checkers are now objects, lay foundation for persistent alerts --- modules/default.nix | 9 +- src/lego_monitoring/__init__.py | 28 +++-- src/lego_monitoring/alerting/alert.py | 10 ++ .../alerting/{alerts.py => channel.py} | 10 +- src/lego_monitoring/alerting/current.py | 26 ++++ src/lego_monitoring/alerting/enum.py | 11 +- src/lego_monitoring/checks/cpu.py | 8 +- src/lego_monitoring/checks/ram.py | 8 +- src/lego_monitoring/checks/temp/__init__.py | 8 +- src/lego_monitoring/checks/vulnix/__init__.py | 11 +- src/lego_monitoring/core/checkers.py | 112 +++++++++++------- src/lego_monitoring/core/cvars.py | 2 + 12 files changed, 162 insertions(+), 81 deletions(-) create mode 100644 src/lego_monitoring/alerting/alert.py rename src/lego_monitoring/alerting/{alerts.py => channel.py} (95%) create mode 100644 src/lego_monitoring/alerting/current.py diff --git a/modules/default.nix b/modules/default.nix index 28d1203..1cff5ad 100644 --- a/modules/default.nix +++ b/modules/default.nix @@ -68,7 +68,14 @@ package: description = "Lego-monitoring service"; script = "${package}/bin/lego-monitoring -c ${serviceConfigFile}"; wantedBy = [ "multi-user.target" ]; - serviceConfig.Restart = "on-failure"; + serviceConfig = { + Restart = "on-failure"; + RestartSec = "5"; + }; + unitConfig = { + StartLimitIntervalSec = 20; + StartLimitBurst = 3; + }; }; }; } diff --git a/src/lego_monitoring/__init__.py b/src/lego_monitoring/__init__.py index 987c225..1971316 100644 --- a/src/lego_monitoring/__init__.py +++ b/src/lego_monitoring/__init__.py @@ -5,12 +5,12 @@ import logging import signal from . import checks -from .alerting import alerts +from .alerting import channel from .checks.temp.sensors import print_readings from .config import enums as config_enums from .config import load_config from .core import cvars -from .core.checkers import interval_checker +from .core.checkers import IntervalChecker stopping = False @@ -47,18 +47,28 @@ async def async_main(): logging.basicConfig(level=config.log_level) - tg_client = await alerts.get_client() + tg_client = await channel.get_client() cvars.tg_client.set(tg_client) check_sets = config_enums.CheckSet checker_sets = { - check_sets.START: [alerts.send_start_alert()], + check_sets.START: [channel.send_start_alert()], check_sets.STOP: [], # this is checked later - check_sets.CPU: [interval_checker(checks.cpu_check, datetime.timedelta(minutes=5))], - check_sets.RAM: [interval_checker(checks.ram_check, datetime.timedelta(minutes=1))], - check_sets.TEMP: [interval_checker(checks.temp_check, datetime.timedelta(minutes=5))], - check_sets.VULNIX: [interval_checker(checks.vulnix_check, datetime.timedelta(days=3))], + check_sets.CPU: [ + IntervalChecker(checks.cpu_check, interval=datetime.timedelta(minutes=5), persistent=True).run_checker() + ], + check_sets.RAM: [ + IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True).run_checker() + ], + check_sets.TEMP: [ + IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True).run_checker() + ], + check_sets.VULNIX: [ + IntervalChecker( + checks.vulnix_check, interval=datetime.timedelta(days=3), persistent=True, send_same_state=True + ).run_checker() + ], } checkers = [] @@ -76,7 +86,7 @@ async def async_main(): while True: if stopping: if "stop" in config.enabled_check_sets: - await alerts.send_stop_alert() + await channel.send_stop_alert() await tg_client.disconnect() raise SystemExit else: diff --git a/src/lego_monitoring/alerting/alert.py b/src/lego_monitoring/alerting/alert.py new file mode 100644 index 0000000..a593d60 --- /dev/null +++ b/src/lego_monitoring/alerting/alert.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + +from .enum import AlertType, Severity + + +@dataclass +class Alert: + alert_type: AlertType + message: str + severity: Severity diff --git a/src/lego_monitoring/alerting/alerts.py b/src/lego_monitoring/alerting/channel.py similarity index 95% rename from src/lego_monitoring/alerting/alerts.py rename to src/lego_monitoring/alerting/channel.py index dfa61a4..09a72d7 100644 --- a/src/lego_monitoring/alerting/alerts.py +++ b/src/lego_monitoring/alerting/channel.py @@ -4,16 +4,10 @@ from telethon import TelegramClient from telethon.sessions import MemorySession from ..core import cvars +from .alert import Alert from .enum import AlertType, Severity -@dataclass -class Alert: - alert_type: AlertType - message: str - severity: Severity - - async def get_client() -> TelegramClient: config = cvars.config.get() api_id, api_hash, bot_token = config.telegram.creds.split(",") @@ -24,6 +18,8 @@ async def get_client() -> TelegramClient: def format_message(alert: Alert) -> str: match alert.severity: + case Severity.OK: + severity_emoji = "🟢" case Severity.INFO: severity_emoji = "ℹ️" case Severity.WARNING: diff --git a/src/lego_monitoring/alerting/current.py b/src/lego_monitoring/alerting/current.py new file mode 100644 index 0000000..dae575e --- /dev/null +++ b/src/lego_monitoring/alerting/current.py @@ -0,0 +1,26 @@ +from typing import Optional + +from .alert import Alert +from .enum import AlertType, Severity + + +class CurrentAlerts(list[Alert]): + def get_severity(self) -> Optional[Severity]: + max_severity = None + for a in self: + if max_severity is None or a.severity > max_severity: + max_severity = a.severity + return a.severity + + def get_types(self) -> set[AlertType]: + types = set() + for a in self: + types.add(a.alert_type) + return types + + def update(self, alerts: list[Alert]) -> tuple[Optional[Severity], Optional[Severity]]: + old_severity = self.get_severity() + self.clear() + self.extend(alerts) + new_severity = self.get_severity() + return (old_severity, new_severity) diff --git a/src/lego_monitoring/alerting/enum.py b/src/lego_monitoring/alerting/enum.py index d3b6a18..8ce5164 100644 --- a/src/lego_monitoring/alerting/enum.py +++ b/src/lego_monitoring/alerting/enum.py @@ -1,4 +1,4 @@ -from enum import StrEnum +from enum import IntEnum, StrEnum class AlertType(StrEnum): @@ -17,7 +17,8 @@ class AlertType(StrEnum): # UPDATE = "UPDATE" -class Severity(StrEnum): - INFO = "INFO" - WARNING = "WARNING" - CRITICAL = "CRITICAL" +class Severity(IntEnum): + OK = 0 # should only be used when persistent alerts resolve + INFO = 1 + WARNING = 2 + CRITICAL = 3 diff --git a/src/lego_monitoring/checks/cpu.py b/src/lego_monitoring/checks/cpu.py index de46820..ae335e9 100644 --- a/src/lego_monitoring/checks/cpu.py +++ b/src/lego_monitoring/checks/cpu.py @@ -1,18 +1,18 @@ from psutil import cpu_percent -from lego_monitoring.alerting import alerts +from lego_monitoring.alerting.alert import Alert from lego_monitoring.alerting.enum import AlertType, Severity from lego_monitoring.core import cvars IS_TESTING = False -def cpu_check() -> list[alerts.Alert]: +def cpu_check() -> list[Alert]: percentage = cpu_percent() config = cvars.config.get().checks.cpu if config.critical_percentage and (IS_TESTING or percentage > config.critical_percentage): return [ - alerts.Alert( + Alert( alert_type=AlertType.CPU, message=f"CPU load: {percentage:.2f}% > {config.critical_percentage:.2f}%", severity=Severity.CRITICAL, @@ -20,7 +20,7 @@ def cpu_check() -> list[alerts.Alert]: ] elif config.warning_percentage and (IS_TESTING or percentage > config.warning_percentage): return [ - alerts.Alert( + Alert( alert_type=AlertType.CPU, message=f"CPU load: {percentage:.2f}% > {config.warning_percentage:.2f}%", severity=Severity.WARNING, diff --git a/src/lego_monitoring/checks/ram.py b/src/lego_monitoring/checks/ram.py index 334465f..eff87f7 100644 --- a/src/lego_monitoring/checks/ram.py +++ b/src/lego_monitoring/checks/ram.py @@ -1,18 +1,18 @@ from psutil import virtual_memory -from lego_monitoring.alerting import alerts +from lego_monitoring.alerting.alert import Alert from lego_monitoring.alerting.enum import AlertType, Severity from lego_monitoring.core import cvars IS_TESTING = False -def ram_check() -> list[alerts.Alert]: +def ram_check() -> list[Alert]: percentage = virtual_memory().percent config = cvars.config.get().checks.ram if config.critical_percentage and (IS_TESTING or percentage > config.critical_percentage): return [ - alerts.Alert( + Alert( alert_type=AlertType.RAM, message=f"RAM usage: {percentage:.2f}% > {config.critical_percentage:.2f}%", severity=Severity.CRITICAL, @@ -20,7 +20,7 @@ def ram_check() -> list[alerts.Alert]: ] elif config.warning_percentage and (IS_TESTING or percentage > config.warning_percentage): return [ - alerts.Alert( + Alert( alert_type=AlertType.RAM, message=f"RAM usage: {percentage:.2f}% > {config.warning_percentage:.2f}%", severity=Severity.WARNING, diff --git a/src/lego_monitoring/checks/temp/__init__.py b/src/lego_monitoring/checks/temp/__init__.py index 4f965dc..9c68a3b 100644 --- a/src/lego_monitoring/checks/temp/__init__.py +++ b/src/lego_monitoring/checks/temp/__init__.py @@ -1,4 +1,4 @@ -from lego_monitoring.alerting import alerts +from lego_monitoring.alerting.alert import Alert from lego_monitoring.alerting.enum import AlertType, Severity from . import sensors @@ -6,19 +6,19 @@ from . import sensors IS_TESTING = False -def temp_check() -> list[alerts.Alert]: +def temp_check() -> list[Alert]: alert_list = [] temps = sensors.get_readings() for sensor, readings in temps.items(): for r in readings: if r.critical_temp is not None and (IS_TESTING or r.current_temp > r.critical_temp): - alert = alerts.Alert( + alert = Alert( alert_type=AlertType.TEMP, message=f"{sensor} {r.label}: {r.current_temp}°C > {r.critical_temp}°C", severity=Severity.CRITICAL, ) elif r.warning_temp is not None and (IS_TESTING or r.current_temp > r.warning_temp): - alert = alerts.Alert( + alert = Alert( alert_type=AlertType.TEMP, message=f"{sensor} {r.label}: {r.current_temp}°C > {r.warning_temp}°C", severity=Severity.WARNING, diff --git a/src/lego_monitoring/checks/vulnix/__init__.py b/src/lego_monitoring/checks/vulnix/__init__.py index c9f7a10..302bdc4 100644 --- a/src/lego_monitoring/checks/vulnix/__init__.py +++ b/src/lego_monitoring/checks/vulnix/__init__.py @@ -1,4 +1,5 @@ -from lego_monitoring.alerting import alerts +from lego_monitoring.alerting.alert import Alert +from lego_monitoring.alerting.channel import send_alert from lego_monitoring.alerting.enum import AlertType, Severity from .vulnix import get_vulnix_output @@ -6,13 +7,13 @@ from .vulnix import get_vulnix_output IS_TESTING = False -def vulnix_check() -> list[alerts.Alert]: +def vulnix_check() -> list[Alert]: alert_list = [] try: vulnix_output = get_vulnix_output(IS_TESTING) except Exception as e: - alerts.send_alert( - alerts.Alert( + send_alert( + Alert( alert_type=AlertType.ERROR, message=f"Exception {type(e).__name__} while calling vulnix: {e}", severity=Severity.CRITICAL, @@ -34,7 +35,7 @@ def vulnix_check() -> list[alerts.Alert]: score_str = "(not scored by CVSSv3)" message += f'\n* {cve} - {finding.description[cve]} {score_str}' - alert = alerts.Alert( + alert = Alert( alert_type=AlertType.VULN, message=message, severity=Severity.WARNING, diff --git a/src/lego_monitoring/core/checkers.py b/src/lego_monitoring/core/checkers.py index 12db2b8..5c2c97d 100644 --- a/src/lego_monitoring/core/checkers.py +++ b/src/lego_monitoring/core/checkers.py @@ -1,53 +1,81 @@ import asyncio import datetime import logging -from typing import Callable, Coroutine +from dataclasses import KW_ONLY, dataclass, field +from typing import Any, Callable, Coroutine -from ..alerting import alerts +from ..alerting.alert import Alert +from ..alerting.channel import send_alert -async def _call_check(check: Callable | Coroutine, *args, **kwargs) -> list[alerts.Alert]: - if isinstance(check, Callable): - result = check(*args, **kwargs) - if isinstance(result, Coroutine): - result = await result - elif isinstance(check, Coroutine): - result = await check - else: - raise TypeError(f"check is {type(check)}, neither function nor coroutine") - return result +@dataclass +class BaseChecker: + check: Callable | Coroutine + persistent: bool + send_same_state: bool = False + check_args: list = field(default_factory=list) + check_kwargs: dict[str, Any] = field(default_factory=dict) + + async def _call_check(self) -> list[Alert]: + if isinstance(self.check, Callable): + result = self.check(*self.check_args, **self.check_kwargs) + if isinstance(result, Coroutine): + result = await result + elif isinstance(self.check, Coroutine): + result = await self.check + else: + raise TypeError(f"check is {type(self.check)}, neither function nor coroutine") + return result + + async def _handle_alert(alert: Alert, persistent: bool, send_same_state: bool) -> None: + if not persistent: + await send_alert(alert) + return + ... + + async def run_checker(self): + raise NotImplementedError -async def interval_checker(check: Callable | Coroutine, interval: datetime.timedelta, *args, **kwargs): - interval_secs = interval.total_seconds() - while True: - logging.info(f"Calling {check.__name__}") - result = await _call_check(check, *args, **kwargs) - logging.info(f"Got {len(result)} alerts") - for alert in result: - await alerts.send_alert(alert) - await asyncio.sleep(interval_secs) +@dataclass +class IntervalChecker(BaseChecker): + _: KW_ONLY + interval: datetime.timedelta + + async def run_checker(self): + interval_secs = self.interval.total_seconds() + while True: + logging.info(f"Calling {self.check.__name__}") + result = await self._call_check() + logging.info(f"Got {len(result)} alerts") + for alert in result: + await send_alert(alert) + await asyncio.sleep(interval_secs) -async def scheduled_checker( - check: Callable | Coroutine, period: datetime.timedelta, when: datetime.time, *args, **kwargs -): - match period: - case datetime.timedelta(days=1): - while True: - now = datetime.datetime.now() - next_datetime = datetime.datetime.combine(datetime.date.today(), when) - if next_datetime < now: - next_datetime += datetime.timedelta(days=1) - logging.info(f"Scheduled to call {check.__name__} at {next_datetime.isoformat()}") - await asyncio.sleep( - (next_datetime - now).total_seconds() - ) # might be negative at this point, asyncio doesn't care +@dataclass +class ScheduledChecker(BaseChecker): + _: KW_ONLY + period: datetime.timedelta + when: datetime.time - logging.info(f"Calling {check.__name__}") - result = await _call_check(check, *args, **kwargs) - logging.info(f"Got {len(result)} alerts") - for alert in result: - await alerts.send_alert(alert) - case _: - raise NotImplementedError + async def run_checker(self): + match self.period: + case datetime.timedelta(days=1): + while True: + now = datetime.datetime.now() + next_datetime = datetime.datetime.combine(datetime.date.today(), self.when) + if next_datetime < now: + next_datetime += datetime.timedelta(days=1) + logging.info(f"Scheduled to call {self.check.__name__} at {next_datetime.isoformat()}") + await asyncio.sleep( + (next_datetime - now).total_seconds() + ) # might be negative at this point, asyncio doesn't care + + logging.info(f"Calling {self.check.__name__}") + result = await self._call_check() + logging.info(f"Got {len(result)} alerts") + for alert in result: + await send_alert(alert) + case _: + raise NotImplementedError diff --git a/src/lego_monitoring/core/cvars.py b/src/lego_monitoring/core/cvars.py index a4781c5..78dff36 100644 --- a/src/lego_monitoring/core/cvars.py +++ b/src/lego_monitoring/core/cvars.py @@ -2,7 +2,9 @@ from contextvars import ContextVar from telethon import TelegramClient +from ..alerting.current import CurrentAlerts from ..config import Config config: ContextVar[Config] = ContextVar("config") tg_client: ContextVar[TelegramClient] = ContextVar("tg_client") +current_alerts: ContextVar[list[CurrentAlerts]] = ContextVar("current_alerts", default=[])