vulnix integration

This commit is contained in:
Alex Tau 2025-05-09 15:27:22 +03:00
parent 758438382d
commit 436855d8c1
11 changed files with 172 additions and 2 deletions

View file

@ -56,6 +56,7 @@ async def async_main():
],
"stop": [], # this is checked later
"temp": [interval_checker(checks.temp_check, datetime.timedelta(minutes=5))],
"vulnix": [interval_checker(checks.vulnix_check, datetime.timedelta(days=3))],
}
checkers = []

View file

@ -5,10 +5,10 @@ class AlertType(StrEnum):
BOOT = "BOOT"
TEMP = "TEMP"
TEST = "TEST"
# ERROR = "ERROR"
VULN = "VULN"
ERROR = "ERROR"
# RAM = "RAM"
# CPU = "CPU"
# VULN = "VULN"
# LOGIN = "LOGIN"
# SMART = "SMART" # TODO
# RAID = "RAID"

View file

@ -1 +1,2 @@
from .temp import temp_check
from .vulnix import vulnix_check

View file

@ -0,0 +1,48 @@
from lego_monitoring.alerting import alerts
from lego_monitoring.alerting.enum import AlertType, Severity
from .vulnix import get_vulnix_output
IS_TESTING = False
def vulnix_check() -> list[alerts.Alert]:
alert_list = []
try:
vulnix_output = get_vulnix_output(IS_TESTING)
except Exception as e:
alerts.send_alert(
alerts.Alert(
alert_type=AlertType.ERROR,
message=f"Exception {type(e).__name__} while calling vulnix: {e}",
severity=Severity.CRITICAL,
)
)
return []
for finding in vulnix_output:
if not IS_TESTING:
non_whitelisted_cves = [k for k in finding.description if k not in finding.whitelisted]
else:
non_whitelisted_cves = finding.description.keys()
if len(non_whitelisted_cves) == 0:
continue
message = f"New findings in derivation <code>{finding.derivation}</code>:"
for cve in non_whitelisted_cves:
if cve in finding.cvssv3_basescore:
score_str = f"(CVSSv3 = {finding.cvssv3_basescore[cve]})"
else:
score_str = "(not scored by CVSSv3)"
message += f'\n* <a href="https://nvd.nist.gov/vuln/detail/{cve}">{cve}</a> - {finding.description[cve]} {score_str}'
alert = alerts.Alert(
alert_type=AlertType.VULN,
message=message,
severity=Severity.WARNING,
)
alert_list.append(alert)
if IS_TESTING:
alert_list[0].message += "\n(just testing)"
return [alert_list[0]]
else:
return alert_list

View file

@ -0,0 +1,41 @@
import json
import logging
import subprocess
from dataclasses import dataclass
from lego_monitoring.core import cvars
from lego_monitoring.core.const import VULNIX_PATH
@dataclass
class VulnixPackageFindings:
pname: str
version: str
derivation: str
whitelisted: list[str]
cvssv3_basescore: dict[str, float]
description: dict[str, float]
def get_vulnix_output(is_testing=False) -> list[VulnixPackageFindings]:
whitelist_path = cvars.config.get().checks.vulnix.whitelist_path
cmd = [VULNIX_PATH, "--system", "-w", whitelist_path, "--json"]
if is_testing:
cmd.append("-s")
vulnix_run = subprocess.run(cmd, capture_output=True, check=False)
if vulnix_run.returncode not in range(0, 3):
logging.error(f"Vulnix returned error code {vulnix_run.returncode}, stderr: {vulnix_run.stderr}")
raise Exception(f"vulnix return error code {vulnix_run.returncode}, check logs")
vulnix_findings_json = json.loads(vulnix_run.stdout)
vulnix_findings = [
VulnixPackageFindings(
pname=f["pname"],
version=f["version"],
derivation=f["derivation"],
whitelisted=f["whitelisted"],
cvssv3_basescore=f["cvssv3_basescore"],
description=f["description"],
)
for f in vulnix_findings_json
]
return vulnix_findings

View file

@ -4,11 +4,13 @@ from dataclasses import dataclass
from alt_utils import NestedDeserializableDataclass
from .checks.temp import TempCheckConfig
from .checks.vulnix import VulnixCheckConfig
@dataclass
class ChecksConfig(NestedDeserializableDataclass):
temp: TempCheckConfig
vulnix: VulnixCheckConfig
@dataclass

View file

@ -0,0 +1,8 @@
from dataclasses import dataclass
from alt_utils import NestedDeserializableDataclass
@dataclass
class VulnixCheckConfig(NestedDeserializableDataclass):
whitelist_path: str

View file

@ -0,0 +1 @@
VULNIX_PATH: str = ... # path to vulnix executable