From de0ce7d3b08a3244950e3c42582ea96dcc1e1dbf Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 11 Aug 2024 13:20:55 +0300 Subject: [PATCH] vuln alerts from arch-audit --- .vscode/settings.json | 11 ++++++++- alerting/alerts.py | 23 +++++++++++++----- misc/checks.py | 29 ++++++++++++++++++++++- misc/vuln.py | 55 +++++++++++++++++++++++++++++++++++++++++++ service.py | 12 +++++++--- tests/test_vuln.py | 34 ++++++++++++++++++++++++++ 6 files changed, 153 insertions(+), 11 deletions(-) create mode 100644 misc/vuln.py create mode 100644 tests/test_vuln.py diff --git a/.vscode/settings.json b/.vscode/settings.json index af0962a..f5b3bc0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,5 +12,14 @@ ], "black-formatter.args": [ "--line-length=120" - ] + ], + "python.testing.unittestArgs": [ + "-v", + "-s", + "./tests", + "-p", + "test_*.py" + ], + "python.testing.pytestEnabled": false, + "python.testing.unittestEnabled": true } diff --git a/alerting/alerts.py b/alerting/alerts.py index bc78a28..f387dc4 100644 --- a/alerting/alerts.py +++ b/alerting/alerts.py @@ -11,9 +11,11 @@ from alerting.common import CREDS_FILE, ROOM_ID class AlertType(StrEnum): TEST = "TEST" + ERROR = "ERROR" RAM = "RAM" CPU = "CPU" TEMP = "TEMP" + VULN = "VULN" LOGIN = "LOGIN" # TODO SMART = "SMART" # TODO RAID = "RAID" # TODO @@ -30,6 +32,7 @@ class Alert: alert_type: AlertType message: str severity: Severity + html_message: Optional[str] = None async def get_client() -> nio.AsyncClient: @@ -56,7 +59,11 @@ def format_message(alert: Alert) -> str: case Severity.CRITICAL: severity_emoji = "🆘" message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}" - return message + if alert.html_message: + html_message = f"{severity_emoji} {alert.alert_type} Alert
{alert.html_message}" + return message, html_message + else: + return message, None async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None: @@ -65,14 +72,18 @@ async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> client = await get_client() else: temp_client = False - message = format_message(alert) + message, html_message = format_message(alert) + content = { + "msgtype": "m.text", + "body": message, + } + if html_message: + content["format"] = "org.matrix.custom.html" + content["formatted_body"] = html_message await client.room_send( room_id=ROOM_ID, message_type="m.room.message", - content={ - "msgtype": "m.text", - "body": message, - }, + content=content, ) if temp_client: await client.close() diff --git a/misc/checks.py b/misc/checks.py index 5e7cc1f..6efefd2 100644 --- a/misc/checks.py +++ b/misc/checks.py @@ -1,5 +1,5 @@ from alerting import alerts -from misc import sensors +from misc import sensors, vuln IS_TESTING = False @@ -65,3 +65,30 @@ def ram_check() -> list[alerts.Alert]: else: return [] return [alert] + + +async def vuln_check() -> list[alerts.Alert]: + vulns = await vuln.get_vulns() + alert_list = [] + for v in vulns: + if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL): + match v.severity: + case vuln.Severity.LOW: + severity = alerts.Severity.INFO + case vuln.Severity.MEDIUM: + severity = alerts.Severity.WARNING + case vuln.Severity.HIGH | vuln.Severity.CRITICAL: + severity = alerts.Severity.CRITICAL + message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" + html_message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}" + if v.fixed: + message.append(f" -- update to {v.fixed} ASAP") + html_message.append(f" -- update to {v.fixed} ASAP") + alert = alerts.Alert( + alert_type=alerts.AlertType.VULN, + message=message, + html_message=html_message, + severity=severity, + ) + alert_list.append(alert) + return alert_list diff --git a/misc/vuln.py b/misc/vuln.py new file mode 100644 index 0000000..b294755 --- /dev/null +++ b/misc/vuln.py @@ -0,0 +1,55 @@ +import json +import subprocess +from dataclasses import dataclass +from enum import StrEnum +from typing import Optional + +from alerting import alerts + + +class Severity(StrEnum): + LOW = "Low" + MEDIUM = "Medium" + HIGH = "High" + CRITICAL = "Critical" + + +@dataclass +class Vulnerability: + id: str + link: str + vuln_type: str + packages: list[str] + severity: Severity + fixed: Optional[str] + + +def _parse_arch_audit_output(output: str) -> list[Vulnerability]: + arch_audit_json = json.loads(output) + vulnerabilities = [] + for v in arch_audit_json: + vulnerability = Vulnerability( + id=v["name"], + link=f"https://security.archlinux.org/{v['name']}", + vuln_type=v["type"], + packages=v["packages"], + severity=v["severity"], + fixed=v["fixed"], + ) + vulnerabilities.append(vulnerability) + return vulnerabilities + + +async def get_vulns() -> list[Vulnerability]: + try: + arch_audit_output = subprocess.check_output(["arch-audit", "--json"]) + except FileNotFoundError: + await alerts.send_alert( + alerts.Alert( + alert_type=alerts.AlertType.ERROR, + message="arch-audit not installed!", + severity=alerts.Severity.CRITICAL, + ) + ) + return [] + return _parse_arch_audit_output(arch_audit_output) diff --git a/service.py b/service.py index 4f13ba5..59d8e60 100755 --- a/service.py +++ b/service.py @@ -37,12 +37,18 @@ async def checker(check: Callable | Coroutine, interval_secs: int, client: nio.A async def main(): + MINUTE = 60 + HOUR = 60 * MINUTE + DAY = 24 * HOUR + WEEK = 7 * DAY + signal.signal(signal.SIGTERM, stop_gracefully) client = await alerts.get_client() checkers = ( - checker(checks.temp_check, 5 * 60, client), - checker(checks.cpu_check, 5 * 60, client), - checker(checks.ram_check, 1 * 60, client), + checker(checks.temp_check, 5 * MINUTE, client), + checker(checks.cpu_check, 5 * MINUTE, client), + checker(checks.ram_check, 1 * MINUTE, client), + checker(checks.vuln_check, 1 * DAY, client), ) async with asyncio.TaskGroup() as tg: checker_tasks: set[asyncio.Task] = set() diff --git a/tests/test_vuln.py b/tests/test_vuln.py new file mode 100644 index 0000000..684a322 --- /dev/null +++ b/tests/test_vuln.py @@ -0,0 +1,34 @@ +import unittest + +from misc import vuln + + +class TestVuln(unittest.TestCase): + def test_parse_arch_audit_output(self): + self.assertEqual( + vuln._parse_arch_audit_output( + """[ + {"name":"AVG-2765", + "packages":["openssl"], + "status":"Vulnerable", + "type":"arbitrary command execution", + "severity":"Medium", + "fixed":null, + "issues":["CVE-2022-2068"]} + ]""" + ), + [ + vuln.Vulnerability( + id="AVG-2765", + link="https://security.archlinux.org/AVG-2765", + vuln_type="arbitrary command execution", + packages=["openssl"], + severity=vuln.Severity.MEDIUM, + fixed=None, + ) + ], + ) + + +if __name__ == "__main__": + unittest.main()