some actual alerts and telegram client

This commit is contained in:
Alex Tau 2025-04-30 00:16:27 +03:00
parent f158bc3778
commit ffdd0429b3
10 changed files with 314 additions and 33 deletions

View file

@ -1,19 +1,65 @@
import argparse
import asyncio
import logging
import signal
import time
from .alerting import alerts
from .core import cvars
from .core.config import load_config
stopping = False
def stop_gracefully(signum, frame):
global stopping
stopping = True
def main() -> None:
logging.basicConfig(level=logging.INFO)
asyncio.run(async_main())
async def async_main():
parser = argparse.ArgumentParser(
prog="lego-monitoring",
description="Lego-monitoring service",
)
parser.add_argument('-c', '--config', required=True)
parser.add_argument("-c", "--config", required=True)
config_path = parser.parse_args().config
config = load_config(config_path)
cvars.config.set(config)
while True:
print(f"service running... opt 1 is {config.non_secret_config_option}, opt 2 is secret, but if you really wanna know, it's {config.config_option}", flush=True)
time.sleep(300)
tg_client = await alerts.get_client()
cvars.tg_client.set(tg_client)
checker_sets = {
"start": [
alerts.send_start_alert(),
],
"stop": [], # this is checked later
}
checkers = []
for enabled_set in config.enabled_checker_sets:
for checker in checker_sets[enabled_set]:
checkers.append(checker)
signal.signal(signal.SIGTERM, stop_gracefully)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
for c in checkers:
task = tg.create_task(c)
checker_tasks.add(task)
while True:
if stopping:
if "stop" in config.enabled_checker_sets:
await alerts.send_stop_alert()
await tg_client.disconnect()
raise SystemExit
else:
await asyncio.sleep(3)

View file

@ -0,0 +1,73 @@
from dataclasses import dataclass
from telethon import TelegramClient
from telethon.sessions import MemorySession
from ..core import cvars
from .enum import AlertType, Severity
@dataclass
class Alert:
alert_type: AlertType
message: str
severity: Severity
async def get_client() -> TelegramClient:
config = cvars.config.get()
api_id, api_hash, bot_token = config.telegram.creds.split(",")
client = await TelegramClient(MemorySession(), api_id, api_hash).start(bot_token=bot_token)
client.parse_mode = "html"
return client
def format_message(alert: Alert) -> str:
match alert.severity:
case Severity.INFO:
severity_emoji = ""
case Severity.WARNING:
severity_emoji = "⚠️"
case Severity.CRITICAL:
severity_emoji = "🆘"
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
return message
async def send_alert(alert: Alert) -> None:
try:
client = cvars.tg_client.get()
except LookupError: # being called standalone
# cvars.config.set(get_config())
# temp_client = True
# client = await get_client()
# cvars.matrix_client.set(client)
raise NotImplementedError # TODO
else:
... # temp_client = False
room_id = cvars.config.get().telegram.room_id
message = format_message(alert)
await client.send_message(entity=room_id, message=message)
# if temp_client:
# await client.close()
async def send_start_alert() -> None:
config = cvars.config.get()
await send_alert(
Alert(
alert_type=AlertType.BOOT,
message=f"Service running with enabled checkers: {', '.join(config.enabled_checker_sets)}",
severity=Severity.INFO,
)
)
async def send_stop_alert() -> None:
await send_alert(
Alert(
alert_type=AlertType.BOOT,
message="Service stopping.",
severity=Severity.INFO,
)
)

View file

@ -0,0 +1,23 @@
from enum import StrEnum
class AlertType(StrEnum):
BOOT = "BOOT"
TEST = "TEST"
# ERROR = "ERROR"
# RAM = "RAM"
# CPU = "CPU"
# TEMP = "TEMP"
# VULN = "VULN"
# LOGIN = "LOGIN"
# SMART = "SMART" # TODO
# RAID = "RAID"
# DISKS = "DISKS"
# UPS = "UPS"
# UPDATE = "UPDATE"
class Severity(StrEnum):
INFO = "INFO"
WARNING = "WARNING"
CRITICAL = "CRITICAL"

View file

@ -0,0 +1,53 @@
import asyncio
import datetime
import logging
from typing import Callable, Coroutine
from ..alerting import alerts
async def _call_check(check: Callable | Coroutine, *args, **kwargs) -> list[alerts.Alert]:
if isinstance(check, Callable):
result = check(*args, **kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(check, Coroutine):
result = await check
else:
raise TypeError(f"check is {type(check)}, neither function nor coroutine")
return result
async def interval_checker(check: Callable | Coroutine, interval: datetime.timedelta, *args, **kwargs):
interval_secs = interval.total_seconds()
while True:
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
await asyncio.sleep(interval_secs)
async def scheduled_checker(
check: Callable | Coroutine, period: datetime.timedelta, when: datetime.time, *args, **kwargs
):
match period:
case datetime.timedelta(days=1):
while True:
now = datetime.datetime.now()
next_datetime = datetime.datetime.combine(datetime.date.today(), when)
if next_datetime < now:
next_datetime += datetime.timedelta(days=1)
logging.info(f"Scheduled to call {check.__name__} at {next_datetime.isoformat()}")
await asyncio.sleep(
(next_datetime - now).total_seconds()
) # might be negative at this point, asyncio doesn't care
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
case _:
raise NotImplementedError

View file

@ -1,32 +1,43 @@
import json
from dataclasses import dataclass
from typing import Optional
from alt_utils import NestedDeserializableDataclass
@dataclass
class TelegramConfig:
creds: str
room_id: int
@dataclass
class Config(NestedDeserializableDataclass):
non_secret_config_option: str
config_option: Optional[str]
enabled_checker_sets: list[str]
telegram: TelegramConfig
def load_config(filepath: str) -> Config:
def load_secrets(d: dict) -> dict:
new_d = {}
for k in d:
if k.endswith("_secret_path"):
actual_opt_key = k[:-12]
secret_path = d[k]
if secret_path is None:
new_d[actual_opt_key] = None
else:
with open(secret_path) as sf:
new_d[actual_opt_key] = sf.read().rstrip()
elif type(d[k]) == dict:
new_d[k] = load_secrets(d[k])
else:
new_d[k] = d[k]
return new_d
with open(filepath) as f:
cfg_dict = json.load(f)
# load secrets from paths
new_cfg_dict = {}
for k in cfg_dict:
if k.endswith('_secret_path'):
actual_opt_key = k[:-12]
secret_path = cfg_dict[k]
if secret_path is None:
new_cfg_dict[actual_opt_key] = None
else:
with open(secret_path) as sf:
new_cfg_dict[actual_opt_key] = sf.read().rstrip()
else:
new_cfg_dict[k] = cfg_dict[k]
new_cfg_dict = load_secrets(cfg_dict)
cfg = Config.from_dict(new_cfg_dict)
return cfg

View file

@ -0,0 +1,8 @@
from contextvars import ContextVar
from telethon import TelegramClient
from .config import Config
config: ContextVar[Config] = ContextVar("config")
tg_client: ContextVar[TelegramClient] = ContextVar("tg_client")