mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-10 04:41:10 +00:00
vuln alerts from arch-audit
This commit is contained in:
parent
56ebed516e
commit
de0ce7d3b0
6 changed files with 153 additions and 11 deletions
11
.vscode/settings.json
vendored
11
.vscode/settings.json
vendored
|
|
@ -12,5 +12,14 @@
|
||||||
],
|
],
|
||||||
"black-formatter.args": [
|
"black-formatter.args": [
|
||||||
"--line-length=120"
|
"--line-length=120"
|
||||||
]
|
],
|
||||||
|
"python.testing.unittestArgs": [
|
||||||
|
"-v",
|
||||||
|
"-s",
|
||||||
|
"./tests",
|
||||||
|
"-p",
|
||||||
|
"test_*.py"
|
||||||
|
],
|
||||||
|
"python.testing.pytestEnabled": false,
|
||||||
|
"python.testing.unittestEnabled": true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,11 @@ from alerting.common import CREDS_FILE, ROOM_ID
|
||||||
|
|
||||||
class AlertType(StrEnum):
|
class AlertType(StrEnum):
|
||||||
TEST = "TEST"
|
TEST = "TEST"
|
||||||
|
ERROR = "ERROR"
|
||||||
RAM = "RAM"
|
RAM = "RAM"
|
||||||
CPU = "CPU"
|
CPU = "CPU"
|
||||||
TEMP = "TEMP"
|
TEMP = "TEMP"
|
||||||
|
VULN = "VULN"
|
||||||
LOGIN = "LOGIN" # TODO
|
LOGIN = "LOGIN" # TODO
|
||||||
SMART = "SMART" # TODO
|
SMART = "SMART" # TODO
|
||||||
RAID = "RAID" # TODO
|
RAID = "RAID" # TODO
|
||||||
|
|
@ -30,6 +32,7 @@ class Alert:
|
||||||
alert_type: AlertType
|
alert_type: AlertType
|
||||||
message: str
|
message: str
|
||||||
severity: Severity
|
severity: Severity
|
||||||
|
html_message: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
async def get_client() -> nio.AsyncClient:
|
async def get_client() -> nio.AsyncClient:
|
||||||
|
|
@ -56,7 +59,11 @@ def format_message(alert: Alert) -> str:
|
||||||
case Severity.CRITICAL:
|
case Severity.CRITICAL:
|
||||||
severity_emoji = "🆘"
|
severity_emoji = "🆘"
|
||||||
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
|
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
|
||||||
return message
|
if alert.html_message:
|
||||||
|
html_message = f"{severity_emoji} {alert.alert_type} Alert<br>{alert.html_message}"
|
||||||
|
return message, html_message
|
||||||
|
else:
|
||||||
|
return message, None
|
||||||
|
|
||||||
|
|
||||||
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
|
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
|
||||||
|
|
@ -65,14 +72,18 @@ async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) ->
|
||||||
client = await get_client()
|
client = await get_client()
|
||||||
else:
|
else:
|
||||||
temp_client = False
|
temp_client = False
|
||||||
message = format_message(alert)
|
message, html_message = format_message(alert)
|
||||||
|
content = {
|
||||||
|
"msgtype": "m.text",
|
||||||
|
"body": message,
|
||||||
|
}
|
||||||
|
if html_message:
|
||||||
|
content["format"] = "org.matrix.custom.html"
|
||||||
|
content["formatted_body"] = html_message
|
||||||
await client.room_send(
|
await client.room_send(
|
||||||
room_id=ROOM_ID,
|
room_id=ROOM_ID,
|
||||||
message_type="m.room.message",
|
message_type="m.room.message",
|
||||||
content={
|
content=content,
|
||||||
"msgtype": "m.text",
|
|
||||||
"body": message,
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
if temp_client:
|
if temp_client:
|
||||||
await client.close()
|
await client.close()
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
from alerting import alerts
|
from alerting import alerts
|
||||||
from misc import sensors
|
from misc import sensors, vuln
|
||||||
|
|
||||||
IS_TESTING = False
|
IS_TESTING = False
|
||||||
|
|
||||||
|
|
@ -65,3 +65,30 @@ def ram_check() -> list[alerts.Alert]:
|
||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
return [alert]
|
return [alert]
|
||||||
|
|
||||||
|
|
||||||
|
async def vuln_check() -> list[alerts.Alert]:
|
||||||
|
vulns = await vuln.get_vulns()
|
||||||
|
alert_list = []
|
||||||
|
for v in vulns:
|
||||||
|
if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL):
|
||||||
|
match v.severity:
|
||||||
|
case vuln.Severity.LOW:
|
||||||
|
severity = alerts.Severity.INFO
|
||||||
|
case vuln.Severity.MEDIUM:
|
||||||
|
severity = alerts.Severity.WARNING
|
||||||
|
case vuln.Severity.HIGH | vuln.Severity.CRITICAL:
|
||||||
|
severity = alerts.Severity.CRITICAL
|
||||||
|
message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
|
||||||
|
html_message = f"<a href='{v.link}'>{v.id}</a>: {v.vuln_type} in {','.join(v.packages)}"
|
||||||
|
if v.fixed:
|
||||||
|
message.append(f" -- update to {v.fixed} ASAP")
|
||||||
|
html_message.append(f" -- update to {v.fixed} ASAP")
|
||||||
|
alert = alerts.Alert(
|
||||||
|
alert_type=alerts.AlertType.VULN,
|
||||||
|
message=message,
|
||||||
|
html_message=html_message,
|
||||||
|
severity=severity,
|
||||||
|
)
|
||||||
|
alert_list.append(alert)
|
||||||
|
return alert_list
|
||||||
|
|
|
||||||
55
misc/vuln.py
Normal file
55
misc/vuln.py
Normal file
|
|
@ -0,0 +1,55 @@
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import StrEnum
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from alerting import alerts
|
||||||
|
|
||||||
|
|
||||||
|
class Severity(StrEnum):
|
||||||
|
LOW = "Low"
|
||||||
|
MEDIUM = "Medium"
|
||||||
|
HIGH = "High"
|
||||||
|
CRITICAL = "Critical"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Vulnerability:
|
||||||
|
id: str
|
||||||
|
link: str
|
||||||
|
vuln_type: str
|
||||||
|
packages: list[str]
|
||||||
|
severity: Severity
|
||||||
|
fixed: Optional[str]
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_arch_audit_output(output: str) -> list[Vulnerability]:
|
||||||
|
arch_audit_json = json.loads(output)
|
||||||
|
vulnerabilities = []
|
||||||
|
for v in arch_audit_json:
|
||||||
|
vulnerability = Vulnerability(
|
||||||
|
id=v["name"],
|
||||||
|
link=f"https://security.archlinux.org/{v['name']}",
|
||||||
|
vuln_type=v["type"],
|
||||||
|
packages=v["packages"],
|
||||||
|
severity=v["severity"],
|
||||||
|
fixed=v["fixed"],
|
||||||
|
)
|
||||||
|
vulnerabilities.append(vulnerability)
|
||||||
|
return vulnerabilities
|
||||||
|
|
||||||
|
|
||||||
|
async def get_vulns() -> list[Vulnerability]:
|
||||||
|
try:
|
||||||
|
arch_audit_output = subprocess.check_output(["arch-audit", "--json"])
|
||||||
|
except FileNotFoundError:
|
||||||
|
await alerts.send_alert(
|
||||||
|
alerts.Alert(
|
||||||
|
alert_type=alerts.AlertType.ERROR,
|
||||||
|
message="arch-audit not installed!",
|
||||||
|
severity=alerts.Severity.CRITICAL,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return []
|
||||||
|
return _parse_arch_audit_output(arch_audit_output)
|
||||||
12
service.py
12
service.py
|
|
@ -37,12 +37,18 @@ async def checker(check: Callable | Coroutine, interval_secs: int, client: nio.A
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
MINUTE = 60
|
||||||
|
HOUR = 60 * MINUTE
|
||||||
|
DAY = 24 * HOUR
|
||||||
|
WEEK = 7 * DAY
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, stop_gracefully)
|
signal.signal(signal.SIGTERM, stop_gracefully)
|
||||||
client = await alerts.get_client()
|
client = await alerts.get_client()
|
||||||
checkers = (
|
checkers = (
|
||||||
checker(checks.temp_check, 5 * 60, client),
|
checker(checks.temp_check, 5 * MINUTE, client),
|
||||||
checker(checks.cpu_check, 5 * 60, client),
|
checker(checks.cpu_check, 5 * MINUTE, client),
|
||||||
checker(checks.ram_check, 1 * 60, client),
|
checker(checks.ram_check, 1 * MINUTE, client),
|
||||||
|
checker(checks.vuln_check, 1 * DAY, client),
|
||||||
)
|
)
|
||||||
async with asyncio.TaskGroup() as tg:
|
async with asyncio.TaskGroup() as tg:
|
||||||
checker_tasks: set[asyncio.Task] = set()
|
checker_tasks: set[asyncio.Task] = set()
|
||||||
|
|
|
||||||
34
tests/test_vuln.py
Normal file
34
tests/test_vuln.py
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from misc import vuln
|
||||||
|
|
||||||
|
|
||||||
|
class TestVuln(unittest.TestCase):
|
||||||
|
def test_parse_arch_audit_output(self):
|
||||||
|
self.assertEqual(
|
||||||
|
vuln._parse_arch_audit_output(
|
||||||
|
"""[
|
||||||
|
{"name":"AVG-2765",
|
||||||
|
"packages":["openssl"],
|
||||||
|
"status":"Vulnerable",
|
||||||
|
"type":"arbitrary command execution",
|
||||||
|
"severity":"Medium",
|
||||||
|
"fixed":null,
|
||||||
|
"issues":["CVE-2022-2068"]}
|
||||||
|
]"""
|
||||||
|
),
|
||||||
|
[
|
||||||
|
vuln.Vulnerability(
|
||||||
|
id="AVG-2765",
|
||||||
|
link="https://security.archlinux.org/AVG-2765",
|
||||||
|
vuln_type="arbitrary command execution",
|
||||||
|
packages=["openssl"],
|
||||||
|
severity=vuln.Severity.MEDIUM,
|
||||||
|
fixed=None,
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue