checkers are now objects, lay foundation for persistent alerts

This commit is contained in:
Alex Tau 2025-06-05 21:45:01 +03:00
parent 5d2759c63c
commit eef6ec59b0
12 changed files with 162 additions and 81 deletions

View file

@ -5,12 +5,12 @@ import logging
import signal
from . import checks
from .alerting import alerts
from .alerting import channel
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import interval_checker
from .core.checkers import IntervalChecker
stopping = False
@ -47,18 +47,28 @@ async def async_main():
logging.basicConfig(level=config.log_level)
tg_client = await alerts.get_client()
tg_client = await channel.get_client()
cvars.tg_client.set(tg_client)
check_sets = config_enums.CheckSet
checker_sets = {
check_sets.START: [alerts.send_start_alert()],
check_sets.START: [channel.send_start_alert()],
check_sets.STOP: [], # this is checked later
check_sets.CPU: [interval_checker(checks.cpu_check, datetime.timedelta(minutes=5))],
check_sets.RAM: [interval_checker(checks.ram_check, datetime.timedelta(minutes=1))],
check_sets.TEMP: [interval_checker(checks.temp_check, datetime.timedelta(minutes=5))],
check_sets.VULNIX: [interval_checker(checks.vulnix_check, datetime.timedelta(days=3))],
check_sets.CPU: [
IntervalChecker(checks.cpu_check, interval=datetime.timedelta(minutes=5), persistent=True).run_checker()
],
check_sets.RAM: [
IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True).run_checker()
],
check_sets.TEMP: [
IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True).run_checker()
],
check_sets.VULNIX: [
IntervalChecker(
checks.vulnix_check, interval=datetime.timedelta(days=3), persistent=True, send_same_state=True
).run_checker()
],
}
checkers = []
@ -76,7 +86,7 @@ async def async_main():
while True:
if stopping:
if "stop" in config.enabled_check_sets:
await alerts.send_stop_alert()
await channel.send_stop_alert()
await tg_client.disconnect()
raise SystemExit
else:

View file

@ -0,0 +1,10 @@
from dataclasses import dataclass
from .enum import AlertType, Severity
@dataclass
class Alert:
alert_type: AlertType
message: str
severity: Severity

View file

@ -4,16 +4,10 @@ from telethon import TelegramClient
from telethon.sessions import MemorySession
from ..core import cvars
from .alert import Alert
from .enum import AlertType, Severity
@dataclass
class Alert:
alert_type: AlertType
message: str
severity: Severity
async def get_client() -> TelegramClient:
config = cvars.config.get()
api_id, api_hash, bot_token = config.telegram.creds.split(",")
@ -24,6 +18,8 @@ async def get_client() -> TelegramClient:
def format_message(alert: Alert) -> str:
match alert.severity:
case Severity.OK:
severity_emoji = "🟢"
case Severity.INFO:
severity_emoji = ""
case Severity.WARNING:

View file

@ -0,0 +1,26 @@
from typing import Optional
from .alert import Alert
from .enum import AlertType, Severity
class CurrentAlerts(list[Alert]):
def get_severity(self) -> Optional[Severity]:
max_severity = None
for a in self:
if max_severity is None or a.severity > max_severity:
max_severity = a.severity
return a.severity
def get_types(self) -> set[AlertType]:
types = set()
for a in self:
types.add(a.alert_type)
return types
def update(self, alerts: list[Alert]) -> tuple[Optional[Severity], Optional[Severity]]:
old_severity = self.get_severity()
self.clear()
self.extend(alerts)
new_severity = self.get_severity()
return (old_severity, new_severity)

View file

@ -1,4 +1,4 @@
from enum import StrEnum
from enum import IntEnum, StrEnum
class AlertType(StrEnum):
@ -17,7 +17,8 @@ class AlertType(StrEnum):
# UPDATE = "UPDATE"
class Severity(StrEnum):
INFO = "INFO"
WARNING = "WARNING"
CRITICAL = "CRITICAL"
class Severity(IntEnum):
OK = 0 # should only be used when persistent alerts resolve
INFO = 1
WARNING = 2
CRITICAL = 3

