lego-monitoring/src/lego_monitoring/checks/net.py
2025-06-07 15:59:05 +03:00

87 lines
3.8 KiB
Python

from dataclasses import dataclass, field
from typing import Optional
from humanize import naturalsize
from psutil import net_io_counters
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
# Debug switch: when True, NetIOTracker.check_threshold raises an alert for
# every configured (non-zero) threshold regardless of the measured rate.
IS_TESTING = False
# Divisor (300 seconds) used to turn a byte-counter delta between two
# consecutive net_check() calls into a bytes-per-second rate.
# NOTE(review): presumably must match the scheduler's actual check interval — confirm.
SECONDS_BETWEEN_CHECKS = 5 * 60
@dataclass
class NetIOTracker:
    """Track per-interface network throughput between consecutive checks.

    The tracker remembers the cumulative sent/received byte counters seen at
    the previous ``net_check`` call and derives an average bytes-per-second
    rate from the difference, which is then compared with the configured
    thresholds.
    """

    # Cumulative ``bytes_sent`` counter per interface, as of the previous check.
    sent_per_interface: dict[str, int] = field(default_factory=dict, init=False)
    # Cumulative ``bytes_recv`` counter per interface, as of the previous check.
    recv_per_interface: dict[str, int] = field(default_factory=dict, init=False)

    @staticmethod
    def check_threshold(
        current_stat_bytes_per_sec: float,
        critical_threshold: Optional[int],
        warning_threshold: Optional[int],
        stat_name: str,
        interface: str,
    ) -> Optional[Alert]:
        """Compare a rate with its thresholds and build the matching alert.

        The critical threshold is evaluated first, so a rate exceeding both
        thresholds yields a single CRITICAL alert. A threshold that is
        ``None`` or ``0`` is treated as "not configured" and skipped. When
        ``IS_TESTING`` is true, the first configured threshold always fires.

        Args:
            current_stat_bytes_per_sec: Measured rate in bytes per second.
            critical_threshold: Rate at or above which a CRITICAL alert is raised.
            warning_threshold: Rate at or above which a WARNING alert is raised.
            stat_name: Label of the statistic, used in the alert message.
            interface: Interface name, used in the alert message.

        Returns:
            The alert for the most severe threshold reached, or ``None``.
        """
        # Ordered from most to least severe: the first match wins.
        levels = (
            (critical_threshold, Severity.CRITICAL),
            (warning_threshold, Severity.WARNING),
        )
        for threshold, severity in levels:
            if threshold and (IS_TESTING or current_stat_bytes_per_sec >= threshold):
                current_stat_natural = naturalsize(current_stat_bytes_per_sec, binary=True)
                threshold_natural = naturalsize(threshold, binary=True)
                return Alert(
                    alert_type=AlertType.NET,
                    message=f"Interface {interface} {stat_name} {current_stat_natural}/s >= {threshold_natural}/s",
                    severity=severity,
                )
        return None

    def net_check(self) -> list[Alert]:
        """Sample the interface counters and return alerts for exceeded thresholds.

        For every interface listed in the ``checks.net`` configuration the
        byte-counter delta since the previous call is divided by
        ``SECONDS_BETWEEN_CHECKS`` to obtain sent, received and combined
        rates. The first call for an interface only records a baseline and
        produces no alerts for it.

        A configured interface that is absent from the current counter
        snapshot is skipped (with a logged warning) instead of raising
        ``KeyError``, so one missing NIC cannot abort the whole check.

        Returns:
            Alerts raised during this check; empty when nothing is exceeded.
        """
        import logging

        alerts: list[Alert] = []
        current_stats = net_io_counters(pernic=True)
        config = cvars.config.get().checks.net
        for interface, thresholds in config.interfaces.items():
            counters = current_stats.get(interface)
            if counters is None:
                logging.getLogger(__name__).warning(
                    "Interface %s is configured but has no I/O counters; skipping", interface
                )
                # Forget the old baseline: if the interface comes back, the gap
                # is unknown and a rate computed from the stale values would be
                # meaningless.
                self.sent_per_interface.pop(interface, None)
                self.recv_per_interface.pop(interface, None)
                continue

            if interface in self.sent_per_interface and interface in self.recv_per_interface:
                sent_rate = (counters.bytes_sent - self.sent_per_interface[interface]) / SECONDS_BETWEEN_CHECKS
                recv_rate = (counters.bytes_recv - self.recv_per_interface[interface]) / SECONDS_BETWEEN_CHECKS
                checks = (
                    (
                        sent_rate,
                        thresholds.critical_threshold_sent_bytes,
                        thresholds.warning_threshold_sent_bytes,
                        "sent",
                    ),
                    (
                        recv_rate,
                        thresholds.critical_threshold_recv_bytes,
                        thresholds.warning_threshold_recv_bytes,
                        "recv",
                    ),
                    (
                        sent_rate + recv_rate,
                        thresholds.critical_threshold_comb_bytes,
                        thresholds.warning_threshold_comb_bytes,
                        "comb",
                    ),
                )
                for rate, critical, warning, stat_name in checks:
                    if alert := self.check_threshold(rate, critical, warning, stat_name, interface):
                        alerts.append(alert)

            # Store the current counters as the baseline for the next check.
            self.sent_per_interface[interface] = counters.bytes_sent
            self.recv_per_interface[interface] = counters.bytes_recv
        return alerts