mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-09 20:31:10 +00:00
43 lines
1.9 KiB
Python
43 lines
1.9 KiB
Python
from typing import Any, Coroutine
|
|
|
|
from lego_monitoring.alerting.alert import Alert
|
|
from lego_monitoring.config.enums import CheckSet
|
|
from lego_monitoring.core.checkers import BaseChecker
|
|
|
|
|
|
def remind_check(checker_sets: dict[CheckSet, list[Coroutine | BaseChecker]]) -> list[Alert]:
|
|
alerts = []
|
|
for checker_set in checker_sets.values():
|
|
for c in checker_set:
|
|
if not isinstance(c, BaseChecker) or not c.persistent or not c.remind:
|
|
continue
|
|
alerts.extend(c.current_alerts)
|
|
return alerts
|
|
|
|
# alert_num_by_state_with_max_type: dict[AlertType, list[Severity | int]] = {}
|
|
# for checker_set in checker_sets.values():
|
|
# for c in checker_set:
|
|
# if not isinstance(c, BaseChecker) or not c.persistent:
|
|
# continue
|
|
# for a in c.current_alerts:
|
|
# if a.alert_type not in alert_num_by_state_with_max_type:
|
|
# alert_num_by_state_with_max_type[a.alert_type] = [a.severity, 1]
|
|
# else:
|
|
# existing_list = alert_num_by_state_with_max_type[a.alert_type]
|
|
# if a.severity > existing_list[0]:
|
|
# existing_list[0] = a.severity
|
|
# existing_list[1] += 1
|
|
|
|
# if len(alert_num_by_state_with_max_type) == 0:
|
|
# return []
|
|
|
|
# message = "There are ongoing events:"
|
|
# for at, sev_count in alert_num_by_state_with_max_type.items():
|
|
# message += f"\n* {SEVERITY_TO_EMOJI[sev_count[0]]} {str(at)} - {sev_count[1]} alerts"
|
|
# message += (
|
|
# "\n\nUse /ongoing to see them or /status to see this short reminder again (NOT IMPLEMENTED YET)."
|
|
# + "\nYou will also be reminded daily until the situation is resolved."
|
|
# )
|
|
|
|
# alert = Alert(alert_type=AlertType.REMIND, message=message, severity=max(alert_num_by_state_with_max_type.keys()))
|
|
# return [alert]
|