/status and /ongoing

This commit is contained in:
Alex Tau 2025-06-06 15:38:48 +03:00
parent 62a25410cc
commit 6cc3966221
6 changed files with 121 additions and 47 deletions

View file

@ -0,0 +1,88 @@
from dataclasses import dataclass
from typing import Awaitable, Callable
from telethon import TelegramClient, events, functions, types
from lego_monitoring.core import cvars
from lego_monitoring.core.checkers import BaseChecker
from .enum import SEVERITY_TO_EMOJI, AlertType, Severity
from .sender import format_message
def admin_chat_only(
handler: Callable[[events.NewMessage.Event], Awaitable[None]],
) -> Callable[[events.NewMessage.Event], Awaitable[None]]:
admin_room_id = cvars.config.get().telegram.room_id
async def safe_handler(event: events.NewMessage.Event) -> None:
if event.chat_id == admin_room_id:
return await handler(event)
return safe_handler
@dataclass
class CommandHandlerManager:
checkers: list[BaseChecker]
async def attach_handlers(self, tg_client: TelegramClient):
my_username = (await tg_client.get_me()).username
@tg_client.on(events.NewMessage(pattern=f"/status(?:@{my_username})?"))
@admin_chat_only
async def status(event: events.NewMessage.Event):
return await self.status_handler(event)
@tg_client.on(events.NewMessage(pattern=f"/ongoing(?:@{my_username})?"))
@admin_chat_only
async def status(event: events.NewMessage.Event):
return await self.ongoing_handler(event)
await tg_client(
functions.bots.SetBotCommandsRequest(
scope=types.BotCommandScopeDefault(),
lang_code="en",
commands=[
types.BotCommand(command="status", description="Get current system status"),
types.BotCommand(command="ongoing", description="Show ongoing alerts"),
],
)
)
async def status_handler(self, event: events.NewMessage.Event):
alert_num_by_state_with_max_type: dict[AlertType, list[Severity | int]] = {}
for c in self.checkers:
if not isinstance(c, BaseChecker) or not c.persistent:
continue
for a in c.current_alerts:
if a.alert_type not in alert_num_by_state_with_max_type:
alert_num_by_state_with_max_type[a.alert_type] = [a.severity, 1]
else:
existing_list = alert_num_by_state_with_max_type[a.alert_type]
if a.severity > existing_list[0]:
existing_list[0] = a.severity
existing_list[1] += 1
if len(alert_num_by_state_with_max_type) == 0:
message = "🟢 There are no ongoing events."
else:
message = "There are ongoing events:"
for at, sev_count in alert_num_by_state_with_max_type.items():
message += f"\n* {SEVERITY_TO_EMOJI[sev_count[0]]} {str(at)} - {sev_count[1]} alerts"
message += "\n\nUse /ongoing to see them."
await event.respond(message)
async def ongoing_handler(self, event: events.NewMessage.Event):
messages = set()
for c in self.checkers:
if not isinstance(c, BaseChecker) or not c.persistent:
continue
for a in c.current_alerts:
message = format_message(a, note="ongoing")
messages.add(message)
if len(messages) == 0:
await event.respond("🟢 There are no ongoing events.")
for message in messages:
await event.respond(message)