alerting framework

This commit is contained in:
Alex 2024-06-16 14:43:35 +03:00
parent 0184a47345
commit d3c8b000b0
6 changed files with 145 additions and 1 deletions

78
alerting/alerts.py Normal file
View file

@ -0,0 +1,78 @@
import json
from dataclasses import dataclass
from enum import Enum, StrEnum
from typing import Optional
import aiofiles
import nio
from alerting.common import CREDS_FILE, ROOM_ID
class AlertType(StrEnum):
TEST = "TEST"
RAM = "RAM" # TODO
CPU = "CPU" # TODO
TEMP = "TEMP" # TODO
LOGIN = "LOGIN" # TODO
SMART = "SMART" # TODO
RAID = "RAID" # TODO
class Severity(Enum):
INFO = 1
WARNING = 2
CRITICAL = 3
@dataclass
class Alert:
alert_type: AlertType
message: str
severity: Severity
async def get_client() -> nio.AsyncClient:
"""
Returns a Matrix client.
It is better to call get_client once and use it for multiple send_alert calls
"""
async with aiofiles.open(CREDS_FILE) as f:
contents = await f.read()
creds = json.loads(contents)
client = nio.AsyncClient(creds["homeserver"])
client.access_token = creds["access_token"]
client.user_id = creds["user_id"]
client.device_id = creds["device_id"]
return client
def format_message(alert: Alert) -> str:
match alert.severity:
case Severity.INFO:
severity_emoji = ""
case Severity.WARNING:
severity_emoji = "⚠️"
case Severity.CRITICAL:
severity_emoji = "🆘"
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
return message
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
if client is None:
temp_client = True
client = await get_client()
else:
temp_client = False
message = format_message(alert)
await client.room_send(
room_id=ROOM_ID,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
},
)
if temp_client:
await client.close()