Merge branch 'docker-registry' into 'main'

Docker Registry monitoring

See merge request lego/lego-monitoring!5
This commit is contained in:
Alex Tau 2024-10-27 20:13:43 +00:00
commit df9647708d
9 changed files with 221 additions and 32 deletions

View file

@ -9,9 +9,9 @@ DISCLAIMER: This repository does not have anything to do with the LEGO Group. "l
## Configuring
* Run `alerting/login.py` once to generate credentials file
* Invite the bot account to the room (you have to accept the invite manually)
* Set room ID in `alerting/common.py`
* Copy `config.example.json` to `config.json`, edit as necessary
* Run `alerting/login.py` once to login into Matrix
## Running

View file

@ -21,6 +21,7 @@ class AlertType(StrEnum):
SMART = "SMART" # TODO
RAID = "RAID" # TODO
UPS = "UPS"
UPDATE = "UPDATE"
class Severity(Enum):

View file

@ -1,4 +1,4 @@
import os
from pathlib import Path
CONFIG_FILE = (Path(os.path.dirname(os.path.realpath(__file__))) / "config.json").resolve()
CONFIG_FILE = (Path(os.path.dirname(os.path.realpath(__file__))) / ".." / "config.json").resolve()

10
config.example.json Normal file
View file

@ -0,0 +1,10 @@
{
"checks": {
"docker_registry": {
"hub_url": "https://hub.docker.com/",
"images": [
"gitlab/gitlab-ce"
]
}
}
}

53
misc/checkers.py Normal file
View file

@ -0,0 +1,53 @@
import asyncio
import datetime
import logging
from typing import Callable, Coroutine
from alerting import alerts
async def _call_check(check: Callable | Coroutine, *args, **kwargs) -> list[alerts.Alert]:
if isinstance(check, Callable):
result = check(*args, **kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(check, Coroutine):
result = await check
else:
raise TypeError(f"check is {type(check)}, neither function nor coroutine")
return result
async def interval_checker(check: Callable | Coroutine, interval: datetime.timedelta, *args, **kwargs):
interval_secs = interval.total_seconds()
while True:
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
await asyncio.sleep(interval_secs)
async def scheduled_checker(
check: Callable | Coroutine, period: datetime.timedelta, when: datetime.time, *args, **kwargs
):
match period:
case datetime.timedelta(days=1):
while True:
now = datetime.datetime.now()
next_datetime = datetime.datetime.combine(datetime.date.today(), when)
if next_datetime < now:
next_datetime += datetime.timedelta(days=1)
logging.info(f"Scheduled to call {check.__name__} at {next_datetime.isoformat()}")
await asyncio.sleep(
(next_datetime - now).total_seconds()
) # might be negative at this point, asyncio doesn't care
logging.info(f"Calling {check.__name__}")
result = await _call_check(check, *args, **kwargs)
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
case _:
raise NotImplementedError

View file

@ -1,7 +1,7 @@
from datetime import timedelta
from alerting import alerts
from misc import sensors, vuln
from misc import docker_registry, sensors, vuln
from misc.enums import UPSStatus
IS_TESTING = False
@ -155,3 +155,17 @@ async def ups_check() -> list[alerts.Alert]:
)
return alert_list
async def docker_registry_check() -> list[alerts.Alert]:
updated_images = await docker_registry.get_updated_images()
alert_list = []
for image in updated_images:
alert_list.append(
alerts.Alert(
alert_type=alerts.AlertType.UPDATE,
message=f"{image} docker image: new version available",
severity=alerts.Severity.INFO,
)
)
return alert_list

127
misc/docker_registry.py Normal file
View file

