diff --git a/src/lego_monitoring/__init__.py b/src/lego_monitoring/__init__.py index 389e785..b070580 100644 --- a/src/lego_monitoring/__init__.py +++ b/src/lego_monitoring/__init__.py @@ -6,7 +6,8 @@ import signal from typing import Coroutine from . import checks -from .alerting import channel +from .alerting import sender +from .alerting.commands import CommandHandlerManager from .checks.temp.sensors import print_readings from .config import enums as config_enums from .config import load_config @@ -48,13 +49,15 @@ async def async_main(): logging.basicConfig(level=config.log_level) - tg_client = await channel.get_client() - cvars.tg_client.set(tg_client) + tg_client = await sender.get_client() + cvars.tg_client.set(tg_client) + my_username = (await tg_client.get_me()).username + logging.info(f"Logged in as @{my_username}") check_sets = config_enums.CheckSet checker_sets: dict[config_enums.CheckSet, list[Coroutine | BaseChecker]] = { - check_sets.START: [channel.send_start_alert()], + check_sets.START: [sender.send_start_alert()], check_sets.STOP: [], # this is checked later check_sets.CPU: [IntervalChecker(checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True)], check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)], @@ -76,17 +79,21 @@ async def async_main(): period=datetime.timedelta(days=1), when=datetime.time(hour=0, minute=0), persistent=False, + is_reminder=True, ) ], } - checker_sets[check_sets.REMIND][0].check_args = [checker_sets] - checkers = [] for enabled_set in config.enabled_check_sets: for checker in checker_sets[enabled_set]: checkers.append(checker) + checker_sets[check_sets.REMIND][0].check_args = checkers + + command_manager = CommandHandlerManager(checkers) + await command_manager.attach_handlers(tg_client) + signal.signal(signal.SIGTERM, stop_gracefully) async with asyncio.TaskGroup() as tg: @@ -99,7 +106,7 @@ async def async_main(): while True: if stopping: if "stop" in config.enabled_check_sets: - await channel.send_stop_alert() + await sender.send_stop_alert() await tg_client.disconnect() raise SystemExit else: diff --git a/src/lego_monitoring/alerting/commands.py b/src/lego_monitoring/alerting/commands.py new file mode 100644 index 0000000..a07bb34 --- /dev/null +++ b/src/lego_monitoring/alerting/commands.py @@ -0,0 +1,88 @@ +from dataclasses import dataclass +from typing import Awaitable, Callable + +from telethon import TelegramClient, events, functions, types + +from lego_monitoring.core import cvars +from lego_monitoring.core.checkers import BaseChecker + +from .enum import SEVERITY_TO_EMOJI, AlertType, Severity +from .sender import format_message + + +def admin_chat_only( + handler: Callable[[events.NewMessage.Event], Awaitable[None]], +) -> Callable[[events.NewMessage.Event], Awaitable[None]]: + admin_room_id = cvars.config.get().telegram.room_id + + async def safe_handler(event: events.NewMessage.Event) -> None: + if event.chat_id == admin_room_id: + return await handler(event) + + return safe_handler + + +@dataclass +class CommandHandlerManager: + checkers: list[BaseChecker] + + async def attach_handlers(self, tg_client: TelegramClient): + my_username = (await tg_client.get_me()).username + + @tg_client.on(events.NewMessage(pattern=f"/status(?:@{my_username})?")) + @admin_chat_only + async def status(event: events.NewMessage.Event): + return await self.status_handler(event) + + @tg_client.on(events.NewMessage(pattern=f"/ongoing(?:@{my_username})?")) + @admin_chat_only + async def status(event: events.NewMessage.Event): + return await self.ongoing_handler(event) + + await tg_client( + functions.bots.SetBotCommandsRequest( + scope=types.BotCommandScopeDefault(), + lang_code="en", + commands=[ + types.BotCommand(command="status", description="Get current system status"), + types.BotCommand(command="ongoing", description="Show ongoing alerts"), + ], + ) + ) + + async def status_handler(self, event: events.NewMessage.Event): + alert_num_by_state_with_max_type: dict[AlertType, list[Severity | int]] = {} + for c in self.checkers: + if not isinstance(c, BaseChecker) or not c.persistent: + continue + for a in c.current_alerts: + if a.alert_type not in alert_num_by_state_with_max_type: + alert_num_by_state_with_max_type[a.alert_type] = [a.severity, 1] + else: + existing_list = alert_num_by_state_with_max_type[a.alert_type] + if a.severity > existing_list[0]: + existing_list[0] = a.severity + existing_list[1] += 1 + + if len(alert_num_by_state_with_max_type) == 0: + message = "🟢 There are no ongoing events." + else: + message = "There are ongoing events:" + for at, sev_count in alert_num_by_state_with_max_type.items(): + message += f"\n* {SEVERITY_TO_EMOJI[sev_count[0]]} {str(at)} - {sev_count[1]} alerts" + message += "\n\nUse /ongoing to see them." + + await event.respond(message) + + async def ongoing_handler(self, event: events.NewMessage.Event): + messages = set() + for c in self.checkers: + if not isinstance(c, BaseChecker) or not c.persistent: + continue + for a in c.current_alerts: + message = format_message(a, note="ongoing") + messages.add(message) + if len(messages) == 0: + await event.respond("🟢 There are no ongoing events.") + for message in messages: + await event.respond(message) diff --git a/src/lego_monitoring/alerting/channel.py b/src/lego_monitoring/alerting/sender.py similarity index 100% rename from src/lego_monitoring/alerting/channel.py rename to src/lego_monitoring/alerting/sender.py diff --git a/src/lego_monitoring/checks/remind.py b/src/lego_monitoring/checks/remind.py index 7676eea..ca3eb61 100644 --- a/src/lego_monitoring/checks/remind.py +++ b/src/lego_monitoring/checks/remind.py @@ -1,43 +1,13 @@ -from typing import Any, Coroutine +from typing import Coroutine from lego_monitoring.alerting.alert import Alert -from lego_monitoring.config.enums import CheckSet from lego_monitoring.core.checkers import BaseChecker -def remind_check(checker_sets: dict[CheckSet, list[Coroutine | BaseChecker]]) -> list[Alert]: +def remind_check(checkers: list[Coroutine | BaseChecker]) -> list[Alert]: alerts = [] - for checker_set in checker_sets.values(): - for c in checker_set: - if not isinstance(c, BaseChecker) or not c.persistent or not c.remind: - continue - alerts.extend(c.current_alerts) + for c in checkers: + if not isinstance(c, BaseChecker) or not c.persistent or not c.remind: + continue + alerts.extend(c.current_alerts) return alerts - - # alert_num_by_state_with_max_type: dict[AlertType, list[Severity | int]] = {} - # for checker_set in checker_sets.values(): - # for c in checker_set: - # if not isinstance(c, BaseChecker) or not c.persistent: - # continue - # for a in c.current_alerts: - # if a.alert_type not in alert_num_by_state_with_max_type: - # alert_num_by_state_with_max_type[a.alert_type] = [a.severity, 1] - # else: - # existing_list = alert_num_by_state_with_max_type[a.alert_type] - # if a.severity > existing_list[0]: - # existing_list[0] = a.severity - # existing_list[1] += 1 - - # if len(alert_num_by_state_with_max_type) == 0: - # return [] - - # message = "There are ongoing events:" - # for at, sev_count in alert_num_by_state_with_max_type.items(): - # message += f"\n* {SEVERITY_TO_EMOJI[sev_count[0]]} {str(at)} - {sev_count[1]} alerts" - # message += ( - # "\n\nUse /ongoing to see them or /status to see this short reminder again (NOT IMPLEMENTED YET)." - # + "\nYou will also be reminded daily until the situation is resolved." - # ) - - # alert = Alert(alert_type=AlertType.REMIND, message=message, severity=max(alert_num_by_state_with_max_type.keys())) - # return [alert] diff --git a/src/lego_monitoring/checks/vulnix/__init__.py b/src/lego_monitoring/checks/vulnix/__init__.py index b494400..81f4820 100644 --- a/src/lego_monitoring/checks/vulnix/__init__.py +++ b/src/lego_monitoring/checks/vulnix/__init__.py @@ -1,6 +1,6 @@ from lego_monitoring.alerting.alert import Alert -from lego_monitoring.alerting.channel import send_alert from lego_monitoring.alerting.enum import AlertType, Severity +from lego_monitoring.alerting.sender import send_alert from .vulnix import get_vulnix_output diff --git a/src/lego_monitoring/core/checkers.py b/src/lego_monitoring/core/checkers.py index ea179a0..5000ee7 100644 --- a/src/lego_monitoring/core/checkers.py +++ b/src/lego_monitoring/core/checkers.py @@ -5,9 +5,9 @@ from dataclasses import KW_ONLY, dataclass, field from typing import Any, Callable, Coroutine from ..alerting.alert import Alert -from ..alerting.channel import send_alert from ..alerting.current import CurrentAlerts from ..alerting.enum import Severity +from ..alerting.sender import send_alert @dataclass @@ -34,10 +34,19 @@ class BaseChecker: False: this persistent checker's last alerts are reminded daily True: this persistent checker's last alerts are not reminded daily - + Has no effect if persistent == False """ + is_reminder: bool = False + """ + False: this non-persistent checker's alerts are tagged as normal + + True: this non-persistent checker's alerts are tagged as ongoing + + Has no effect if persistent == True + """ + check_args: list = field(default_factory=list) check_kwargs: dict[str, Any] = field(default_factory=dict) current_alerts: CurrentAlerts = field(default_factory=CurrentAlerts, init=False) @@ -56,7 +65,7 @@ class BaseChecker: async def _handle_alerts(self, alerts: list[Alert]) -> None: if not self.persistent: for alert in alerts: - await send_alert(alert) + await send_alert(alert, "ongoing" if self.is_reminder else "") return old_types = self.current_alerts.get_types() old_severity, new_severity = self.current_alerts.update(alerts)