mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-12 05:35:19 +00:00
add docker registry monitoring
This commit is contained in:
parent
618ca3c9aa
commit
13723a6bb4
9 changed files with 221 additions and 32 deletions
53
misc/checkers.py
Normal file
53
misc/checkers.py
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Callable, Coroutine
|
||||
|
||||
from alerting import alerts
|
||||
|
||||
|
||||
async def _call_check(check: Callable | Coroutine, *args, **kwargs) -> list[alerts.Alert]:
|
||||
if isinstance(check, Callable):
|
||||
result = check(*args, **kwargs)
|
||||
if isinstance(result, Coroutine):
|
||||
result = await result
|
||||
elif isinstance(check, Coroutine):
|
||||
result = await check
|
||||
else:
|
||||
raise TypeError(f"check is {type(check)}, neither function nor coroutine")
|
||||
return result
|
||||
|
||||
|
||||
async def interval_checker(check: Callable | Coroutine, interval: datetime.timedelta, *args, **kwargs):
|
||||
interval_secs = interval.total_seconds()
|
||||
while True:
|
||||
logging.info(f"Calling {check.__name__}")
|
||||
result = await _call_check(check, *args, **kwargs)
|
||||
logging.info(f"Got {len(result)} alerts")
|
||||
for alert in result:
|
||||
await alerts.send_alert(alert)
|
||||
await asyncio.sleep(interval_secs)
|
||||
|
||||
|
||||
async def scheduled_checker(
|
||||
check: Callable | Coroutine, period: datetime.timedelta, when: datetime.time, *args, **kwargs
|
||||
):
|
||||
match period:
|
||||
case datetime.timedelta(days=1):
|
||||
while True:
|
||||
now = datetime.datetime.now()
|
||||
next_datetime = datetime.datetime.combine(datetime.date.today(), when)
|
||||
if next_datetime < now:
|
||||
next_datetime += datetime.timedelta(days=1)
|
||||
logging.info(f"Scheduled to call {check.__name__} at {next_datetime.isoformat()}")
|
||||
await asyncio.sleep(
|
||||
(next_datetime - now).total_seconds()
|
||||
) # might be negative at this point, asyncio doesn't care
|
||||
|
||||
logging.info(f"Calling {check.__name__}")
|
||||
result = await _call_check(check, *args, **kwargs)
|
||||
logging.info(f"Got {len(result)} alerts")
|
||||
for alert in result:
|
||||
await alerts.send_alert(alert)
|
||||
case _:
|
||||
raise NotImplementedError
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
from datetime import timedelta
|
||||
|
||||
from alerting import alerts
|
||||
from misc import sensors, vuln
|
||||
from misc import docker_registry, sensors, vuln
|
||||
from misc.enums import UPSStatus
|
||||
|
||||
IS_TESTING = False
|
||||
|
|
@ -155,3 +155,17 @@ async def ups_check() -> list[alerts.Alert]:
|
|||
)
|
||||
|
||||
return alert_list
|
||||
|
||||
|
||||
async def docker_registry_check() -> list[alerts.Alert]:
|
||||
updated_images = await docker_registry.get_updated_images()
|
||||
alert_list = []
|
||||
for image in updated_images:
|
||||
alert_list.append(
|
||||
alerts.Alert(
|
||||
alert_type=alerts.AlertType.UPDATE,
|
||||
message=f"{image} docker image has been updated",
|
||||
severity=alerts.Severity.INFO,
|
||||
)
|
||||
)
|
||||
return alert_list
|
||||
|
|
|
|||
127
misc/docker_registry.py
Normal file
127
misc/docker_registry.py
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
import datetime
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
import traceback
|
||||
from typing import Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import uplink
|
||||
|
||||
from alerting import alerts
|
||||
from misc import cvars
|
||||
|
||||
|
||||
class DockerHubClient(uplink.Consumer):
|
||||
@uplink.returns.json
|
||||
@uplink.get("v2/namespaces/{namespace}/repositories/{repository}/tags/latest")
|
||||
def get_latest_tag(self, namespace: uplink.Path, repository: uplink.Path): ...
|
||||
|
||||
|
||||
class DockerRegistryAuthorizer(uplink.Consumer):
|
||||
@uplink.returns.json
|
||||
@uplink.get()
|
||||
def _get_token_unprotected(self, service: uplink.Query, scope: uplink.Query): ...
|
||||
|
||||
async def get_token(self, service: Optional[str], scope: Optional[str]) -> str:
|
||||
host = urlparse(self.session.base_url).hostname
|
||||
ips = set()
|
||||
try:
|
||||
ips.add(ipaddress.ip_address(host))
|
||||
except:
|
||||
addrinfo = socket.getaddrinfo(host, None)
|
||||
for t in addrinfo:
|
||||
ips.add(ipaddress.ip_address(t[4][0]))
|
||||
for ip in ips:
|
||||
if not ip.is_global:
|
||||
raise Exception(f"{host} resolved to {ip} which is not global")
|
||||
return (await self._get_token_unprotected(service, scope))["token"]
|
||||
|
||||
|
||||
class DockerRegistryClient(uplink.Consumer):
|
||||
@uplink.get("v2/{namespace}/{repository}/manifests/latest")
|
||||
def _test_manifest(self, namespace: uplink.Path, repository: uplink.Path): ...
|
||||
|
||||
@uplink.returns.json
|
||||
@uplink.get("v2/{namespace}/{repository}/manifests/latest")
|
||||
def _get_manifest(self, namespace: uplink.Path, repository: uplink.Path): ...
|
||||
|
||||
@uplink.get("v2/{namespace}/{repository}/blobs/{digest}")
|
||||
def _get_blob(self, namespace: uplink.Path, repository: uplink.Path, digest: uplink.Path): ...
|
||||
|
||||
async def get_auth_requirements(self, namespace: str, repository: str) -> Optional[tuple[str, str, str]]:
|
||||
response = await self._test_manifest(namespace, repository)
|
||||
if 200 <= response.status_code < 300:
|
||||
return None
|
||||
auth_regex = re.compile(r"([^\s,]+) ?[=] ?\"?([^\s,\"]+)\"?")
|
||||
auth_keys = dict(auth_regex.findall(response.headers["Www-Authenticate"]))
|
||||
return (auth_keys["realm"], auth_keys.get("service", None), auth_keys.get("scope", None))
|
||||
|
||||
async def get_updated_datetime_iso(self, namespace: str, repository: str) -> str:
|
||||
manifest = await self._get_manifest(namespace, repository)
|
||||
config_digest = manifest["config"]["digest"]
|
||||
blob = json.loads(await (await self._get_blob(namespace, repository, digest=config_digest)).content.read())
|
||||
return blob["created"]
|
||||
|
||||
|
||||
async def get_updated_images() -> list[str]:
|
||||
check_config = cvars.config.get()["checks"]["docker_registry"]
|
||||
hub_client = DockerHubClient(base_url=check_config["hub_url"], client=uplink.AiohttpClient())
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
|
||||
updated_images = []
|
||||
for image in check_config["images"]:
|
||||
image_split = image.split("/")
|
||||
match len(image_split):
|
||||
case 2:
|
||||
namespace, repository = image_split
|
||||
try:
|
||||
last_updated_iso = (await hub_client.get_latest_tag(namespace=namespace, repository=repository))[
|
||||
"tag_last_pushed"
|
||||
]
|
||||
except Exception as exc:
|
||||
await alerts.send_alert(
|
||||
alerts.Alert(
|
||||
alert_type=alerts.AlertType.ERROR,
|
||||
message=f"Could not query Docker Hub: {repr(exc)}, see logs",
|
||||
severity=alerts.Severity.CRITICAL,
|
||||
)
|
||||
)
|
||||
logging.error(traceback.format_exc())
|
||||
return []
|
||||
|
||||
case 3:
|
||||
registry, namespace, repository = image_split
|
||||
registry_client = DockerRegistryClient(base_url=f"https://{registry}/", client=uplink.AiohttpClient())
|
||||
try:
|
||||
requirements = await registry_client.get_auth_requirements(namespace, repository)
|
||||
|
||||
if requirements is not None:
|
||||
registry_authorizer = DockerRegistryAuthorizer(
|
||||
base_url=requirements[0], client=uplink.AiohttpClient()
|
||||
)
|
||||
token = await registry_authorizer.get_token(requirements[1], requirements[2])
|
||||
registry_client.session.headers["Authorization"] = f"Bearer {token}"
|
||||
|
||||
last_updated_iso = await registry_client.get_updated_datetime_iso(
|
||||
namespace=namespace, repository=repository
|
||||
)
|
||||
except Exception as exc:
|
||||
await alerts.send_alert(
|
||||
alerts.Alert(
|
||||
alert_type=alerts.AlertType.ERROR,
|
||||
message=f"Could not query Docker registry {registry}: {repr(exc)}, see logs",
|
||||
severity=alerts.Severity.CRITICAL,
|
||||
)
|
||||
)
|
||||
logging.error(traceback.format_exc())
|
||||
return []
|
||||
case _:
|
||||
raise Exception(f"Invalid image spec: {image}")
|
||||
last_updated = datetime.datetime.fromisoformat(last_updated_iso)
|
||||
if now - last_updated <= datetime.timedelta(days=1):
|
||||
updated_images.append(image)
|
||||
|
||||
return updated_images
|
||||
Loading…
Add table
Add a link
Reference in a new issue