lego-monitoring/misc/docker_registry.py
2024-10-27 22:29:32 +03:00

127 lines
5.3 KiB
Python

import asyncio
import datetime
import ipaddress
import json
import logging
import re
import socket
import traceback
from typing import Optional
from urllib.parse import urlparse

import uplink

from alerting import alerts
from misc import cvars
class DockerHubClient(uplink.Consumer):
    """Declarative uplink consumer exposing the Docker Hub tag lookup used by this check."""

    @uplink.returns.json
    @uplink.get("v2/namespaces/{namespace}/repositories/{repository}/tags/latest")
    def get_latest_tag(
        self,
        namespace: uplink.Path,
        repository: uplink.Path,
    ):
        """Fetch the decoded JSON description of a repository's ``latest`` tag.

        ``namespace`` and ``repository`` are substituted into the URL path.
        The body is intentionally empty: the request is described entirely
        by the decorators and the annotated parameters.
        """
        ...
class DockerRegistryAuthorizer(uplink.Consumer):
    """Consumer for a registry token endpoint; the consumer's base URL is the endpoint itself."""

    @uplink.returns.json
    @uplink.get()
    def _get_token_unprotected(self, service: uplink.Query, scope: uplink.Query):
        """Request a token from the base URL, passing ``service`` and ``scope`` as query arguments.

        No validation of the target host happens here; use :meth:`get_token`.
        """
        ...

    async def get_token(self, service: Optional[str], scope: Optional[str]) -> str:
        """Return the ``token`` field of the token endpoint's JSON response.

        The endpoint URL typically comes from a remote registry's
        authentication challenge, so before any request is sent the host is
        checked: every address it maps to must be globally routable.

        Args:
            service: Value for the ``service`` query argument, or ``None``.
            scope: Value for the ``scope`` query argument, or ``None``.

        Raises:
            Exception: If the base URL has no host, or if the host is (or
                resolves to) an address that is not global.
            OSError: If name resolution fails (``socket.gaierror``).
            KeyError: If the response has no ``token`` field.
        """
        host = urlparse(self.session.base_url).hostname
        if host is None:
            raise Exception(f"Cannot determine the host of token endpoint {self.session.base_url}")

        try:
            # The host may already be an IP literal.
            ips = {ipaddress.ip_address(host)}
        except ValueError:
            # Not an IP literal: resolve it through the event loop so the
            # lookup does not block other coroutines.
            addrinfo = await asyncio.get_running_loop().getaddrinfo(host, None)
            # Each entry is (family, type, proto, canonname, sockaddr);
            # sockaddr[0] is the textual address.
            ips = {ipaddress.ip_address(entry[4][0]) for entry in addrinfo}

        for ip in ips:
            if not ip.is_global:
                raise Exception(f"{host} resolved to {ip} which is not global")

        # NOTE(review): the HTTP client resolves the host again on its own, so
        # a DNS answer that changes between this check and the request is not
        # covered by the validation above.
        return (await self._get_token_unprotected(service, scope))["token"]
# Matches one ``key=value`` / ``key="value"`` pair of an authentication
# challenge. Values stop at whitespace, commas and double quotes, so a quoted
# value containing any of those is captured only up to that character.
_AUTH_PARAM_RE = re.compile(r"([^\s,]+) ?[=] ?\"?([^\s,\"]+)\"?")


