from telethon import TelegramClient from telethon.sessions import MemorySession from uplink import AiohttpClient from ..core import cvars from .alert import Alert from .clients.healthchecks import HealthchecksClient from .enum import SEVERITY_TO_EMOJI, AlertType, Severity async def get_tg_client() -> TelegramClient: config = cvars.config.get() api_id, api_hash, bot_token = config.alert_channels.telegram.creds.split(",") client = await TelegramClient(MemorySession(), api_id, api_hash, connection_retries=None).start(bot_token=bot_token) client.parse_mode = "html" return client def get_healthchecks_client() -> HealthchecksClient: config = cvars.config.get() base_url = config.alert_channels.healthchecks.pinging_api_endpoint client = HealthchecksClient( base_url=config.alert_channels.healthchecks.pinging_api_endpoint, client=AiohttpClient() ) return client def format_message(alert: Alert, note: str) -> str: severity_emoji = SEVERITY_TO_EMOJI[alert.severity] note_formatted = f"{note}, " if note else "" if "ongoing" in note_formatted: note_formatted += f"since {alert.created.isoformat()}" else: note_formatted += f"at {alert.created.isoformat()}" message = f"{severity_emoji} {alert.alert_type} Alert - {note_formatted}\n{alert.message}" return message async def send_alert(alert: Alert, note: str = "") -> None: try: tg_client = cvars.tg_client.get() except LookupError: # being called standalone # cvars.config.set(get_config()) # temp_client = True # client = await get_tg_client() # cvars.matrix_client.set(client) raise NotImplementedError # TODO else: ... # temp_client = False if tg_client is not None: room_id = cvars.config.get().alert_channels.telegram.room_id message = format_message(alert, note) await tg_client.send_message(entity=room_id, message=message) # if temp_client: # await client.close() # TODO ping healthchecks if enabled if alert.healthchecks_slug is not None: raise NotImplementedError # TODO service itself has to be monitored like everything else - with regular pinging - if we're # using healthchecks async def send_start_alert() -> None: config = cvars.config.get() await send_alert( Alert( alert_type=AlertType.BOOT, message=f"Service running with enabled checks: {', '.join(config.enabled_check_sets)}", severity=Severity.INFO, ) ) async def send_stop_alert() -> None: await send_alert( Alert( alert_type=AlertType.BOOT, message="Service stopping.", severity=Severity.INFO, ) )