import argparse
import asyncio
import datetime
import logging
import signal
from typing import Coroutine

from . import checks
from .alerting import sender
from .alerting.commands import CommandHandlerManager
from .checks.temp.sensors import print_readings
from .config import enums as config_enums
from .config import load_config
from .core import cvars
from .core.checkers import (
    BaseChecker,
    IntervalChecker,
    PipeIntervalChecker,
    ScheduledChecker,
)
from .core.const import UPS_PIPE_NAME

# Set to True by stop_gracefully() (installed as the SIGTERM handler in
# async_main) and polled by async_main's main loop.
stopping = False

# How often (in seconds) the main loop re-checks the ``stopping`` flag.
_STOP_POLL_SECONDS = 3


def stop_gracefully(signum, frame):
    """Signal handler: request shutdown by setting the module-level flag.

    The arguments follow the ``signal.signal`` handler signature and are unused.
    """
    global stopping
    stopping = True


def main() -> None:
    """Synchronous entry point: run :func:`async_main` in a new event loop."""
    asyncio.run(async_main())


def _build_checker_sets(config) -> dict[config_enums.CheckSet, list[Coroutine | BaseChecker]]:
    """Build the table mapping every check set to its checkers/coroutines.

    All entries are constructed eagerly, regardless of which sets are enabled;
    the caller selects the enabled ones.
    """
    check_sets = config_enums.CheckSet
    return {
        check_sets.SELF: [
            sender.send_alert(checks.generate_start_alert()),
            IntervalChecker(checks.self_check, interval=datetime.timedelta(minutes=5), persistent=False),
        ],
        check_sets.CPU: [
            IntervalChecker(
                checks.cpu_check, interval=datetime.timedelta(minutes=3), persistent=True, ignore_first_run=True
            )
        ],
        check_sets.RAM: [IntervalChecker(checks.ram_check, interval=datetime.timedelta(minutes=1), persistent=True)],
        check_sets.TEMP: [IntervalChecker(checks.temp_check, interval=datetime.timedelta(minutes=5), persistent=True)],
        check_sets.VULNIX: [
            IntervalChecker(
                checks.vulnix_check,
                interval=datetime.timedelta(days=3),
                persistent=True,
                send_any_state=True,
                # As these are checked less often than daily, reminders could lead to
                # awkward situations where the vuln is already fixed but you still get
                # reminders about it for 2 more days.
                remind=False,
            )
        ],
        check_sets.REMIND: [
            ScheduledChecker(
                checks.remind_check,
                period=datetime.timedelta(days=1),
                when=datetime.time(hour=0, minute=0),
                persistent=False,
                is_reminder=True,
            )
        ],
        check_sets.NET: [
            IntervalChecker(checks.NetIOTracker().net_check, interval=datetime.timedelta(minutes=5), persistent=True)
        ],
        check_sets.UPS: [
            PipeIntervalChecker(
                checks.UPSTracker().ups_check,
                interval=datetime.timedelta(minutes=5),
                persistent=True,
                pipe=UPS_PIPE_NAME,
                owner_group=config.checks.ups.upsmon_group,
            )
        ],
        check_sets.LVMRAID: [
            IntervalChecker(checks.lvmraid_check, interval=datetime.timedelta(minutes=5), persistent=True)
        ],
    }


async def _shutdown(config, checkers, tg_client) -> None:
    """Run the shutdown sequence once a stop has been requested.

    Sends the stop alert (only when the SELF check set is enabled), calls
    ``graceful_stop()`` on every entry of *checkers* that has such a method,
    and disconnects *tg_client* if it is set.
    """
    if config_enums.CheckSet.SELF in config.enabled_check_sets:
        alert = checks.generate_stop_alert()
        # Separate name so the caller's task group is not shadowed.
        async with asyncio.TaskGroup() as alert_tg:
            alert_tg.create_task(sender.send_alert(alert))
            alert_tg.create_task(sender.send_healthchecks_status(alert))

    for checker in checkers:
        # Plain coroutine entries have no graceful_stop(); look the method up
        # explicitly so AttributeErrors raised *inside* it are not swallowed.
        graceful_stop = getattr(checker, "graceful_stop", None)
        if graceful_stop is not None:
            await graceful_stop()

    if tg_client:
        await tg_client.disconnect()


async def async_main() -> None:
    """Parse CLI arguments, start the enabled checkers and run until SIGTERM.

    Command-line options:
        -c / --config: config file; required unless ``--print-temp`` is given.
        --print-temp: print temp sensor readings and exit.

    Raises:
        RuntimeError: if ``--config`` is missing in standard operating mode.
        SystemExit: after ``--print-temp`` output, or after the shutdown
            sequence triggered by SIGTERM has completed.
    """
    parser = argparse.ArgumentParser(
        prog="lego-monitoring",
        description="Lego-monitoring service",
    )
    parser.add_argument("-c", "--config", help="config file")
    parser.add_argument("--print-temp", help="print temp sensor readings and exit", action="store_true")
    args = parser.parse_args()

    # The config (if given) is loaded before --print-temp is handled.
    if args.config:
        config = load_config(args.config)
        cvars.config.set(config)

    if args.print_temp:
        print_readings()
        raise SystemExit

    if not args.config:
        raise RuntimeError("--config must be specified in standard operating mode")

    logging.basicConfig(level=config.log_level)

    check_sets = config_enums.CheckSet
    checker_sets = _build_checker_sets(config)

    checkers = [checker for enabled_set in config.enabled_check_sets for checker in checker_sets[enabled_set]]

    # Coroutine objects belonging to disabled check sets will never be
    # scheduled; close them so Python does not emit "never awaited" warnings.
    for check_set, members in checker_sets.items():
        if check_set in config.enabled_check_sets:
            continue
        for member in members:
            if asyncio.iscoroutine(member):
                member.close()

    # The reminder check receives the list of all enabled checkers.
    checker_sets[check_sets.REMIND][0].check_args = [checkers]

    if config.alert_channels.telegram is not None:
        tg_client = await sender.get_tg_client()
        my_username = (await tg_client.get_me()).username
        logging.info("Logged in as @%s", my_username)
        command_manager = CommandHandlerManager(checkers)
        await command_manager.attach_handlers(tg_client)
    else:
        logging.info("Telegram integration is disabled")
        tg_client = None

    cvars.tg_client.set(tg_client)

    if config.alert_channels.healthchecks is not None:
        healthchecks_client = sender.get_healthchecks_client()
        logging.info("Ready to send pings to healthchecks")
        cvars.healthchecks_client.set(healthchecks_client)
    else:
        logging.info("Healthchecks integration is disabled")

    signal.signal(signal.SIGTERM, stop_gracefully)

    async with asyncio.TaskGroup() as tg:
        for checker in checkers:
            # Checker objects are started via run_checker(); plain coroutines
            # are scheduled as they are.
            runnable = checker.run_checker() if isinstance(checker, BaseChecker) else checker
            tg.create_task(runnable)

        # Poll the flag set by the SIGTERM handler.
        while not stopping:
            await asyncio.sleep(_STOP_POLL_SECONDS)

        await _shutdown(config, checkers, tg_client)
        # Raised inside the task group, as before, to end the program.
        raise SystemExit