mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-10 04:41:10 +00:00
move existing stuff to archive dir (for now)
This commit is contained in:
parent
ae1204449c
commit
4fc491f61a
32 changed files with 0 additions and 0 deletions
167
misc/sensors.py
167
misc/sensors.py
|
|
@ -1,167 +0,0 @@
|
|||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
|
||||
from psutil import cpu_percent, sensors_temperatures, virtual_memory
|
||||
|
||||
from alerting import alerts
|
||||
from alerting.enum import AlertType, Severity
|
||||
|
||||
|
||||
@dataclass
|
||||
class TemperatureSensor:
|
||||
sensor_type: str
|
||||
sensor_label: str
|
||||
current_temp: float
|
||||
highest_temp: float | None = None
|
||||
critical_temp: float | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class CpuSensor:
|
||||
current_load: float
|
||||
highest_load: float = 90
|
||||
critical_load: float = 95
|
||||
|
||||
|
||||
@dataclass
|
||||
class RamSensor:
|
||||
current_avail: int
|
||||
current_avail_percentage: float
|
||||
warning_avail: int = 4 * 1024**3
|
||||
critical_avail: int = 2 * 1024**3
|
||||
|
||||
|
||||
class UPSStatus(StrEnum):
|
||||
"""https://networkupstools.org/docs/developer-guide.chunked/new-drivers.html#_status_data"""
|
||||
|
||||
ON_LINE = "OL"
|
||||
ON_BATTERY = "OB"
|
||||
BATTERY_LOW = "LB"
|
||||
BATTERY_HIGH = "HB"
|
||||
BATTERY_REPLACE = "RB"
|
||||
BATTERY_CHARGING = "CHRG"
|
||||
BATTERY_DISCHARGING = "DISCHRG"
|
||||
UPS_BYPASS = "BYPASS"
|
||||
"""Battery and connected devices are not protected from power outage!"""
|
||||
UPS_OFFLINE = "OFF"
|
||||
UPS_OVERLOAD = "OVER"
|
||||
UPS_CALIBRATION = "CAL"
|
||||
UPS_TRIM = "TRIM"
|
||||
UPS_BOOST = "BOOST"
|
||||
UPS_FSD = "FSD"
|
||||
|
||||
|
||||
@dataclass
|
||||
class UPSSensor:
|
||||
ups_status: list[UPSStatus] = None
|
||||
battery_charge_percentage: int = None
|
||||
battery_warning_percentage: int = 20
|
||||
battery_critical_percentage: int = 10
|
||||
battery_runtime: int = 1000
|
||||
|
||||
|
||||
class Sensors:
|
||||
@staticmethod
|
||||
def get_temperatures() -> dict[str, list[TemperatureSensor]]:
|
||||
psutil_temp_sensors = sensors_temperatures()
|
||||
|
||||
temp_sensors = {}
|
||||
|
||||
for s_type, sensors in psutil_temp_sensors.items():
|
||||
if s_type not in temp_sensors.keys():
|
||||
temp_sensors[s_type] = []
|
||||
match (s_type):
|
||||
case "nvme":
|
||||
for sensor in sensors:
|
||||
temp_sensors[s_type].append(
|
||||
TemperatureSensor(
|
||||
sensor_type=s_type,
|
||||
sensor_label=sensor.label,
|
||||
current_temp=sensor.current,
|
||||
highest_temp=sensor.high,
|
||||
critical_temp=sensor.critical,
|
||||
)
|
||||
)
|
||||
case "amdgpu":
|
||||
temp_sensors[s_type].append(
|
||||
TemperatureSensor(
|
||||
sensor_type=s_type,
|
||||
sensor_label="Integrated GPU",
|
||||
current_temp=sensors[0].current,
|
||||
)
|
||||
)
|
||||
case "k10temp":
|
||||
temp_sensors[s_type].append(
|
||||
TemperatureSensor(
|
||||
sensor_type=s_type,
|
||||
sensor_label="AMD CPU",
|
||||
current_temp=sensors[0].current,
|
||||
critical_temp=95.0, # hardcoded because we have R9 7900X
|
||||
)
|
||||
)
|
||||
case "nct6687":
|
||||
lables = {
|
||||
"AMD TSI Addr 98h": "CPU",
|
||||
"Diode 0 (curr)": "System",
|
||||
"Thermistor 15": "VRM MOSFET",
|
||||
"Thermistor 1": "Platform Controller Hub (Peripherals)",
|
||||
"Thermistor 16": "CPU Socket",
|
||||
}
|
||||
|
||||
for sensor in sensors[:-2]:
|
||||
real_label = lables[sensor.label]
|
||||
temp_sensors[s_type].append(
|
||||
TemperatureSensor(
|
||||
sensor_type=s_type,
|
||||
sensor_label=real_label,
|
||||
current_temp=sensor.current,
|
||||
highest_temp=sensor.high or None,
|
||||
critical_temp=sensor.critical or None,
|
||||
)
|
||||
)
|
||||
|
||||
return temp_sensors
|
||||
|
||||
@staticmethod
|
||||
def get_cpu() -> CpuSensor:
|
||||
return CpuSensor(current_load=cpu_percent())
|
||||
|
||||
@staticmethod
|
||||
def get_ram() -> RamSensor:
|
||||
ram = virtual_memory()
|
||||
return RamSensor(current_avail=ram.available, current_avail_percentage=ram.percent)
|
||||
|
||||
@staticmethod
|
||||
async def get_ups() -> None | UPSSensor:
|
||||
try:
|
||||
raw_data = subprocess.run(["upsc", "cp1300"], stdout=subprocess.PIPE, encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
await alerts.send_alert(
|
||||
alerts.Alert(
|
||||
alert_type=AlertType.ERROR,
|
||||
message="upsc is not installed!",
|
||||
severity=Severity.CRITICAL,
|
||||
)
|
||||
)
|
||||
return None
|
||||
|
||||
sensor_data = UPSSensor()
|
||||
|
||||
for line in raw_data.stdout.splitlines():
|
||||
sensor, value = line.split(": ")[:2]
|
||||
match sensor:
|
||||
case "battery.charge":
|
||||
sensor_data.battery_charge_percentage = int(value)
|
||||
case "battery.charge.low":
|
||||
sensor_data.battery_critical_percentage = int(value)
|
||||
case "battery.charge.warning":
|
||||
sensor_data.battery_warning_percentage = int(value)
|
||||
case "battery.runtime":
|
||||
sensor_data.battery_runtime = int(value)
|
||||
case "ups.status":
|
||||
sensor_data.ups_status = [UPSStatus(status) for status in value.split()]
|
||||
case _:
|
||||
...
|
||||
|
||||
return sensor_data
|
||||
Loading…
Add table
Add a link
Reference in a new issue