import asyncio
import json
import logging
import os
import signal
import tempfile
from dataclasses import asdict

from alerting.alerts import Alert, send_alert
from misc.common import TMP_DIR

# Serializes handler runs so overlapping invocations never read and delete
# the same alert files at the same time.
handler_lock = asyncio.Lock()


async def handler():
    """
    Is executed when SIGUSR1 is received.

    Waits 10 seconds so that alerts queued in a burst are collected together,
    then reads every ``alert-*`` file in TMP_DIR, collapses equal alerts into
    one (appending " (N times)" to its messages when N > 1) and sends them.

    A file that cannot be opened, parsed as JSON, or turned into an ``Alert``
    is logged and skipped instead of aborting the run; it is left on disk
    because it may still be in the middle of being written by a producer.
    """
    # Each entry is a mutable [alert, occurrence_count] pair. A list with
    # equality comparison is used rather than a dict keyed by the alert.
    alert_counts: list[list] = []
    await asyncio.sleep(10)
    async with handler_lock:
        alert_files = [f for f in os.listdir(str(TMP_DIR)) if f.startswith("alert-")]
        for fname in alert_files:
            fpath = str(TMP_DIR / fname)
            try:
                with open(fpath) as f:
                    alert = Alert(**json.load(f))
            except (OSError, ValueError, TypeError):
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # payload whose shape does not match Alert's fields. Without
                # this guard one bad file would fail every later run too,
                # since it is never unlinked.
                logging.exception("Skipping unreadable alert file %s", fpath)
                continue
            for pair in alert_counts:
                if alert == pair[0]:
                    pair[1] += 1
                    break
            else:
                # No equal alert seen yet in this run.
                alert_counts.append([alert, 1])
            # The alert is now held in memory; remove the file so that it is
            # not picked up again by a later run.
            os.unlink(fpath)
        for alert, count in alert_counts:
            if count > 1:
                alert.message += f" ({count} times)"
                alert.html_message += f" ({count} times)"
            logging.info("Sending delayed alert of type %s", alert.alert_type)
            await send_alert(alert)


def send_alert_delayed(alert: Alert) -> None:
    """
    Put alert into a temporary file to be sent later.
    Intended as an IPC mechanism

    The alert is serialized to JSON in a new ``alert-*`` file under TMP_DIR,
    then SIGUSR1 is sent to the process whose PID is stored in TMP_DIR/"pid".

    Raises:
        OSError: if the alert file or the pid file cannot be accessed, or the
            signal cannot be delivered.
        ValueError: if the pid file does not contain an integer.
    """
    alert_json = json.dumps(asdict(alert))
    # delete=False: the file must outlive this call so the receiving process
    # can read it.
    with tempfile.NamedTemporaryFile(prefix="alert-", dir=str(TMP_DIR), delete=False) as af:
        af.write(alert_json.encode())
    with open(str(TMP_DIR / "pid")) as pf:
        pid = int(pf.read().strip())
    os.kill(pid, signal.SIGUSR1)