# lego-monitoring/src/lego_monitoring/alerting/sender.py (85 lines, 3 KiB, Python)

import logging
from socket import gethostname
from telethon import TelegramClient
from telethon.sessions import MemorySession
from uplink import AiohttpClient
from ..checks.utils import format_for_healthchecks_slug
from ..core import cvars
from .alert import Alert
from .clients.healthchecks import HealthchecksClient
from .enum import SEVERITY_TO_EMOJI, AlertType, Severity
async def get_tg_client() -> TelegramClient:
    """Create and start a Telegram bot client from the current configuration.

    The ``alert_channels.telegram.creds`` setting is read as a single
    comma-separated string of exactly three fields, in the order
    ``api_id,api_hash,bot_token``.

    Returns:
        The started client, with its parse mode set to HTML.

    Raises:
        ValueError: If ``creds`` does not split into exactly three fields.
    """
    telegram_settings = cvars.config.get().alert_channels.telegram
    api_id, api_hash, bot_token = telegram_settings.creds.split(",")

    # MemorySession: the session object is created fresh on every call.
    # NOTE(review): connection_retries=None should mean "retry forever"
    # according to the Telethon docs -- confirm this is still intended.
    unstarted = TelegramClient(
        MemorySession(),
        api_id,
        api_hash,
        connection_retries=None,
    )
    # Keep whatever start() hands back, exactly as the chained call did.
    started = await unstarted.start(bot_token=bot_token)
    started.parse_mode = "html"
    return started
def get_healthchecks_client() -> HealthchecksClient:
    """Build a Healthchecks client pointed at the configured pinging endpoint.

    Returns:
        A ``HealthchecksClient`` whose base URL is the
        ``alert_channels.healthchecks.pinging_api_endpoint`` setting and whose
        HTTP backend is a new ``AiohttpClient``.
    """
    config = cvars.config.get()
    # Read the endpoint once and reuse it instead of repeating the lookup.
    base_url = config.alert_channels.healthchecks.pinging_api_endpoint
    return HealthchecksClient(base_url=base_url, client=AiohttpClient())
def format_message(alert: Alert, note: str) -> str:
    """Render an alert as the HTML text that is sent to Telegram.

    The first line holds the severity emoji, the alert type and an italic
    remark; the alert's own message follows on the next line.

    Args:
        alert: The alert to render.
        note: Optional remark placed in front of the timestamp. When it
            contains the word "ongoing" the timestamp is introduced with
            "since", otherwise with "at".

    Returns:
        The formatted message.
    """
    emoji = SEVERITY_TO_EMOJI[alert.severity]
    note_prefix = f"{note}, " if note else ""
    preposition = "since" if "ongoing" in note_prefix else "at"
    remark = f"{note_prefix}{preposition} {alert.created.isoformat()}"
    headline = f"{emoji} {alert.alert_type} Alert - <i>{remark}</i>"
    return f"{headline}\n{alert.message}"
async def send_alert(alert: Alert, note: str = "") -> None:
    """Send an alert to the configured Telegram room.

    Does nothing beyond the debug log line when the Telegram client stored in
    ``cvars.tg_client`` is ``None``.

    Args:
        alert: The alert to deliver.
        note: Optional remark passed through to ``format_message``.

    Raises:
        NotImplementedError: If ``cvars.tg_client`` has no value in the
            current context, i.e. the function is being called standalone.
    """
    logging.debug(f"Sending {alert.alert_type} alert to Telegram")

    try:
        tg_client = cvars.tg_client.get()
    except LookupError:
        # TODO: support standalone calls by loading the config, creating a
        # temporary client here and closing it again once the message is sent.
        raise NotImplementedError

    if tg_client is None:
        return

    room_id = cvars.config.get().alert_channels.telegram.room_id
    text = format_message(alert, note)
    await tg_client.send_message(entity=room_id, message=text)
async def send_healthchecks_status(alert: Alert) -> None:
    """Report an alert's state to Healthchecks.

    A success ping is sent when the alert severity is ``Severity.OK`` and a
    failure ping otherwise. Nothing is sent when the alert has no Healthchecks
    slug or when the client stored in ``cvars.healthchecks_client`` is ``None``.

    Args:
        alert: The alert whose slug, severity and plain-text message are used.

    Raises:
        NotImplementedError: If ``cvars.healthchecks_client`` has no value in
            the current context.
        KeyError: If the pinging keys contain neither the alert's slug nor a
            ``"default"`` entry.
    """
    logging.debug(f"Sending {alert.alert_type} to Healthchecks")

    slug = alert.healthchecks_slug
    if slug is None:
        return

    try:
        hc_client = cvars.healthchecks_client.get()
    except LookupError:
        raise NotImplementedError  # TODO

    if hc_client is None:
        return

    pinging_keys = cvars.config.get().alert_channels.healthchecks.pinging_keys
    # Prefer a key dedicated to this slug; otherwise use the shared default.
    key = pinging_keys[slug] if slug in pinging_keys else pinging_keys["default"]

    report = hc_client.success if alert.severity == Severity.OK else hc_client.failure
    await report(key, slug, create=True, log=alert.plain_message)