mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-10 04:41:10 +00:00
78 lines
1.8 KiB
Python
78 lines
1.8 KiB
Python
import json
|
||
from dataclasses import dataclass
|
||
from enum import Enum, StrEnum
|
||
from typing import Optional
|
||
|
||
import aiofiles
|
||
import nio
|
||
|
||
from alerting.common import CREDS_FILE, ROOM_ID
|
||
|
||
|
||
class AlertType(StrEnum):
|
||
TEST = "TEST"
|
||
RAM = "RAM" # TODO
|
||
CPU = "CPU" # TODO
|
||
TEMP = "TEMP" # TODO
|
||
LOGIN = "LOGIN" # TODO
|
||
SMART = "SMART" # TODO
|
||
RAID = "RAID" # TODO
|
||
|
||
|
||
class Severity(Enum):
|
||
INFO = 1
|
||
WARNING = 2
|
||
CRITICAL = 3
|
||
|
||
|
||
@dataclass
class Alert:
    """One alert to be delivered: its category, its text and its urgency."""

    # Category of the alert.
    alert_type: AlertType
    # Free-form text of the alert.
    message: str
    # Urgency level of the alert.
    severity: Severity
|
||
|
||
|
||
async def get_client() -> nio.AsyncClient:
    """
    Returns a Matrix client built from the stored credentials.

    CREDS_FILE is read as JSON and must contain the keys "homeserver",
    "access_token", "user_id" and "device_id" (a missing key raises KeyError).

    It is better to call get_client once and reuse the result for multiple
    send_alert calls than to create a new client per alert.
    """
    async with aiofiles.open(CREDS_FILE) as creds_file:
        raw_creds = await creds_file.read()
    stored = json.loads(raw_creds)

    matrix_client = nio.AsyncClient(stored["homeserver"])
    # Reuse the previously obtained session by setting its fields directly.
    matrix_client.access_token = stored["access_token"]
    matrix_client.user_id = stored["user_id"]
    matrix_client.device_id = stored["device_id"]
    return matrix_client
|
||
|
||
|
||
def format_message(alert: Alert) -> str:
|
||
match alert.severity:
|
||
case Severity.INFO:
|
||
severity_emoji = "ℹ️"
|
||
case Severity.WARNING:
|
||
severity_emoji = "⚠️"
|
||
case Severity.CRITICAL:
|
||
severity_emoji = "🆘"
|
||
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
|
||
return message
|
||
|
||
|
||
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
    """
    Send an alert as an m.text message to the room ROOM_ID.

    alert:  the alert to format and send.
    client: an existing Matrix client to reuse. When omitted, a temporary
            client is created with get_client() and closed again before this
            function returns, including when sending fails.

    A client passed in by the caller is never closed here.
    """
    # Format first, so a formatting failure cannot leave a client open.
    message = format_message(alert)

    temp_client = client is None
    if client is None:
        client = await get_client()

    try:
        # NOTE(review): the value returned by room_send is not inspected here;
        # confirm whether failed sends should be detected and reported.
        await client.room_send(
            room_id=ROOM_ID,
            message_type="m.room.message",
            content={
                "msgtype": "m.text",
                "body": message,
            },
        )
    finally:
        # Only close what this call created.
        if temp_client:
            await client.close()
|