Merge branch 'alert-sender' into 'main'

Alert sender service

See merge request lego/lego-monitoring!2
This commit is contained in:
Alex 2024-06-16 20:47:43 +00:00
commit a4bbe835d3
12 changed files with 300 additions and 56 deletions

3
.gitignore vendored
View file

@ -1,2 +1,3 @@
.venv
__pycache__
__pycache__
alerting/credentials.json

View file

@ -1 +1,18 @@
# lego-monitoring
## Prerequisites
* `pacman -S libolm`
* `pip install -r requirements.txt`
## Configuring
* Run `alerting/login.py` once to generate credentials file
* Invite the bot account to the room (you have to accept the invite manually)
* Set room ID in `alerting/common.py`
## Running
* `prettyprint.py` -- check and print all sensors
* `service.py` -- launch service
* `lego-monitoring.service` is a systemd unit that starts `service.py`

78
alerting/alerts.py Normal file
View file

@ -0,0 +1,78 @@
import json
from dataclasses import dataclass
from enum import Enum, StrEnum
from typing import Optional
import aiofiles
import nio
from alerting.common import CREDS_FILE, ROOM_ID
class AlertType(StrEnum):
TEST = "TEST"
RAM = "RAM" # TODO
CPU = "CPU" # TODO
TEMP = "TEMP"
LOGIN = "LOGIN" # TODO
SMART = "SMART" # TODO
RAID = "RAID" # TODO
class Severity(Enum):
INFO = 1
WARNING = 2
CRITICAL = 3
@dataclass
class Alert:
    """A single alert, ready to be formatted and sent to the Matrix room."""

    # Category of the alert (e.g. AlertType.TEMP).
    alert_type: AlertType
    # Human-readable description placed on the second line of the message.
    message: str
    # Determines the emoji that format_message puts in front of the alert.
    severity: Severity
async def get_client() -> nio.AsyncClient:
    """
    Build a Matrix client from the saved credentials file.

    Reads CREDS_FILE as JSON and copies its homeserver, access token, user ID
    and device ID onto a new nio.AsyncClient.
    It is better to call get_client once and reuse the result for multiple
    send_alert calls.
    """
    async with aiofiles.open(CREDS_FILE) as creds_file:
        saved = json.loads(await creds_file.read())

    matrix_client = nio.AsyncClient(saved["homeserver"])
    # Apply the stored login so no password login is needed at runtime.
    for field_name in ("access_token", "user_id", "device_id"):
        setattr(matrix_client, field_name, saved[field_name])
    return matrix_client
def format_message(alert: Alert) -> str:
    """
    Render an alert as the plain-text body of a Matrix message.

    The first line is "<emoji> <alert type> Alert" and the second line is the
    alert's own message. INFO alerts have no emoji, so their first line starts
    directly with the alert type (no stray leading space).
    """
    severity_emojis = {
        Severity.WARNING: "⚠️",
        Severity.CRITICAL: "🆘",
    }
    # Severities without an entry (INFO) are rendered without any prefix.
    emoji = severity_emojis.get(alert.severity, "")
    prefix = f"{emoji} " if emoji else ""
    return f"{prefix}{alert.alert_type} Alert\n{alert.message}"
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
    """
    Send an alert to the configured Matrix room (ROOM_ID).

    If ``client`` is given it is used as-is and left open for the caller to
    reuse. If it is omitted, a temporary client is created via get_client()
    and is always closed again, even when sending fails.
    """
    message = format_message(alert)

    owns_client = client is None
    if owns_client:
        client = await get_client()

    try:
        await client.room_send(
            room_id=ROOM_ID,
            message_type="m.room.message",
            content={
                "msgtype": "m.text",
                "body": message,
            },
        )
    finally:
        # Only close clients created here; a caller-supplied client stays open.
        if owns_client:
            await client.close()

8
alerting/common.py Normal file
View file

@ -0,0 +1,8 @@
import os
from pathlib import Path
# Credentials file written by login.py; it lives next to this module.
CREDS_FILE = Path(os.path.realpath(__file__)).parent.joinpath("credentials.json").resolve()

# Matrix account used by the alert bot.
HOMESERVER = "https://matrix.altau.su"
USER_ID = "@AlertBot:altau.su"
# Device name passed to the homeserver when logging in.
DEVICE_NAME = "lego"

# Room that alerts are sent to.
ROOM_ID = "!aSCaiSJfLHslrJrHiJ:altau.su"

41
alerting/login.py Executable file
View file

