# Mirror of https://forgejo.altau.su/lego/lego-monitoring.git
# Synced 2026-03-09 20:31:10 +00:00
# 81 lines, 2.9 KiB, Python
import logging
|
|
|
|
from telethon import TelegramClient
|
|
from telethon.sessions import MemorySession
|
|
from uplink import AiohttpClient
|
|
|
|
from ..core import cvars
|
|
from .alert import Alert
|
|
from .clients.healthchecks import HealthchecksClient
|
|
from .enum import SEVERITY_TO_EMOJI, Severity
|
|
|
|
|
|
async def get_tg_client() -> TelegramClient:
    """Create and start a Telegram bot client from the active configuration.

    The credentials are read from ``alert_channels.telegram.creds`` as one
    comma-separated string of the form ``api_id,api_hash,bot_token``.

    Returns:
        The value ``TelegramClient.start`` resolves to (the started client),
        with its ``parse_mode`` set to ``"html"``.

    Raises:
        ValueError: If the credentials string does not split into exactly
            three comma-separated parts.
    """
    telegram_cfg = cvars.config.get().alert_channels.telegram
    api_id, api_hash, bot_token = telegram_cfg.creds.split(",")

    # MemorySession: the session is not backed by a file.
    # NOTE(review): connection_retries=None presumably means "retry forever"
    # in Telethon -- confirm against the Telethon documentation.
    unstarted = TelegramClient(MemorySession(), api_id, api_hash, connection_retries=None)
    client = await unstarted.start(bot_token=bot_token)

    # Outgoing messages are written with HTML markup (see format_message).
    client.parse_mode = "html"
    return client
|
|
|
|
|
|
def get_healthchecks_client() -> HealthchecksClient:
    """Build a Healthchecks client bound to the configured pinging endpoint.

    Returns:
        A ``HealthchecksClient`` whose base URL is
        ``alert_channels.healthchecks.pinging_api_endpoint`` from the active
        configuration, using a fresh ``AiohttpClient`` as its HTTP backend.
    """
    endpoint = cvars.config.get().alert_channels.healthchecks.pinging_api_endpoint
    return HealthchecksClient(base_url=endpoint, client=AiohttpClient())
|
|
|
|
|
|
def format_message(alert: Alert, note: str) -> str:
    """Render an alert as the HTML text that is sent to Telegram.

    The first line has the shape
    ``<emoji> <alert_type> Alert - <i>[<note>, ]at|since <created></i>``
    and is followed, on a new line, by ``alert.message``.

    Args:
        alert: The alert to render; its ``severity``, ``alert_type``,
            ``created`` and ``message`` attributes are used.
        note: Optional remark placed before the timestamp. When it contains
            the word "ongoing" the timestamp is introduced with "since"
            instead of "at".

    Returns:
        The formatted message string.

    Raises:
        KeyError: If ``alert.severity`` has no entry in ``SEVERITY_TO_EMOJI``.
    """
    emoji = SEVERITY_TO_EMOJI[alert.severity]

    # An empty note contributes nothing, not even the separating comma.
    prefix = f"{note}, " if note else ""
    preposition = "since" if "ongoing" in prefix else "at"
    timing = f"{prefix}{preposition} {alert.created.isoformat()}"

    return f"{emoji} {alert.alert_type} Alert - <i>{timing}</i>\n{alert.message}"
|
|
|
|
|
|
async def send_alert(alert: Alert, note: str = "") -> None:
    """Send an alert to the configured Telegram room.

    The Telegram client is taken from ``cvars.tg_client``. If that variable
    holds ``None`` the function returns without sending anything.

    Args:
        alert: The alert to send.
        note: Optional remark passed through to ``format_message``.

    Raises:
        NotImplementedError: If ``cvars.tg_client`` has not been set, i.e. the
            function is being called standalone.
    """
    logging.debug(f"Sending {alert.alert_type} alert to Telegram")

    try:
        tg_client = cvars.tg_client.get()
    except LookupError:  # being called standalone
        # TODO: standalone mode -- load the config, create a temporary client
        # with get_tg_client(), send the alert and close the client afterwards.
        raise NotImplementedError

    if tg_client is None:
        return

    await tg_client.send_message(
        entity=cvars.config.get().alert_channels.telegram.room_id,
        message=format_message(alert, note),
    )
|
|
|
|
|
|
async def send_healthchecks_status(alert: Alert) -> None:
    """Report an alert's state to Healthchecks.

    Nothing is sent when the alert has no ``healthchecks_slug`` or when
    ``cvars.healthchecks_client`` holds ``None``. Otherwise the client's
    ``success`` method is called for ``Severity.OK`` alerts and its
    ``failure`` method for every other severity, with ``alert.plain_message``
    passed as the ``log`` argument.

    Args:
        alert: The alert whose state should be reported.

    Raises:
        NotImplementedError: If ``cvars.healthchecks_client`` has not been set.
        KeyError: If the configured pinging keys contain neither the alert's
            slug nor a ``"default"`` entry.
    """
    logging.debug(f"Sending {alert.alert_type} to Healthchecks")

    slug = alert.healthchecks_slug
    if slug is None:
        return

    try:
        hc_client = cvars.healthchecks_client.get()
    except LookupError:
        raise NotImplementedError  # TODO

    if hc_client is None:
        return

    # A slug-specific pinging key wins; "default" is only looked up (and only
    # required to exist) when the slug has no key of its own.
    pinging_keys = cvars.config.get().alert_channels.healthchecks.pinging_keys
    key = pinging_keys[slug] if slug in pinging_keys else pinging_keys["default"]

    report = hc_client.success if alert.severity == Severity.OK else hc_client.failure
    await report(key, alert.healthchecks_slug, create=True, log=alert.plain_message)
|