mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-09 20:31:10 +00:00
167 lines
5.5 KiB
Python
167 lines
5.5 KiB
Python
import subprocess
|
|
from dataclasses import dataclass
|
|
from enum import StrEnum
|
|
|
|
from psutil import cpu_percent, sensors_temperatures, virtual_memory
|
|
|
|
from alerting import alerts
|
|
from alerting.enum import AlertType, Severity
|
|
|
|
|
|
@dataclass
|
|
class TemperatureSensor:
|
|
sensor_type: str
|
|
sensor_label: str
|
|
current_temp: float
|
|
highest_temp: float | None = None
|
|
critical_temp: float | None = None
|
|
|
|
|
|
@dataclass
|
|
class CpuSensor:
|
|
current_load: float
|
|
highest_load: float = 90
|
|
critical_load: float = 95
|
|
|
|
|
|
@dataclass
|
|
class RamSensor:
|
|
current_avail: int
|
|
current_avail_percentage: float
|
|
warning_avail: int = 4 * 1024**3
|
|
critical_avail: int = 2 * 1024**3
|
|
|
|
|
|
class UPSStatus(StrEnum):
|
|
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
|
|
|
|
ON_LINE = "OL"
|
|
ON_BATTERY = "OB"
|
|
BATTERY_LOW = "LB"
|
|
BATTERY_HIGH = "HB"
|
|
BATTERY_REPLACE = "RB"
|
|
BATTERY_CHARGING = "CHRG"
|
|
BATTERY_DISCHARGING = "DISCHRG"
|
|
UPS_BYPASS = "BYPASS"
|
|
"""Battery and connected devices are not protected from power outage!"""
|
|
UPS_OFFLINE = "OFF"
|
|
UPS_OVERLOAD = "OVER"
|
|
UPS_CALIBRATION = "CAL"
|
|
UPS_TRIM = "TRIM"
|
|
UPS_BOOST = "BOOST"
|
|
UPS_FSD = "FSD"
|
|
|
|
|
|
@dataclass
|
|
class UPSSensor:
|
|
ups_status: list[UPSStatus] = None
|
|
battery_charge_percentage: int = None
|
|
battery_warning_percentage: int = 20
|
|
battery_critical_percentage: int = 10
|
|
battery_runtime: int = 1000
|
|
|
|
|
|
class Sensors:
|
|
@staticmethod
|
|
def get_temperatures() -> dict[str, list[TemperatureSensor]]:
|
|
psutil_temp_sensors = sensors_temperatures()
|
|
|
|
temp_sensors = {}
|
|
|
|
for s_type, sensors in psutil_temp_sensors.items():
|
|
if s_type not in temp_sensors.keys():
|
|
temp_sensors[s_type] = []
|
|
match (s_type):
|
|
case "nvme":
|
|
for sensor in sensors:
|
|
temp_sensors[s_type].append(
|
|
TemperatureSensor(
|
|
sensor_type=s_type,
|
|
sensor_label=sensor.label,
|
|
current_temp=sensor.current,
|
|
highest_temp=sensor.high,
|
|
critical_temp=sensor.critical,
|
|
)
|
|
)
|
|
case "amdgpu":
|
|
temp_sensors[s_type].append(
|
|
TemperatureSensor(
|
|
sensor_type=s_type,
|
|
sensor_label="Integrated GPU",
|
|
current_temp=sensors[0].current,
|
|
)
|
|
)
|
|
case "k10temp":
|
|
temp_sensors[s_type].append(
|
|
TemperatureSensor(
|
|
sensor_type=s_type,
|
|
sensor_label="AMD CPU",
|
|
current_temp=sensors[0].current,
|
|
critical_temp=95.0, # hardcoded because we have R9 7900X
|
|
)
|
|
)
|
|
case "nct6687":
|
|
lables = {
|
|
"AMD TSI Addr 98h": "CPU",
|
|
"Diode 0 (curr)": "System",
|
|
"Thermistor 15": "VRM MOSFET",
|
|
"Thermistor 1": "Platform Controller Hub (Peripherals)",
|
|
"Thermistor 16": "CPU Socket",
|
|
}
|
|
|
|
for sensor in sensors[:-2]:
|
|
real_label = lables[sensor.label]
|
|
temp_sensors[s_type].append(
|
|
TemperatureSensor(
|
|
sensor_type=s_type,
|
|
sensor_label=real_label,
|
|
current_temp=sensor.current,
|
|
highest_temp=sensor.high or None,
|
|
critical_temp=sensor.critical or None,
|
|
)
|
|
)
|
|
|
|
return temp_sensors
|
|
|
|
@staticmethod
|
|
def get_cpu() -> CpuSensor:
|
|
return CpuSensor(current_load=cpu_percent())
|
|
|
|
@staticmethod
|
|
def get_ram() -> RamSensor:
|
|
ram = virtual_memory()
|
|
return RamSensor(current_avail=ram.available, current_avail_percentage=ram.percent)
|
|
|
|
@staticmethod
|
|
async def get_ups() -> None | UPSSensor:
|
|
try:
|
|
raw_data = subprocess.run(["upsc", "cp1300"], stdout=subprocess.PIPE, encoding="utf-8")
|
|
except FileNotFoundError:
|
|
await alerts.send_alert(
|
|
alerts.Alert(
|
|
alert_type=AlertType.ERROR,
|
|
message="upsc is not installed!",
|
|
severity=Severity.CRITICAL,
|
|
)
|
|
)
|
|
return None
|
|
|
|
sensor_data = UPSSensor()
|
|
|
|
for line in raw_data.stdout.splitlines():
|
|
sensor, value = line.split(": ")[:2]
|
|
match sensor:
|
|
case "battery.charge":
|
|
sensor_data.battery_charge_percentage = int(value)
|
|
case "battery.charge.low":
|
|
sensor_data.battery_critical_percentage = int(value)
|
|
case "battery.charge.warning":
|
|
sensor_data.battery_warning_percentage = int(value)
|
|
case "battery.runtime":
|
|
sensor_data.battery_runtime = int(value)
|
|
case "ups.status":
|
|
sensor_data.ups_status = [UPSStatus(status) for status in value.split()]
|
|
case _:
|
|
...
|
|
|
|
return sensor_data
|