diff --git a/.vscode/settings.json b/.vscode/settings.json
index af0962a..f5b3bc0 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -12,5 +12,14 @@
],
"black-formatter.args": [
"--line-length=120"
- ]
+ ],
+ "python.testing.unittestArgs": [
+ "-v",
+ "-s",
+ "./tests",
+ "-p",
+ "test_*.py"
+ ],
+ "python.testing.pytestEnabled": false,
+ "python.testing.unittestEnabled": true
}
diff --git a/README.md b/README.md
index 6bb4e40..42ff93b 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
## Prerequisites
-* `pacman -S libolm`
+* `pacman -S libolm arch-audit`
* `pip -r requirements.txt`
## Configuring
diff --git a/alerting/alerts.py b/alerting/alerts.py
index bc78a28..f387dc4 100644
--- a/alerting/alerts.py
+++ b/alerting/alerts.py
@@ -11,9 +11,11 @@ from alerting.common import CREDS_FILE, ROOM_ID
class AlertType(StrEnum):
TEST = "TEST"
+ ERROR = "ERROR"
RAM = "RAM"
CPU = "CPU"
TEMP = "TEMP"
+ VULN = "VULN"
LOGIN = "LOGIN" # TODO
SMART = "SMART" # TODO
RAID = "RAID" # TODO
@@ -30,6 +32,7 @@ class Alert:
alert_type: AlertType
message: str
severity: Severity
+ html_message: Optional[str] = None
async def get_client() -> nio.AsyncClient:
@@ -56,7 +59,11 @@ def format_message(alert: Alert) -> str:
case Severity.CRITICAL:
severity_emoji = "🆘"
message = f"{severity_emoji} {alert.alert_type} Alert\n{alert.message}"
- return message
+ if alert.html_message:
+ html_message = f"{severity_emoji} {alert.alert_type} Alert
{alert.html_message}"
+ return message, html_message
+ else:
+ return message, None
async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) -> None:
@@ -65,14 +72,18 @@ async def send_alert(alert: Alert, client: Optional[nio.AsyncClient] = None) ->
client = await get_client()
else:
temp_client = False
- message = format_message(alert)
+ message, html_message = format_message(alert)
+ content = {
+ "msgtype": "m.text",
+ "body": message,
+ }
+ if html_message:
+ content["format"] = "org.matrix.custom.html"
+ content["formatted_body"] = html_message
await client.room_send(
room_id=ROOM_ID,
message_type="m.room.message",
- content={
- "msgtype": "m.text",
- "body": message,
- },
+ content=content,
)
if temp_client:
await client.close()
diff --git a/misc/checks.py b/misc/checks.py
index 5e7cc1f..6efefd2 100644
--- a/misc/checks.py
+++ b/misc/checks.py
@@ -1,5 +1,5 @@
from alerting import alerts
-from misc import sensors
+from misc import sensors, vuln
IS_TESTING = False
@@ -65,3 +65,30 @@ def ram_check() -> list[alerts.Alert]:
else:
return []
return [alert]
+
+
+async def vuln_check() -> list[alerts.Alert]:
+ vulns = await vuln.get_vulns()
+ alert_list = []
+ for v in vulns:
+ if IS_TESTING or v.fixed or v.severity in (vuln.Severity.HIGH, vuln.Severity.CRITICAL):
+ match v.severity:
+ case vuln.Severity.LOW:
+ severity = alerts.Severity.INFO
+ case vuln.Severity.MEDIUM:
+ severity = alerts.Severity.WARNING
+ case vuln.Severity.HIGH | vuln.Severity.CRITICAL:
+ severity = alerts.Severity.CRITICAL
+ message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
+ html_message = f"{v.id}: {v.vuln_type} in {','.join(v.packages)}"
+ if v.fixed:
+ message.append(f" -- update to {v.fixed} ASAP")
+ html_message.append(f" -- update to {v.fixed} ASAP")
+ alert = alerts.Alert(
+ alert_type=alerts.AlertType.VULN,
+ message=message,
+ html_message=html_message,
+ severity=severity,
+ )
+ alert_list.append(alert)
+ return alert_list
diff --git a/misc/vuln.py b/misc/vuln.py
new file mode 100644
index 0000000..b294755
--- /dev/null
+++ b/misc/vuln.py
@@ -0,0 +1,55 @@
+import json
+import subprocess
+from dataclasses import dataclass
+from enum import StrEnum
+from typing import Optional
+
+from alerting import alerts
+
+
+class Severity(StrEnum):
+ LOW = "Low"
+ MEDIUM = "Medium"
+ HIGH = "High"
+ CRITICAL = "Critical"
+
+
+@dataclass
+class Vulnerability:
+ id: str
+ link: str
+ vuln_type: str
+ packages: list[str]
+ severity: Severity
+ fixed: Optional[str]
+
+
+def _parse_arch_audit_output(output: str) -> list[Vulnerability]:
+ arch_audit_json = json.loads(output)
+ vulnerabilities = []
+ for v in arch_audit_json:
+ vulnerability = Vulnerability(
+ id=v["name"],
+ link=f"https://security.archlinux.org/{v['name']}",
+ vuln_type=v["type"],
+ packages=v["packages"],
+ severity=v["severity"],
+ fixed=v["fixed"],
+ )
+ vulnerabilities.append(vulnerability)
+ return vulnerabilities
+
+
+async def get_vulns() -> list[Vulnerability]:
+ try:
+ arch_audit_output = subprocess.check_output(["arch-audit", "--json"])
+ except FileNotFoundError:
+ await alerts.send_alert(
+ alerts.Alert(
+ alert_type=alerts.AlertType.ERROR,
+ message="arch-audit not installed!",
+ severity=alerts.Severity.CRITICAL,
+ )
+ )
+ return []
+ return _parse_arch_audit_output(arch_audit_output)
diff --git a/service.py b/service.py
index 4f13ba5..59d8e60 100755
--- a/service.py
+++ b/service.py
@@ -37,12 +37,18 @@ async def checker(check: Callable | Coroutine, interval_secs: int, client: nio.A
async def main():
+ MINUTE = 60
+ HOUR = 60 * MINUTE
+ DAY = 24 * HOUR
+ WEEK = 7 * DAY
+
signal.signal(signal.SIGTERM, stop_gracefully)
client = await alerts.get_client()
checkers = (
- checker(checks.temp_check, 5 * 60, client),
- checker(checks.cpu_check, 5 * 60, client),
- checker(checks.ram_check, 1 * 60, client),
+ checker(checks.temp_check, 5 * MINUTE, client),
+ checker(checks.cpu_check, 5 * MINUTE, client),
+ checker(checks.ram_check, 1 * MINUTE, client),
+ checker(checks.vuln_check, 1 * DAY, client),
)
async with asyncio.TaskGroup() as tg:
checker_tasks: set[asyncio.Task] = set()
diff --git a/tests/test_vuln.py b/tests/test_vuln.py
new file mode 100644
index 0000000..684a322
--- /dev/null
+++ b/tests/test_vuln.py
@@ -0,0 +1,34 @@
+import unittest
+
+from misc import vuln
+
+
+class TestVuln(unittest.TestCase):
+ def test_parse_arch_audit_output(self):
+ self.assertEqual(
+ vuln._parse_arch_audit_output(
+ """[
+ {"name":"AVG-2765",
+ "packages":["openssl"],
+ "status":"Vulnerable",
+ "type":"arbitrary command execution",
+ "severity":"Medium",
+ "fixed":null,
+ "issues":["CVE-2022-2068"]}
+ ]"""
+ ),
+ [
+ vuln.Vulnerability(
+ id="AVG-2765",
+ link="https://security.archlinux.org/AVG-2765",
+ vuln_type="arbitrary command execution",
+ packages=["openssl"],
+ severity=vuln.Severity.MEDIUM,
+ fixed=None,
+ )
+ ],
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()