add slugs to checks, enabling sending them to healthchecks

This commit is contained in:
Alex Tau 2025-08-15 19:22:58 +03:00
parent be7b3dbeed
commit 5c57e1765e
8 changed files with 87 additions and 7 deletions

View file

@ -54,9 +54,28 @@ async def send_alert(alert: Alert, note: str = "") -> None:
# if temp_client:
# await client.close()
# TODO ping healthchecks if enabled
if alert.healthchecks_slug is not None:
raise NotImplementedError
async def send_healthchecks_status(alert: Alert) -> None:
    """Report the state of ``alert`` to healthchecks under the alert's slug.

    Nothing is sent when the alert carries no ``healthchecks_slug`` or when
    ``cvars.healthchecks_client`` holds ``None``. Otherwise a success ping is
    sent for ``Severity.OK`` and a failure ping for every other severity, with
    the alert's ``plain_message`` attached as the log.

    Raises:
        NotImplementedError: if reading ``cvars.healthchecks_client`` raises
            ``LookupError`` (presumably the context variable was never set --
            TODO confirm and handle properly).
    """
    slug = alert.healthchecks_slug
    if slug is None:
        return

    try:
        client = cvars.healthchecks_client.get()
    except LookupError:
        raise NotImplementedError  # TODO
    if client is None:
        return

    # Prefer a pinging key registered for this exact slug; otherwise use the
    # "default" entry of the configured key mapping.
    pinging_keys = cvars.config.get().alert_channels.healthchecks.pinging_keys
    if slug in pinging_keys:
        key = pinging_keys[slug]
    else:
        key = pinging_keys["default"]

    # Only OK counts as success; any other severity is reported as a failure.
    ping = client.success if alert.severity == Severity.OK else client.failure
    await ping(key, slug, create=True, log=alert.plain_message)
# TODO service itself has to be monitored like everything else - with regular pinging - if we're

View file

@ -1,21 +1,27 @@
from socket import gethostname
from psutil import cpu_percent
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
from .utils import format_for_healthchecks_slug
IS_TESTING = False
def cpu_check() -> list[Alert]:
percentage = cpu_percent()
config = cvars.config.get().checks.cpu
slug = f"{format_for_healthchecks_slug(gethostname())}-cpu"
if config.critical_percentage and (IS_TESTING or percentage >= config.critical_percentage):
return [
Alert(
alert_type=AlertType.CPU,
message=f"CPU load: {percentage:.2f}% >= {config.critical_percentage:.2f}%",
severity=Severity.CRITICAL,
healthchecks_slug=slug,
)
]
elif config.warning_percentage and (IS_TESTING or percentage >= config.warning_percentage):
@ -24,7 +30,15 @@ def cpu_check() -> list[Alert]:
alert_type=AlertType.CPU,
message=f"CPU load: {percentage:.2f}% >= {config.warning_percentage:.2f}%",
severity=Severity.WARNING,
healthchecks_slug=slug,
)
]
else:
return [Alert(alert_type=AlertType.CPU, message=f"CPU load: {percentage:.2f}% (nominal)", severity=Severity.OK)]
return [
Alert(
alert_type=AlertType.CPU,
message=f"CPU load: {percentage:.2f}% (nominal)",
severity=Severity.OK,
healthchecks_slug=slug,
)
]

View file