@ -0,0 +1,41 @@
#!/usr/bin/env python3
import asyncio
import getpass
import json
import os
import stat
from common import CREDS_FILE, DEVICE_NAME, HOMESERVER, USER_ID
from nio import AsyncClient, LoginResponse
async def main() -> None:
    """
    Interactively log the bot in and store its credentials in CREDS_FILE.

    Exits immediately (SystemExit) if the credentials file already exists.
    Otherwise prompts for the account password, logs in to HOMESERVER as
    USER_ID and writes the homeserver, user ID, device ID and access token as
    JSON to CREDS_FILE, readable and writable by the owner only.

    Raises:
        Exception: if the homeserver does not return a successful login response.
    """
    if os.path.exists(CREDS_FILE):
        print(f"Creds already configured in {CREDS_FILE}")
        raise SystemExit

    client = AsyncClient(HOMESERVER, USER_ID)
    try:
        password = getpass.getpass()
        resp = await client.login(password, device_name=DEVICE_NAME)
    finally:
        # Close the client even if the prompt is interrupted or login raises.
        await client.close()

    if not isinstance(resp, LoginResponse):
        raise Exception(f"Failed to log in: {resp}")

    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    # Create the file with owner-only permissions from the start, so it never
    # exists with the default (umask-derived) mode.
    fd = os.open(CREDS_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_rw)
    with os.fdopen(fd, "w") as f:
        # Enforce the mode as well in case the file appeared in the meantime
        # with laxer permissions (the creation mode only applies to new files).
        os.chmod(CREDS_FILE, owner_rw)
        json.dump(
            {
                "homeserver": HOMESERVER,
                "user_id": resp.user_id,
                "device_id": resp.device_id,
                "access_token": resp.access_token,
            },
            f,
        )
    print(f"Logged in as {resp.user_id}. Credentials saved to {CREDS_FILE}")


if __name__ == "__main__":
    asyncio.run(main())

10
lego-monitoring.service Normal file
View file

@ -0,0 +1,10 @@
[Unit]
After=docker.service
[Service]
ExecStart=/opt/lego-monitoring/.venv/bin/python /opt/lego-monitoring/service.py
Type=exec
Restart=always
[Install]
WantedBy=multi-user.target

25
misc/checks.py Normal file
View file

@ -0,0 +1,25 @@
from alerting import alerts
from misc import sensors
def temp_check() -> list[alerts.Alert]:
    """
    Compare every temperature sensor against its thresholds.

    A reading above the sensor's critical temperature yields a CRITICAL alert;
    otherwise a reading above its "highest" temperature yields a WARNING alert.
    Sensors without the relevant threshold (None) are skipped for that check.

    Returns:
        A list of TEMP alerts, one per sensor that exceeds a threshold
        (empty if none do).
    """
    alert_list = []
    temps = sensors.Sensors.get_temperatures()
    for sensor_list in temps.values():
        for sensor in sensor_list:
            if sensor.critical_temp is not None and sensor.current_temp > sensor.critical_temp:
                threshold = sensor.critical_temp
                severity = alerts.Severity.CRITICAL
            elif sensor.highest_temp is not None and sensor.current_temp > sensor.highest_temp:
                # Report the threshold that was actually exceeded.
                threshold = sensor.highest_temp
                severity = alerts.Severity.WARNING
            else:
                continue
            alert_list.append(
                alerts.Alert(
                    alert_type=alerts.AlertType.TEMP,
                    message=f"{sensor.sensor_type} {sensor.sensor_label}: {sensor.current_temp}°C > {threshold}°C",
                    severity=severity,
                )
            )
    return alert_list

60
misc/sensors.py Normal file
View file

@ -0,0 +1,60 @@
from dataclasses import dataclass
from psutil import sensors_temperatures
@dataclass
class TemperatureSensor:
sensor_type: str
sensor_label: str
current_temp: float
highest_temp: float | None = None
critical_temp: float | None = None
class Sensors:
    """Namespace for reading hardware sensors through psutil."""

    @staticmethod
    def get_temperatures() -> dict[str, list[TemperatureSensor]]:
        """
        Collect temperature readings grouped by psutil sensor type.

        Every sensor type reported by psutil gets a key in the result, but only
        "nvme", "amdgpu" and "k10temp" are populated; other types map to an
        empty list.
        """
        readings: dict[str, list[TemperatureSensor]] = {}
        for kind, entries in sensors_temperatures().items():
            collected = readings.setdefault(kind, [])
            if kind == "nvme":
                # NVMe drives: keep every sensor with its own thresholds.
                collected.extend(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label=entry.label,
                        current_temp=entry.current,
                        highest_temp=entry.high,
                        critical_temp=entry.critical,
                    )
                    for entry in entries
                )
            elif kind == "amdgpu":
                # Only the first reading is used; no thresholds are recorded.
                collected.append(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label="Integrated GPU",
                        current_temp=entries[0].current,
                    )
                )
            elif kind == "k10temp":
                collected.append(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label="AMD CPU",
                        current_temp=entries[0].current,
                        critical_temp=95.0,  # hardcoded because we have R9 7900X
                    )
                )
        return readings
if __name__ == "__main__":
    # Manual smoke test: iterating the result dict prints each sensor type name.
    for sensor_type in Sensors.get_temperatures():
        print(sensor_type)

View file

