mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-10 12:45:19 +00:00
add temp monitoring
This commit is contained in:
parent
19ee6f487b
commit
758438382d
13 changed files with 272 additions and 25 deletions
1
src/lego_monitoring/checks/__init__.py
Normal file
1
src/lego_monitoring/checks/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
from .temp import temp_check
|
||||
29
src/lego_monitoring/checks/temp/__init__.py
Normal file
29
src/lego_monitoring/checks/temp/__init__.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
from lego_monitoring.alerting import alerts
|
||||
from lego_monitoring.alerting.enum import AlertType, Severity
|
||||
|
||||
from . import sensors
|
||||
|
||||
IS_TESTING = False
|
||||
|
||||
|
||||
def temp_check() -> list[alerts.Alert]:
|
||||
alert_list = []
|
||||
temps = sensors.get_readings()
|
||||
for sensor, readings in temps.items():
|
||||
for r in readings:
|
||||
if r.critical_temp is not None and (IS_TESTING or r.current_temp > r.critical_temp):
|
||||
alert = alerts.Alert(
|
||||
alert_type=AlertType.TEMP,
|
||||
message=f"{sensor} {r.label}: {r.current_temp}°C > {r.critical_temp}°C",
|
||||
severity=Severity.CRITICAL,
|
||||
)
|
||||
elif r.warning_temp is not None and (IS_TESTING or r.current_temp > r.warning_temp):
|
||||
alert = alerts.Alert(
|
||||
alert_type=AlertType.TEMP,
|
||||
message=f"{sensor} {r.label}: {r.current_temp}°C > {r.warning_temp}°C",
|
||||
severity=Severity.WARNING,
|
||||
)
|
||||
else:
|
||||
continue
|
||||
alert_list.append(alert)
|
||||
return alert_list
|
||||
66
src/lego_monitoring/checks/temp/sensors.py
Normal file
66
src/lego_monitoring/checks/temp/sensors.py
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from psutil import sensors_temperatures
|
||||
|
||||
from lego_monitoring.config.checks.temp import TempSensorConfig
|
||||
from lego_monitoring.core import cvars
|
||||
|
||||
|
||||
@dataclass
|
||||
class TemperatureReading:
|
||||
label: str
|
||||
current_temp: float
|
||||
warning_temp: Optional[float]
|
||||
critical_temp: Optional[float]
|
||||
|
||||
|
||||
def print_readings():
|
||||
sensor_readings = get_readings()
|
||||
for sensor, readings in sensor_readings.items():
|
||||
print(f"*** Sensor {sensor}***\n")
|
||||
for r in readings:
|
||||
print(f"Label: {r.label}")
|
||||
print(f"Current temp: {r.current_temp}")
|
||||
print(f"Warning temp: {r.warning_temp}")
|
||||
print(f"Critical temp: {r.critical_temp}\n")
|
||||
|
||||
|
||||
def get_readings() -> dict[str, list[TemperatureReading]]:
|
||||
try:
|
||||
config = cvars.config.get().checks.temp.sensors
|
||||
except LookupError:
|
||||
config: dict[str, TempSensorConfig] = {}
|
||||
|
||||
psutil_temperatures = sensors_temperatures()
|
||||
|
||||
sensor_readings = {}
|
||||
for sensor, readings in psutil_temperatures.items():
|
||||
if sensor in config:
|
||||
if not config[sensor].enabled:
|
||||
continue
|
||||
sensor_friendly_name = config[sensor].name if config[sensor].name else sensor
|
||||
else:
|
||||
sensor_friendly_name = sensor
|
||||
|
||||
sensor_readings[sensor_friendly_name] = []
|
||||
|
||||
for r in readings:
|
||||
try:
|
||||
config_r = config[sensor].readings[r.label]
|
||||
except KeyError:
|
||||
friendly_r = TemperatureReading(
|
||||
label=r.label, current_temp=r.current, warning_temp=r.high, critical_temp=r.critical
|
||||
)
|
||||
else:
|
||||
if not config_r.enabled:
|
||||
continue
|
||||
friendly_r = TemperatureReading(
|
||||
label=config_r.label if config_r.label else r.label,
|
||||
current_temp=r.current,
|
||||
warning_temp=config_r.warning_temp if config_r.warning_temp else r.high,
|
||||
critical_temp=config_r.critical_temp if config_r.critical_temp else r.critical,
|
||||
)
|
||||
sensor_readings[sensor_friendly_name].append(friendly_r)
|
||||
|
||||
return sensor_readings
|
||||
Loading…
Add table
Add a link
Reference in a new issue