@ -0,0 +1,127 @@
import datetime
import ipaddress
import json
import logging
import re
import socket
import traceback
from typing import Optional
from urllib.parse import urlparse
import uplink
from alerting import alerts
from misc import cvars
class DockerHubClient(uplink.Consumer):
@uplink.returns.json
@uplink.get("v2/namespaces/{namespace}/repositories/{repository}/tags/latest")
def get_latest_tag(self, namespace: uplink.Path, repository: uplink.Path): ...
class DockerRegistryAuthorizer(uplink.Consumer):
@uplink.returns.json
@uplink.get()
def _get_token_unprotected(self, service: uplink.Query, scope: uplink.Query): ...
async def get_token(self, service: Optional[str], scope: Optional[str]) -> str:
host = urlparse(self.session.base_url).hostname
ips = set()
try:
ips.add(ipaddress.ip_address(host))
except:
addrinfo = socket.getaddrinfo(host, None)
for t in addrinfo:
ips.add(ipaddress.ip_address(t[4][0]))
for ip in ips:
if not ip.is_global:
raise Exception(f"{host} resolved to {ip} which is not global")
return (await self._get_token_unprotected(service, scope))["token"]
class DockerRegistryClient(uplink.Consumer):
@uplink.get("v2/{namespace}/{repository}/manifests/latest")
def _test_manifest(self, namespace: uplink.Path, repository: uplink.Path): ...
@uplink.returns.json
@uplink.get("v2/{namespace}/{repository}/manifests/latest")
def _get_manifest(self, namespace: uplink.Path, repository: uplink.Path): ...
@uplink.get("v2/{namespace}/{repository}/blobs/{digest}")
def _get_blob(self, namespace: uplink.Path, repository: uplink.Path, digest: uplink.Path): ...
async def get_auth_requirements(self, namespace: str, repository: str) -> Optional[tuple[str, str, str]]:
response = await self._test_manifest(namespace, repository)
if 200 <= response.status_code < 300:
return None
auth_regex = re.compile(r"([^\s,]+) ?[=] ?\"?([^\s,\"]+)\"?")
auth_keys = dict(auth_regex.findall(response.headers["Www-Authenticate"]))
return (auth_keys["realm"], auth_keys.get("service", None), auth_keys.get("scope", None))
async def get_updated_datetime_iso(self, namespace: str, repository: str) -> str:
manifest = await self._get_manifest(namespace, repository)
config_digest = manifest["config"]["digest"]
blob = json.loads(await (await self._get_blob(namespace, repository, digest=config_digest)).content.read())
return blob["created"]
async def get_updated_images() -> list[str]:
check_config = cvars.config.get()["checks"]["docker_registry"]
hub_client = DockerHubClient(base_url=check_config["hub_url"], client=uplink.AiohttpClient())
now = datetime.datetime.now(datetime.timezone.utc)
updated_images = []
for image in check_config["images"]:
image_split = image.split("/")
match len(image_split):
case 2:
namespace, repository = image_split
try:
last_updated_iso = (await hub_client.get_latest_tag(namespace=namespace, repository=repository))[
"tag_last_pushed"
]
except Exception as exc:
await alerts.send_alert(
alerts.Alert(
alert_type=alerts.AlertType.ERROR,
message=f"Could not query Docker Hub: {repr(exc)}, see logs",
severity=alerts.Severity.CRITICAL,
)
)
logging.error(traceback.format_exc())
return []
case 3:
registry, namespace, repository = image_split
registry_client = DockerRegistryClient(base_url=f"https://{registry}/", client=uplink.AiohttpClient())
try:
requirements = await registry_client.get_auth_requirements(namespace, repository)
if requirements is not None:
registry_authorizer = DockerRegistryAuthorizer(
base_url=requirements[0], client=uplink.AiohttpClient()
)
token = await registry_authorizer.get_token(requirements[1], requirements[2])
registry_client.session.headers["Authorization"] = f"Bearer {token}"
last_updated_iso = await registry_client.get_updated_datetime_iso(
namespace=namespace, repository=repository
)
except Exception as exc:
await alerts.send_alert(
alerts.Alert(
alert_type=alerts.AlertType.ERROR,
message=f"Could not query Docker registry {registry}: {repr(exc)}, see logs",
severity=alerts.Severity.CRITICAL,
)
)
logging.error(traceback.format_exc())
return []
case _:
raise Exception(f"Invalid image spec: {image}")
last_updated = datetime.datetime.fromisoformat(last_updated_iso)
if now - last_updated <= datetime.timedelta(days=1):
updated_images.append(image)
return updated_images

View file

@ -1,3 +1,5 @@
colorama==0.4.6
psutil==5.9.8
matrix-nio[e2e]==0.24.0
uplink[aiohttp]==0.9.7
setuptools==75.2.0

View file

@ -1,15 +1,16 @@
#!/usr/bin/env python3
import asyncio
import datetime
import json
import logging
import signal
from typing import Callable, Coroutine
import aiofiles
from alerting import alerts
from alerting.common import CONFIG_FILE
from misc import checks, cvars
from misc.checkers import interval_checker, scheduled_checker
logging.basicConfig(level=logging.INFO)
@ -21,29 +22,7 @@ def stop_gracefully(signum, frame):
stopping = True
async def checker(check: Callable | Coroutine, interval_secs: int, *args, **kwargs):
while True:
logging.info(f"Calling {check.__name__}")
if isinstance(check, Callable):
result = check(*args, **kwargs)
if isinstance(result, Coroutine):
result = await result
elif isinstance(check, Coroutine):
result = await check
else:
raise TypeError(f"check is {type(check)}, neither function nor coroutine")
logging.info(f"Got {len(result)} alerts")
for alert in result:
await alerts.send_alert(alert)
await asyncio.sleep(interval_secs)
async def main():
MINUTE = 60
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY
signal.signal(signal.SIGTERM, stop_gracefully)
async with aiofiles.open(CONFIG_FILE) as f:
@ -53,11 +32,14 @@ async def main():
client = await alerts.get_client()
cvars.matrix_client.set(client)
checkers = (
checker(checks.temp_check, 5 * MINUTE),
checker(checks.cpu_check, 5 * MINUTE),
checker(checks.ups_check, 5 * MINUTE),
checker(checks.ram_check, 1 * MINUTE),
checker(checks.vuln_check, 1 * DAY),
interval_checker(checks.temp_check, datetime.timedelta(minutes=5)),
interval_checker(checks.cpu_check, datetime.timedelta(minutes=5)),
interval_checker(checks.ups_check, datetime.timedelta(minutes=5)),
interval_checker(checks.ram_check, datetime.timedelta(minutes=1)),
interval_checker(checks.vuln_check, datetime.timedelta(days=1)),
scheduled_checker(
checks.docker_registry_check, period=datetime.timedelta(days=1), when=datetime.time(hour=0, minute=0)
),
)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()