@ -2,7 +2,7 @@
from colorama import Back, Fore, Style
from sensors import Sensors
from misc.sensors import Sensors
def pretty_print():

View file

@ -1,2 +1,3 @@
colorama==0.4.6
psutil==5.9.8
matrix-nio[e2e]==0.24.0

View file

@ -1,54 +0,0 @@
from dataclasses import dataclass
from psutil import sensors_temperatures
@dataclass
class TemperatureSensor:
sensor_type: str
sensor_label: str
current_temp: float
highest_temp: float | None = None
critical_temp: float | None = None
class Sensors:
    """Namespace for reading hardware sensors through psutil."""

    @staticmethod
    def get_temperatures() -> dict[str, list[TemperatureSensor]]:
        """
        Return temperature readings keyed by psutil sensor type.

        Each reported sensor type gets an entry; only "nvme", "amdgpu" and
        "k10temp" are filled in, all other types keep an empty list.
        """
        by_type: dict[str, list[TemperatureSensor]] = {}
        for kind, entries in sensors_temperatures().items():
            bucket = by_type.setdefault(kind, [])
            if kind == "nvme":
                # One entry per NVMe sensor, with the thresholds psutil reports.
                bucket.extend(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label=entry.label,
                        current_temp=entry.current,
                        highest_temp=entry.high,
                        critical_temp=entry.critical,
                    )
                    for entry in entries
                )
            elif kind == "amdgpu":
                # Only the first reading is used, without thresholds.
                bucket.append(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label="Integrated GPU",
                        current_temp=entries[0].current,
                    )
                )
            elif kind == "k10temp":
                bucket.append(
                    TemperatureSensor(
                        sensor_type=kind,
                        sensor_label="AMD CPU",
                        current_temp=entries[0].current,
                        critical_temp=95.0,  # hardcoded because we have R9 7900X
                    )
                )
        return by_type
if __name__ == "__main__":
    # Manual smoke test: prints the name of every sensor type found.
    for sensor_type in Sensors.get_temperatures():
        print(sensor_type)

57
service.py Executable file
View file

@ -0,0 +1,57 @@
#!/usr/bin/env python3
import asyncio
import logging
import signal
from typing import Callable, Coroutine
import nio
from alerting import alerts
from misc import checks
# Emit INFO-level records (each check run is logged at that level).
logging.basicConfig(level=logging.INFO)

# Shutdown flag: set by the signal handler below and polled by main().
stopping = False


def stop_gracefully(signum, frame):
    """Signal handler that requests shutdown by setting the ``stopping`` flag.

    Both arguments are supplied by the signal machinery and are ignored.
    """
    global stopping
    stopping = True
async def checker(check: Callable | Coroutine, interval_secs: int, client: nio.AsyncClient, *args, **kwargs):
    """
    Run a check periodically and send every alert it produces.

    Args:
        check: A function (sync or async) returning a collection of alerts; it
            is called with ``*args, **kwargs`` on every cycle. A bare coroutine
            object is also accepted, but since it can only be awaited once it
            is run a single time and the checker then returns.
        interval_secs: Pause between two runs of the check, in seconds.
        client: Matrix client used to send the alerts.

    Raises:
        TypeError: if ``check`` is neither callable nor a coroutine object.
    """
    repeatable = callable(check)
    if not repeatable and not isinstance(check, Coroutine):
        raise TypeError(f"check is {type(check)}, neither function nor coroutine")

    # Callables such as functools.partial have no __name__; fall back to repr.
    check_name = getattr(check, "__name__", repr(check))

    while True:
        logging.info(f"Calling {check_name}")
        if repeatable:
            result = check(*args, **kwargs)
            if isinstance(result, Coroutine):
                result = await result
        else:
            result = await check

        logging.info(f"Got {len(result)} alerts")
        for alert in result:
            await alerts.send_alert(alert, client)

        if not repeatable:
            # Awaiting the same coroutine object again would raise RuntimeError.
            logging.warning(f"{check_name} is a coroutine object and cannot be re-run; stopping this checker")
            return
        await asyncio.sleep(interval_secs)
async def main():
    """
    Service entry point: run all checkers until SIGTERM is received.

    Installs stop_gracefully as the SIGTERM handler, creates one shared Matrix
    client and starts every checker as a task. The ``stopping`` flag is polled
    every 3 seconds; once it is set, SystemExit is raised, which cancels the
    checker tasks. The client is closed on every exit path, including when a
    checker task fails.
    """
    signal.signal(signal.SIGTERM, stop_gracefully)
    client = await alerts.get_client()
    try:
        checkers = (checker(checks.temp_check, 5 * 60, client),)
        async with asyncio.TaskGroup() as tg:
            for c in checkers:
                tg.create_task(c)
            while not stopping:
                await asyncio.sleep(3)
            # Leaving the task group with SystemExit cancels the checker tasks.
            raise SystemExit
    finally:
        # Close only after the checkers have stopped using the client.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())