@ -1,4 +1,5 @@
from dataclasses import dataclass, field
from socket import gethostname
from typing import Optional
from humanize import naturalsize
@ -8,6 +9,8 @@ from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
from .utils import format_for_healthchecks_slug
IS_TESTING = False
SECONDS_BETWEEN_CHECKS = 5 * 60
@ -25,6 +28,9 @@ class NetIOTracker:
stat_name: str,
interface: str,
) -> Optional[Alert]:
slug = (
f"{format_for_healthchecks_slug(gethostname())}-net-{format_for_healthchecks_slug(interface)}-{stat_name}"
)
current_stat_natural = naturalsize(current_stat_bytes_per_sec, binary=True)
if critical_threshold and (IS_TESTING or current_stat_bytes_per_sec >= critical_threshold):
critical_threshold_natural = naturalsize(critical_threshold, binary=True)
@ -32,6 +38,7 @@ class NetIOTracker:
alert_type=AlertType.NET,
message=f"Interface {interface} {stat_name} {current_stat_natural}/s >= {critical_threshold_natural}/s",
severity=Severity.CRITICAL,
healthchecks_slug=slug,
)
elif warning_threshold and (IS_TESTING or current_stat_bytes_per_sec >= warning_threshold):
warning_threshold_natural = naturalsize(warning_threshold, binary=True)
@ -39,12 +46,14 @@ class NetIOTracker:
alert_type=AlertType.NET,
message=f"Interface {interface} {stat_name} {current_stat_natural}/s >= {warning_threshold_natural}/s",
severity=Severity.WARNING,
healthchecks_slug=slug,
)
else:
return Alert(
alert_type=AlertType.NET,
message=f"Interface {interface} {stat_name} {current_stat_natural}/s (nominal)",
severity=Severity.OK,
healthchecks_slug=slug,
)
def net_check(self) -> list[Alert]:

View file

@ -1,21 +1,27 @@
from socket import gethostname
from psutil import virtual_memory
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.core import cvars
from .utils import format_for_healthchecks_slug
IS_TESTING = False
def ram_check() -> list[Alert]:
percentage = virtual_memory().percent
config = cvars.config.get().checks.ram
slug = f"{format_for_healthchecks_slug(gethostname())}-ram"
if config.critical_percentage and (IS_TESTING or percentage >= config.critical_percentage):
return [
Alert(
alert_type=AlertType.RAM,
message=f"RAM usage: {percentage:.2f}% >= {config.critical_percentage:.2f}%",
severity=Severity.CRITICAL,
healthchecks_slug=slug,
)
]
elif config.warning_percentage and (IS_TESTING or percentage >= config.warning_percentage):
@ -24,9 +30,15 @@ def ram_check() -> list[Alert]:
alert_type=AlertType.RAM,
message=f"RAM usage: {percentage:.2f}% >= {config.warning_percentage:.2f}%",
severity=Severity.WARNING,
healthchecks_slug=slug,
)
]
else:
return [
Alert(alert_type=AlertType.RAM, message=f"RAM usage: {percentage:.2f}% (nominal)", severity=Severity.OK)
Alert(
alert_type=AlertType.RAM,
message=f"RAM usage: {percentage:.2f}% (nominal)",
severity=Severity.OK,
healthchecks_slug=slug,
)
]

View file

@ -1,6 +1,9 @@
from socket import gethostname
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from ..utils import format_for_healthchecks_slug
from . import sensors
IS_TESTING = False
@ -11,23 +14,29 @@ def temp_check() -> list[Alert]:
temps = sensors.get_readings()
for sensor, readings in temps.items():
for r in readings:
sensor_slug = format_for_healthchecks_slug(sensor)
label_slug = format_for_healthchecks_slug(r.label)
slug = f"{format_for_healthchecks_slug(gethostname())}-temp-{sensor_slug}-{label_slug}"
if r.critical_temp is not None and (IS_TESTING or r.current_temp >= r.critical_temp):
alert = Alert(
alert_type=AlertType.TEMP,
message=f"{sensor} {r.label}: {r.current_temp}°C >= {r.critical_temp}°C",
severity=Severity.CRITICAL,
healthchecks_slug=slug,
)
elif r.warning_temp is not None and (IS_TESTING or r.current_temp >= r.warning_temp):
alert = Alert(
alert_type=AlertType.TEMP,
message=f"{sensor} {r.label}: {r.current_temp}°C >= {r.warning_temp}°C",
severity=Severity.WARNING,
healthchecks_slug=slug,
)
else:
alert = Alert(
alert_type=AlertType.TEMP,
message=f"{sensor} {r.label}: {r.current_temp}°C (nominal)",
severity=Severity.OK,
healthchecks_slug=slug,
)
alert_list.append(alert)

