From 436855d8c1b957b8dd08be5f4270a46309bd8f02 Mon Sep 17 00:00:00 2001 From: Alex Tau Date: Fri, 9 May 2025 15:27:22 +0300 Subject: [PATCH] vulnix integration --- flake.nix | 9 ++++ modules/default.nix | 32 +++++++++++++ modules/submodules/vulnixWhitelistRule.nix | 27 +++++++++++ src/lego_monitoring/__init__.py | 1 + src/lego_monitoring/alerting/enum.py | 4 +- src/lego_monitoring/checks/__init__.py | 1 + src/lego_monitoring/checks/vulnix/__init__.py | 48 +++++++++++++++++++ src/lego_monitoring/checks/vulnix/vulnix.py | 41 ++++++++++++++++ src/lego_monitoring/config/__init__.py | 2 + src/lego_monitoring/config/checks/vulnix.py | 8 ++++ src/lego_monitoring/core/const.py | 1 + 11 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 modules/submodules/vulnixWhitelistRule.nix create mode 100644 src/lego_monitoring/checks/vulnix/__init__.py create mode 100644 src/lego_monitoring/checks/vulnix/vulnix.py create mode 100644 src/lego_monitoring/config/checks/vulnix.py create mode 100644 src/lego_monitoring/core/const.py diff --git a/flake.nix b/flake.nix index a445160..1d9ea40 100644 --- a/flake.nix +++ b/flake.nix @@ -65,6 +65,15 @@ buildInputs = old.buildInputs or [ ] ++ [ _prev.setuptools ]; } ); + + lego-monitoring = _prev.lego-monitoring.overrideAttrs ( + old: { + postPatch = '' + substituteInPlace src/lego_monitoring/core/const.py \ + --replace-fail 'VULNIX_PATH: str = ...' 'VULNIX_PATH = "${lib.getExe pkgs.vulnix}"' + ''; + } + ); }; # This example is only using x86_64-linux diff --git a/modules/default.nix b/modules/default.nix index d7f3ab3..5c34d7f 100644 --- a/modules/default.nix +++ b/modules/default.nix @@ -9,6 +9,7 @@ package: let tempSensorOptions = (import ./submodules/tempSensorOptions.nix) { inherit lib; }; + vulnixWhitelistRule = (import ./submodules/vulnixWhitelistRule.nix) { inherit lib; }; in { options.services.lego-monitoring = { @@ -19,6 +20,7 @@ in "start" "stop" "temp" + "vulnix" ]); default = [ ]; description = "List of enabled check sets. Each check set is a module which checks something and generates alerts based on check results."; @@ -63,12 +65,40 @@ in ''; }; }; + + vulnix = { + whitelist = lib.mkOption { + type = lib.types.attrsOf (lib.types.submodule vulnixWhitelistRule); + default = { }; + description = "Whitelist rules for vulnix. Attr name is package with version, package name, or `*`."; + example = lib.literalExpression ''{ + "ffmpeg-3.4.2" = { + cve = [ "CVE-2018-6912" "CVE-2018-7557" ]; + until = "2018-05-01"; + issueUrl = "https://issues.example.com/29952"; + }; + }''; + }; + }; }; }; config = let cfg = config.services.lego-monitoring; json = pkgs.formats.json {}; + toml = pkgs.formats.toml {}; + + # This monstrous incantation has the effect of converting the options to snake_case + # and removing those that are null (because TOML does not support null values) + vulnixWhitelistFile = toml.generate "vulnix-whitelist.toml" (lib.attrsets.filterAttrsRecursive ( + k: v: v != null + ) ( + lib.mapAttrs (_: rule: { + inherit (rule) cve until; + issue_url = rule.issueUrl; + }) cfg.checks.vulnix.whitelist + )); + serviceConfigFile = json.generate "config.json" { enabled_check_sets = cfg.enabledCheckSets; telegram = with cfg.telegram; { @@ -88,6 +118,8 @@ in }) sensorCfg.readings; }) cfg.checks.temp.sensors; + + vulnix.whitelist_path = vulnixWhitelistFile; }; }; in lib.mkIf cfg.enable { diff --git a/modules/submodules/vulnixWhitelistRule.nix b/modules/submodules/vulnixWhitelistRule.nix new file mode 100644 index 0000000..9d14045 --- /dev/null +++ b/modules/submodules/vulnixWhitelistRule.nix @@ -0,0 +1,27 @@ +{ + lib, +}: + +{ + options = { + cve = lib.mkOption { + type = lib.types.nullOr (lib.types.listOf lib.types.str); + default = null; + description = '' + List of CVE identifiers to match. The whitelist rule is valid as long as the detected CVEs are a subset of the CVEs listed here. + If additional CVEs are detected, this whitelist rule is not effective anymore. If null, all CVEs are matched.''; + }; + until = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = '' + Date in the form "YYYY-MM-DD" which confines this rule's lifetime. Null means forever. + On the specified date and later, this whitelist rule is not effective anymore.''; + }; + issueUrl = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "URL or list of URLs that point to any issue tracker. Informational only."; + }; + }; +} diff --git a/src/lego_monitoring/__init__.py b/src/lego_monitoring/__init__.py index 981951e..12191b0 100644 --- a/src/lego_monitoring/__init__.py +++ b/src/lego_monitoring/__init__.py @@ -56,6 +56,7 @@ async def async_main(): ], "stop": [], # this is checked later "temp": [interval_checker(checks.temp_check, datetime.timedelta(minutes=5))], + "vulnix": [interval_checker(checks.vulnix_check, datetime.timedelta(days=3))], } checkers = [] diff --git a/src/lego_monitoring/alerting/enum.py b/src/lego_monitoring/alerting/enum.py index 2e121c7..de6ca5e 100644 --- a/src/lego_monitoring/alerting/enum.py +++ b/src/lego_monitoring/alerting/enum.py @@ -5,10 +5,10 @@ class AlertType(StrEnum): BOOT = "BOOT" TEMP = "TEMP" TEST = "TEST" - # ERROR = "ERROR" + VULN = "VULN" + ERROR = "ERROR" # RAM = "RAM" # CPU = "CPU" - # VULN = "VULN" # LOGIN = "LOGIN" # SMART = "SMART" # TODO # RAID = "RAID" diff --git a/src/lego_monitoring/checks/__init__.py b/src/lego_monitoring/checks/__init__.py index af7a958..df26ddf 100644 --- a/src/lego_monitoring/checks/__init__.py +++ b/src/lego_monitoring/checks/__init__.py @@ -1 +1,2 @@ from .temp import temp_check +from .vulnix import vulnix_check diff --git a/src/lego_monitoring/checks/vulnix/__init__.py b/src/lego_monitoring/checks/vulnix/__init__.py new file mode 100644 index 0000000..c9f7a10 --- /dev/null +++ b/src/lego_monitoring/checks/vulnix/__init__.py @@ -0,0 +1,48 @@ +from lego_monitoring.alerting import alerts +from lego_monitoring.alerting.enum import AlertType, Severity + +from .vulnix import get_vulnix_output + +IS_TESTING = False + + +def vulnix_check() -> list[alerts.Alert]: + alert_list = [] + try: + vulnix_output = get_vulnix_output(IS_TESTING) + except Exception as e: + alerts.send_alert( + alerts.Alert( + alert_type=AlertType.ERROR, + message=f"Exception {type(e).__name__} while calling vulnix: {e}", + severity=Severity.CRITICAL, + ) + ) + return [] + for finding in vulnix_output: + if not IS_TESTING: + non_whitelisted_cves = [k for k in finding.description if k not in finding.whitelisted] + else: + non_whitelisted_cves = finding.description.keys() + if len(non_whitelisted_cves) == 0: + continue + message = f"New findings in derivation {finding.derivation}:" + for cve in non_whitelisted_cves: + if cve in finding.cvssv3_basescore: + score_str = f"(CVSSv3 = {finding.cvssv3_basescore[cve]})" + else: + score_str = "(not scored by CVSSv3)" + message += f'\n* {cve} - {finding.description[cve]} {score_str}' + + alert = alerts.Alert( + alert_type=AlertType.VULN, + message=message, + severity=Severity.WARNING, + ) + alert_list.append(alert) + + if IS_TESTING: + alert_list[0].message += "\n(just testing)" + return [alert_list[0]] + else: + return alert_list diff --git a/src/lego_monitoring/checks/vulnix/vulnix.py b/src/lego_monitoring/checks/vulnix/vulnix.py new file mode 100644 index 0000000..ff31662 --- /dev/null +++ b/src/lego_monitoring/checks/vulnix/vulnix.py @@ -0,0 +1,41 @@ +import json +import logging +import subprocess +from dataclasses import dataclass + +from lego_monitoring.core import cvars +from lego_monitoring.core.const import VULNIX_PATH + + +@dataclass +class VulnixPackageFindings: + pname: str + version: str + derivation: str + whitelisted: list[str] + cvssv3_basescore: dict[str, float] + description: dict[str, float] + + +def get_vulnix_output(is_testing=False) -> list[VulnixPackageFindings]: + whitelist_path = cvars.config.get().checks.vulnix.whitelist_path + cmd = [VULNIX_PATH, "--system", "-w", whitelist_path, "--json"] + if is_testing: + cmd.append("-s") + vulnix_run = subprocess.run(cmd, capture_output=True, check=False) + if vulnix_run.returncode not in range(0, 3): + logging.error(f"Vulnix returned error code {vulnix_run.returncode}, stderr: {vulnix_run.stderr}") + raise Exception(f"vulnix return error code {vulnix_run.returncode}, check logs") + vulnix_findings_json = json.loads(vulnix_run.stdout) + vulnix_findings = [ + VulnixPackageFindings( + pname=f["pname"], + version=f["version"], + derivation=f["derivation"], + whitelisted=f["whitelisted"], + cvssv3_basescore=f["cvssv3_basescore"], + description=f["description"], + ) + for f in vulnix_findings_json + ] + return vulnix_findings diff --git a/src/lego_monitoring/config/__init__.py b/src/lego_monitoring/config/__init__.py index bebca3d..31b58c5 100644 --- a/src/lego_monitoring/config/__init__.py +++ b/src/lego_monitoring/config/__init__.py @@ -4,11 +4,13 @@ from dataclasses import dataclass from alt_utils import NestedDeserializableDataclass from .checks.temp import TempCheckConfig +from .checks.vulnix import VulnixCheckConfig @dataclass class ChecksConfig(NestedDeserializableDataclass): temp: TempCheckConfig + vulnix: VulnixCheckConfig @dataclass diff --git a/src/lego_monitoring/config/checks/vulnix.py b/src/lego_monitoring/config/checks/vulnix.py new file mode 100644 index 0000000..e68fc03 --- /dev/null +++ b/src/lego_monitoring/config/checks/vulnix.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass + +from alt_utils import NestedDeserializableDataclass + + +@dataclass +class VulnixCheckConfig(NestedDeserializableDataclass): + whitelist_path: str diff --git a/src/lego_monitoring/core/const.py b/src/lego_monitoring/core/const.py new file mode 100644 index 0000000..dde2ddf --- /dev/null +++ b/src/lego_monitoring/core/const.py @@ -0,0 +1 @@ +VULNIX_PATH: str = ... # path to vulnix executable