class DockerRegistryClient(uplink.Consumer):
    """Consumer for the manifest and blob endpoints of a Docker registry (``v2/...`` paths)."""

    @uplink.get("v2/{namespace}/{repository}/manifests/latest")
    def _test_manifest(self, namespace: uplink.Path, repository: uplink.Path):
        """Request the ``latest`` manifest and return the raw response (status and headers)."""
        ...

    @uplink.returns.json
    @uplink.get("v2/{namespace}/{repository}/manifests/latest")
    def _get_manifest(self, namespace: uplink.Path, repository: uplink.Path):
        """Request the ``latest`` manifest and return it as decoded JSON."""
        ...

    @uplink.get("v2/{namespace}/{repository}/blobs/{digest}")
    def _get_blob(self, namespace: uplink.Path, repository: uplink.Path, digest: uplink.Path):
        """Request the blob identified by ``digest`` and return the raw response."""
        ...

    async def get_auth_requirements(
        self, namespace: str, repository: str
    ) -> Optional[tuple[str, Optional[str], Optional[str]]]:
        """Probe the manifest endpoint and report what authentication it asks for.

        Returns:
            ``None`` when the probe succeeds with a 2xx status. Otherwise a
            ``(realm, service, scope)`` tuple parsed from the
            ``Www-Authenticate`` response header; ``service`` and ``scope``
            are ``None`` when the header does not carry them.

        Raises:
            Exception: If the probe fails but the response has no
                ``Www-Authenticate`` header, or the header has no ``realm``.
        """
        response = await self._test_manifest(namespace, repository)
        if 200 <= response.status_code < 300:
            return None

        challenge = response.headers.get("Www-Authenticate")
        if challenge is None:
            # Any failure other than an authentication challenge (missing
            # image, server error, ...) ends up here.
            raise Exception(
                f"Registry answered with status {response.status_code} and no Www-Authenticate header"
            )

        auth_keys = dict(_AUTH_PARAM_RE.findall(challenge))
        if "realm" not in auth_keys:
            raise Exception(f"Www-Authenticate header has no realm: {challenge}")
        return (auth_keys["realm"], auth_keys.get("service"), auth_keys.get("scope"))

    async def get_updated_datetime_iso(self, namespace: str, repository: str) -> str:
        """Return the ``created`` field of the image config behind the ``latest`` manifest.

        The manifest's ``config.digest`` names a blob; that blob is fetched,
        parsed as JSON, and its ``created`` value is returned unchanged.

        Raises:
            KeyError: If the manifest has no ``config``/``digest`` entry or
                the config blob has no ``created`` field.
        """
        manifest = await self._get_manifest(namespace, repository)
        config_digest = manifest["config"]["digest"]
        blob_response = await self._get_blob(namespace, repository, digest=config_digest)
        config_blob = json.loads(await blob_response.content.read())
        return config_blob["created"]
async def get_updated_images() -> list[str]:
check_config = cvars.config.get()["checks"]["docker_registry"]
hub_client = DockerHubClient(base_url=check_config["hub_url"], client=uplink.AiohttpClient())
now = datetime.datetime.now(datetime.timezone.utc)
updated_images = []
for image in check_config["images"]:
image_split = image.split("/")
match len(image_split):
case 2:
namespace, repository = image_split
try:
last_updated_iso = (await hub_client.get_latest_tag(namespace=namespace, repository=repository))[
"tag_last_pushed"
]
except Exception as exc:
await alerts.send_alert(
alerts.Alert(
alert_type=alerts.AlertType.ERROR,
message=f"Could not query Docker Hub: {repr(exc)}, see logs",
severity=alerts.Severity.CRITICAL,
)
)
logging.error(traceback.format_exc())
return []
case 3:
registry, namespace, repository = image_split
registry_client = DockerRegistryClient(base_url=f"https://{registry}/", client=uplink.AiohttpClient())
try:
requirements = await registry_client.get_auth_requirements(namespace, repository)
if requirements is not None:
registry_authorizer = DockerRegistryAuthorizer(
base_url=requirements[0], client=uplink.AiohttpClient()
)
token = await registry_authorizer.get_token(requirements[1], requirements[2])
registry_client.session.headers["Authorization"] = f"Bearer {token}"
last_updated_iso = await registry_client.get_updated_datetime_iso(
namespace=namespace, repository=repository
)
except Exception as exc:
await alerts.send_alert(
alerts.Alert(
alert_type=alerts.AlertType.ERROR,
message=f"Could not query Docker registry {registry}: {repr(exc)}, see logs",
severity=alerts.Severity.CRITICAL,
)
)
logging.error(traceback.format_exc())
return []
case _:
raise Exception(f"Invalid image spec: {image}")
last_updated = datetime.datetime.fromisoformat(last_updated_iso)
if now - last_updated <= datetime.timedelta(days=1):
updated_images.append(image)
return updated_images