View file

@ -0,0 +1,5 @@
import re
def format_for_healthchecks_slug(s: str) -> str:
    """Normalize ``s`` for use as part of a healthchecks slug.

    The string is lowercased, then every character other than ``a-z``,
    ``0-9``, ``_`` and ``-`` is replaced with a single underscore.
    """
    disallowed = re.compile(r"[^a-z0-9_-]")
    lowered = s.lower()
    return disallowed.sub("_", lowered)

View file

@ -1,7 +1,10 @@
from socket import gethostname
from lego_monitoring.alerting.alert import Alert
from lego_monitoring.alerting.enum import AlertType, Severity
from lego_monitoring.alerting.sender import send_alert
from ..utils import format_for_healthchecks_slug
from .vulnix import get_vulnix_output
IS_TESTING = False
@ -9,6 +12,7 @@ IS_TESTING = False
async def vulnix_check() -> list[Alert]:
alert_list = []
slug = f"{format_for_healthchecks_slug(gethostname())}-vulnix"
try:
vulnix_output = get_vulnix_output(IS_TESTING)
except Exception as e:
@ -17,6 +21,7 @@ async def vulnix_check() -> list[Alert]:
alert_type=AlertType.ERROR,
message=f"Exception {type(e).__name__} while calling vulnix: {e}",
severity=Severity.CRITICAL,
healthchecks_slug=slug,
)
)
return []
@ -29,6 +34,7 @@ async def vulnix_check() -> list[Alert]:
continue
message = f"New findings in derivation <code>{finding.derivation}</code>:"
short_message = f"New findings in <code>{finding.derivation}</code> (short ver):"
plain_message = f"New findings in derivation {finding.derivation}:"
for cve in non_whitelisted_cves:
if cve in finding.cvssv3_basescore:
score_str = f"(CVSSv3 = {finding.cvssv3_basescore[cve]})"
@ -36,6 +42,7 @@ async def vulnix_check() -> list[Alert]:
score_str = "(not scored by CVSSv3)"
message += f'\n* <a href="https://nvd.nist.gov/vuln/detail/{cve}">{cve}</a> - {finding.description[cve]} {score_str}'
short_message += f'\n * <a href="https://nvd.nist.gov/vuln/detail/{cve}">{cve}</a>'
plain_message += f"\n* https://nvd.nist.gov/vuln/detail/{cve} - {finding.description[cve]} {score_str}"
if len(message) > 3700:
message = short_message
@ -44,6 +51,8 @@ async def vulnix_check() -> list[Alert]:
alert_type=AlertType.VULN,
message=message,
severity=Severity.WARNING,
healthchecks_slug=slug,
plain_message=plain_message,
)
alert_list.append(alert)
@ -51,6 +60,6 @@ async def vulnix_check() -> list[Alert]:
alert_list[0].message += "\n(just testing)"
return [alert_list[0]]
elif len(alert_list) == 0:
return [Alert(AlertType.VULN, message="No vulnerabilities found", severity=Severity.OK)]
return [Alert(AlertType.VULN, message="No vulnerabilities found", severity=Severity.OK, healthchecks_slug=slug)]
else:
return alert_list

View file

@ -7,7 +7,7 @@ from typing import Any, Callable, Coroutine
from ..alerting.alert import Alert
from ..alerting.current import CurrentAlerts
from ..alerting.enum import Severity
from ..alerting.sender import send_alert
from ..alerting.sender import send_alert, send_healthchecks_status
@dataclass
@ -75,6 +75,9 @@ class BaseChecker:
for alert in alerts:
await send_alert(alert, note="ongoing")
for alert in alerts:
await send_healthchecks_status(alert)
async def run_checker(self) -> None:
raise NotImplementedError