import json from dataclasses import dataclass from enum import Enum, StrEnum from typing import Optional import aiofiles import nio from alerting.common import CREDS_FILE, ROOM_ID class AlertType(StrEnum): TEST = "TEST" RAM = "RAM" # TODO CPU = "CPU" # TODO TEMP = "TEMP" LOGIN = "LOGIN" # TODO SMART = "SMART" # TODO RAID = "RAID" # TODO class Severity(Enum): INFO = 1 WARNING = 2 CRITICAL = 3 @dataclass class Alert: alert_type: AlertType message: str severity: Severity async def get_client() -> nio.AsyncClient: """ Returns a Matrix client. It is better to call get_client once and use it for multiple send_alert calls """ async with aiofiles.open(CREDS_FILE) as f: contents = await f.read() creds = json.loads(contents) client = nio.AsyncClient(creds["homeserver"]) client.access_token = creds["access_token"] client.user_id = creds["user_id"] client.device_id = creds["device_id"] return client def format_message(alert: Alert) -> str: match alert.severity: case Severity.INFO: severity_emoji = "â„šī¸" case Severity.WARNING: severity_emoji = "âš ī¸" case Severity.CRITICAL: severity_emoji = "🆘" message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}" return message async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None: if client is None: temp_client = True client = await get_client() else: temp_client = False message = format_message(alert) await client.room_send( room_id=ROOM_ID, message_type="m.room.message", content={ "msgtype": "m.text", "body": message, }, ) if temp_client: await client.close()