Merge branch 'arch-audit' into 'main'

vuln alerts from arch-audit

See merge request lego/lego-monitoring!2
This commit is contained in:
Alex Tau 2024-08-13 12:56:32 +00:00
commit aac320a0c8
7 changed files with 154 additions and 12 deletions

11
.vscode/settings.json vendored
View file

@ -12,5 +12,14 @@
],
"black-formatter.args": [
"--line-length=120"
]
],
"python.testing.unittestArgs": [
"-v",
"-s",
"./tests",
"-p",
"test_*.py"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
}

View file

@ -2,7 +2,7 @@
## Prerequisites
* `pacman -S libolm`
* `pacman -S libolm arch-audit`
* `pip -r requirements.txt`
## Configuring

View file

@ -11,9 +11,11 @@ from alerting.common import CREDS_FILE, ROOM_ID
class AlertType(StrEnum):
TEST = "TEST"
ERROR = "ERROR"
RAM = "RAM"
CPU = "CPU"
TEMP = "TEMP"
VULN = "VULN"
LOGIN = "LOGIN" # TODO
SMART = "SMART" # TODO
RAID = "RAID" # TODO
@ -30,6 +32,7 @@ class Alert:
alert_type: AlertType
message: str
severity: Severity
html_message: Optional[str] = None
async def get_client() -> nio.AsyncClient:
@ -56,7 +59,11 @@ def format_message(alert: Alert) -> str:
case Severity.CRITICAL:
severity_emoji = "🆘"
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
return message
if alert.html_message:
html_message = f"{severity_emoji} {alert.alert_type} Alert<br>{alert.html_message}"
return message, html_message
else:
return message, None
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
@ -65,14 +72,18 @@ async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) ->
client = await get_client()
else:
temp_client = False
message = format_message(alert)
message, html_message = format_message(alert)
content = {
"msgtype": "m.text",
"body": message,
}
if html_message:
content["format"] = "org.matrix.custom.html"
content["formatted_body"] = html_message
await client.room_send(
room_id=ROOM_ID,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
},
content=content,
)
if temp_client:
await client.close()

View file

@ -1,5 +1,5 @@
from alerting import alerts
from misc import sensors
from misc import sensors, vuln
IS_TESTING = False
@ -65,3 +65,30 @@ def ram_check() -> list[alerts.Alert]:
else:
return []
return [alert]
async def vuln_check() -> list[alerts.Alert]:
vulns = await vuln.get_vulns()
alert_list = []
for v in vulns:
if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL):
match v.severity:
case vuln.Severity.LOW:
severity = alerts.Severity.INFO
case vuln.Severity.MEDIUM:
severity = alerts.Severity.WARNING
case vuln.Severity.HIGH | vuln.Severity.CRITICAL:
severity = alerts.Severity.CRITICAL
message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
html_message = f"<a href='{v.link}'>{v.id}</a>: {v.vuln_type} in {','.join(v.packages)}"
if v.fixed:
message.append(f" -- update to {v.fixed} ASAP")
html_message.append(f" -- update to {v.fixed} ASAP")
alert = alerts.Alert(
alert_type=alerts.AlertType.VULN,
message=message,
html_message=html_message,
severity=severity,
)
alert_list.append(alert)
return alert_list

55
misc/vuln.py Normal file
View file

@ -0,0 +1,55 @@
import json
import subprocess
from dataclasses import dataclass
from enum import StrEnum
from typing import Optional
from alerting import alerts
class Severity(StrEnum):
LOW = "Low"
MEDIUM = "Medium"
HIGH = "High"
CRITICAL = "Critical"
@dataclass
class Vulnerability:
id: str
link: str
vuln_type: str
packages: list[str]
severity: Severity
fixed: Optional[str]
def _parse_arch_audit_output(output: str) -> list[Vulnerability]:
arch_audit_json = json.loads(output)
vulnerabilities = []
for v in arch_audit_json:
vulnerability = Vulnerability(
id=v["name"],
link=f"https://security.archlinux.org/{v['name']}",
vuln_type=v["type"],
packages=v["packages"],
severity=v["severity"],
fixed=v["fixed"],
)
vulnerabilities.append(vulnerability)
return vulnerabilities
async def get_vulns() -> list[Vulnerability]:
try:
arch_audit_output = subprocess.check_output(["arch-audit", "--json"])
except FileNotFoundError:
await alerts.send_alert(
alerts.Alert(
alert_type=alerts.AlertType.ERROR,
message="arch-audit not installed!",
severity=alerts.Severity.CRITICAL,
)
)
return []
return _parse_arch_audit_output(arch_audit_output)

View file

@ -37,12 +37,18 @@ async def checker(check: Callable | Coroutine, interval_secs: int, client: nio.A
async def main():
MINUTE = 60
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY
signal.signal(signal.SIGTERM, stop_gracefully)
client = await alerts.get_client()
checkers = (
checker(checks.temp_check, 5 * 60, client),
checker(checks.cpu_check, 5 * 60, client),
checker(checks.ram_check, 1 * 60, client),
checker(checks.temp_check, 5 * MINUTE, client),
checker(checks.cpu_check, 5 * MINUTE, client),
checker(checks.ram_check, 1 * MINUTE, client),
checker(checks.vuln_check, 1 * DAY, client),
)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()

34
tests/test_vuln.py Normal file
View file

@ -0,0 +1,34 @@
import unittest
from misc import vuln
class TestVuln(unittest.TestCase):
def test_parse_arch_audit_output(self):
self.assertEqual(
vuln._parse_arch_audit_output(
"""[
{"name":"AVG-2765",
"packages":["openssl"],
"status":"Vulnerable",
"type":"arbitrary command execution",
"severity":"Medium",
"fixed":null,
"issues":["CVE-2022-2068"]}
]"""
),
[
vuln.Vulnerability(
id="AVG-2765",
link="https://security.archlinux.org/AVG-2765",
vuln_type="arbitrary command execution",
packages=["openssl"],
severity=vuln.Severity.MEDIUM,
fixed=None,
)
],
)
if __name__ == "__main__":
unittest.main()