View file

@ -1,18 +1,18 @@
from psutil import cpu_percent
from lego_monitoring.alerting import alerts
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
IS_TESTING = False
def cpu_check() -> list[alerts.Alert]:
def cpu_check() -> list[Alert]:
percentage = cpu_percent()
config = cvars.config.get().checks.cpu
if config.critical_percentage and (IS_TESTING or percentage > config.critical_percentage):
return [
alerts.Alert(
Alert(
alert_type=AlertType.CPU,
message=f"CPU load: {percentage:.2f}% > {config.critical_percentage:.2f}%",
severity=Severity.CRITICAL,
@ -20,7 +20,7 @@ def cpu_check() -> list[alerts.Alert]:
]
elif config.warning_percentage and (IS_TESTING or percentage > config.warning_percentage):
return [
alerts.Alert(
Alert(
alert_type=AlertType.CPU,
message=f"CPU load: {percentage:.2f}% > {config.warning_percentage:.2f}%",
severity=Severity.WARNING,

View file

@ -1,18 +1,18 @@
from psutil import virtual_memory
from lego_monitoring.alerting import alerts
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
IS_TESTING = False
def ram_check() -> list[alerts.Alert]:
def ram_check() -> list[Alert]:
percentage = virtual_memory().percent
config = cvars.config.get().checks.ram
if config.critical_percentage and (IS_TESTING or percentage > config.critical_percentage):
return [
alerts.Alert(
Alert(
alert_type=AlertType.RAM,
message=f"RAM usage: {percentage:.2f}% > {config.critical_percentage:.2f}%",
severity=Severity.CRITICAL,
@ -20,7 +20,7 @@ def ram_check() -> list[alerts.Alert]:
]
elif config.warning_percentage and (IS_TESTING or percentage > config.warning_percentage):
return [
alerts.Alert(
Alert(
alert_type=AlertType.RAM,
message=f"RAM usage: {percentage:.2f}% > {config.warning_percentage:.2f}%",
severity=Severity.WARNING,

View file

@ -1,4 +1,4 @@
from lego_monitoring.alerting import alerts
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from . import sensors
@ -6,19 +6,19 @@ from . import sensors
IS_TESTING = False
def temp_check() -> list[alerts.Alert]:
def temp_check() -> list[Alert]:
alert_list = []
temps = sensors.get_readings()
for sensor, readings in temps.items():
for r in readings:
if r.critical_temp is not None and (IS_TESTING or r.current_temp > r.critical_temp):
alert = alerts.Alert(
alert = Alert(
alert_type=AlertType.TEMP,
message=f"{sensor} {r.label}: {r.current_temp}°C > {r.critical_temp}°C",
severity=Severity.CRITICAL,
)
elif r.warning_temp is not None and (IS_TESTING or r.current_temp > r.warning_temp):
alert = alerts.Alert(
alert = Alert(
alert_type=AlertType.TEMP,
message=f"{sensor} {r.label}: {r.current_temp}°C > {r.warning_temp}°C",
severity=Severity.WARNING,

View file

@ -1,4 +1,5 @@
from lego_monitoring.alerting import alerts
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.channel import send_alert
from lego_monitoring.alerting.enum import AlertType, Severity
from .vulnix import get_vulnix_output
@ -6,13 +7,13 @@ from .vulnix import get_vulnix_output
IS_TESTING = False
def vulnix_check() -> list[alerts.Alert]:
def vulnix_check() -> list[Alert]:
alert_list = []
try:
vulnix_output = get_vulnix_output(IS_TESTING)
except Exception as e:
alerts.send_alert(
alerts.Alert(
send_alert(
Alert(
alert_type=AlertType.ERROR,
message=f"Exception {type(e).__name__} while calling vulnix: {e}",
severity=Severity.CRITICAL,
@ -34,7 +35,7 @@ def vulnix_check() -> list[alerts.Alert]:
score_str = "(not scored by CVSSv3)"
message += f'\n* <a href="https://nvd.nist.gov/vuln/detail/{cve}">{cve}</a> - {finding.description[cve]} {score_str}'
alert = alerts.Alert(
alert = Alert(
alert_type=AlertType.VULN,
message=message,
severity=Severity.WARNING,

View file

@ -1,53 +1,81 @@
import asyncio
import datetime
import logging
from typing import Callable, Coroutine
from dataclasses import KW_ONLY, dataclass, field
from typing import Any, Callable, Coroutine
from ..alerting import alerts
from ..alerting.alert import Alert
from ..alerting.channel import send_alert
async def _call_check(check: Callable | Coroutine, *args, **kwargs) -> list[alerts.Alert]:
if isinstance(check, Callable):
result = check(*args, **kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(check, Coroutine):
result = await check
else:
raise TypeError(f"check is {type(check)}, neither function nor coroutine")
return result
@dataclass
class BaseChecker:
check: Callable | Coroutine
persistent: bool
send_same_state: bool = False
check_args: list = field(default_factory=list)
check_kwargs: dict[str, Any] = field(default_factory=dict)
async def _call_check(self) -> list[Alert]:
if isinstance(self.check, Callable):
result = self.check(*self.check_args, **self.check_kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(self.check, Coroutine):
result = await self.check
else:
raise TypeError(f"check is {type(self.check)}, neither function nor coroutine")
return result
async def _handle_alert(alert: Alert, persistent: bool, send_same_state: bool) -> None:
if not persistent:
await send_alert(alert)
return
...
async def run_checker(self):
raise NotImplementedError
async def interval_checker(check: Callable | Coroutine, interval: datetime.timedelta, *args, **kwargs):
interval_secs = interval.total_seconds()
while True:
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
await asyncio.sleep(interval_secs)
@dataclass
class IntervalChecker(BaseChecker):
_: KW_ONLY
interval: datetime.timedelta
async def run_checker(self):
interval_secs = self.interval.total_seconds()
while True:
logging.info(f"Calling {self.check.__name__}")
result = await self._call_check()
logging.info(f"Got {len(result)} alerts")
for alert in result:
await send_alert(alert)
await asyncio.sleep(interval_secs)
async def scheduled_checker(
check: Callable | Coroutine, period: datetime.timedelta, when: datetime.time, *args, **kwargs
):
match period:
case datetime.timedelta(days=1):
while True:
now = datetime.datetime.now()
next_datetime = datetime.datetime.combine(datetime.date.today(), when)
if next_datetime < now:
next_datetime += datetime.timedelta(days=1)
logging.info(f"Scheduled to call {check.__name__} at {next_datetime.isoformat()}")
await asyncio.sleep(
(next_datetime - now).total_seconds()
) # might be negative at this point, asyncio doesn't care
@dataclass
class ScheduledChecker(BaseChecker):
_: KW_ONLY
period: datetime.timedelta
when: datetime.time
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
case _:
raise NotImplementedError
async def run_checker(self):
match self.period:
case datetime.timedelta(days=1):
while True:
now = datetime.datetime.now()
next_datetime = datetime.datetime.combine(datetime.date.today(), self.when)
if next_datetime < now:
next_datetime += datetime.timedelta(days=1)
logging.info(f"Scheduled to call {self.check.__name__} at {next_datetime.isoformat()}")
await asyncio.sleep(
(next_datetime - now).total_seconds()
) # might be negative at this point, asyncio doesn't care
logging.info(f"Calling {self.check.__name__}")
result = await self._call_check()
logging.info(f"Got {len(result)} alerts")
for alert in result:
await send_alert(alert)
case _:
raise NotImplementedError

View file

@ -2,7 +2,9 @@ from contextvars import ContextVar
from telethon import TelegramClient
from ..alerting.current import CurrentAlerts
from ..config import Config
config: ContextVar[Config] = ContextVar("config")
tg_client: ContextVar[TelegramClient] = ContextVar("tg_client")
current_alerts: ContextVar[list[CurrentAlerts]] = ContextVar("current_alerts", default=[])