mirror of
https://forgejo.altau.su/lego/lego-monitoring.git
synced 2026-03-09 20:31:10 +00:00
191 lines
5.9 KiB
Python
191 lines
5.9 KiB
Python
import json
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from enum import Enum, StrEnum
|
|
from typing import Optional, Self
|
|
|
|
|
|
@dataclass
|
|
class LVAttr:
|
|
"""https://man.archlinux.org/man/lvs.8#NOTES"""
|
|
|
|
class VolType(StrEnum):
|
|
CACHE = "C"
|
|
MIRRORED = "m"
|
|
MIRRORED_NOSYNC = "M"
|
|
ORIGIN = "o"
|
|
ORIGIN_MERGING_SNAPSHOT = "O"
|
|
INTEGRITY = "g"
|
|
RAID = "r"
|
|
RAID_NOSYNC = "R"
|
|
SNAPSHOT = "s"
|
|
MERGING_SNAPSHOT = "S"
|
|
PVMOVE = "p"
|
|
VIRTUAL = "v"
|
|
IMAGE = "i"
|
|
IMAGE_OUT_OF_SYNC = "I"
|
|
MIRROR_LOG = "l"
|
|
CONVERTING = "c"
|
|
THIN = "V"
|
|
THIN_POOL = "t"
|
|
THIN_POOL_DATA = "T"
|
|
VDO_POOL = "d"
|
|
VDO_POOL_DATA = "D"
|
|
METADATA = "e"
|
|
NORMAL = "-"
|
|
|
|
class Permissions(StrEnum):
|
|
WRITABLE = "w"
|
|
READONLY = "r"
|
|
READONLY_ACTIVATED = "R"
|
|
|
|
class AllocationPolicy(StrEnum):
|
|
ANYWHERE = "a"
|
|
ANYWHERE_LOCKED = "A"
|
|
CONTIGUOUS = "c"
|
|
CONTIGUOUS_LOCKED = "C"
|
|
INHERITED = "i"
|
|
INHERITED_LOCKED = "I"
|
|
CLING = "l"
|
|
CLING_LOCKED = "L"
|
|
NORMAL = "n"
|
|
NORMAL_LOCKED = "N"
|
|
|
|
class State(StrEnum):
|
|
ACTIVE = "a"
|
|
HISTORICAL = "h"
|
|
SUSPENDED = "s"
|
|
INVALID_SNAPSHOT = "I"
|
|
INVALID_SUSPENDED_SNAPSHOT = "S"
|
|
SNAPSHOT_MERGE_FAILED = "m"
|
|
SUSPENDED_SNAPSHOT_MERGE_FAILED = "M"
|
|
DEVICE_PRESENT_NO_TABLES = "d"
|
|
DEVICE_PRESENT_INACTIVE_TABLE = "i"
|
|
THIN_POOL_CHECK_NEEDED = "c"
|
|
SUSPENDED_THIN_POOL_CHECK_NEEDED = "C"
|
|
UNKNOWN = "X"
|
|
|
|
class IsOpen(StrEnum):
|
|
OPEN = "o"
|
|
CLOSED = "-"
|
|
UNKNOWN = "X"
|
|
|
|
class TargetType(StrEnum):
|
|
CACHE = "C"
|
|
MIRROR = "m"
|
|
RAID = "r"
|
|
SNAPSHOT = "s"
|
|
THIN = "t"
|
|
UNKNOWN = "u"
|
|
VIRTUAL = "v"
|
|
NORMAL = "-"
|
|
|
|
class Health(StrEnum):
|
|
# for all
|
|
PARTIAL = "p"
|
|
UNKNOWN = "X"
|
|
OK = "-"
|
|
|
|
# for RAID
|
|
REFRESH_NEEDED = "r"
|
|
MISMATCHES = "m"
|
|
WRITEMOSTLY = "w"
|
|
RESHAPING = "s"
|
|
REMOVE = "R"
|
|
|
|
# for thin pools and LVs
|
|
FAILED = "F"
|
|
OUT_OF_SPACE = "D"
|
|
METADATA_READ_ONLY = "M"
|
|
|
|
# for writecache
|
|
ERROR = "E"
|
|
|
|
vol_type: VolType
|
|
permissions: Permissions
|
|
allocation_policy: AllocationPolicy
|
|
fixed_minor: bool
|
|
state: State
|
|
is_open: IsOpen
|
|
target_type: TargetType
|
|
zero_before_use: bool
|
|
health: Health
|
|
skip_activation: bool
|
|
name: Optional[str] = None
|
|
|
|
@classmethod
|
|
def from_str(cls, attr_str: str, name: Optional[str] = None) -> Self:
|
|
kwargs = {}
|
|
kwargs["vol_type"] = cls.VolType(attr_str[0])
|
|
kwargs["permissions"] = cls.Permissions(attr_str[1])
|
|
kwargs["allocation_policy"] = cls.AllocationPolicy(attr_str[2])
|
|
kwargs["fixed_minor"] = True if attr_str[3] == "m" else False
|
|
kwargs["state"] = cls.State(attr_str[4])
|
|
kwargs["is_open"] = cls.IsOpen(attr_str[5])
|
|
kwargs["target_type"] = cls.TargetType(attr_str[6])
|
|
kwargs["zero_before_use"] = True if attr_str[7] == "z" else False
|
|
kwargs["health"] = cls.Health(attr_str[8])
|
|
kwargs["skip_activation"] = True if attr_str[9] == "k" else False
|
|
kwargs["name"] = name
|
|
return cls(**kwargs)
|
|
|
|
@classmethod
|
|
def from_cli(cls, name: str) -> Self:
|
|
json_obj = json.loads(subprocess.run(["lvs", "--reportformat=json", name], capture_output=True).stdout)
|
|
attr_str = json_obj["report"][0]["lv"][0]["lv_attr"]
|
|
return cls.from_str(attr_str, name)
|
|
|
|
|
|
class WearoutIndicator(Enum):
    """Kind of measurement a wearout reading is based on."""

    # Used for rotating disks (reallocated sector count).
    REALLOCATED_SECTORS = 0
    # Used for non-rotating disks (remaining spare capacity).
    SPARE_BLOCKS = 1
|
|
|
|
|
|
@dataclass
class WearoutReading:
    """A single wearout measurement together with its failure threshold."""

    # Which metric the two readings below refer to.
    indicator: WearoutIndicator
    # Value currently reported by the disk for that metric.
    current_reading: int
    # Threshold reported by the disk for the same metric.
    threshold_reading: int
|
|
|
|
|
|
def _get_wearout_reading_from_smartctl_output(smartctl_output: dict) -> WearoutReading:
|
|
disk_protocol = smartctl_output["device"]["protocol"]
|
|
rotation_rate = smartctl_output.get("rotation_rate", 0)
|
|
match rotation_rate:
|
|
case 0: # assuming non-rotating media is an SSD
|
|
indicator = WearoutIndicator.SPARE_BLOCKS
|
|
match disk_protocol:
|
|
case "ATA":
|
|
attr_table = smartctl_output["ata_smart_attributes"]["table"]
|
|
for a in attr_table:
|
|
if a["name"] == "Available_Reservd_Space":
|
|
value = a["value"]
|
|
threshold = a["thresh"]
|
|
break
|
|
else:
|
|
raise Exception(f"no Available_Reservd_Space on ATA SSD")
|
|
case "NVMe":
|
|
value = smartctl_output["nvme_smart_health_information_log"]["available_spare"]
|
|
threshold = smartctl_output["nvme_smart_health_information_log"]["available_spare_threshold"]
|
|
case _:
|
|
indicator = WearoutIndicator.REALLOCATED_SECTORS
|
|
match disk_protocol:
|
|
case "ATA":
|
|
attr_table = smartctl_output["ata_smart_attributes"]["table"]
|
|
for a in attr_table:
|
|
if a["name"] == "Reallocated_Sector_Ct":
|
|
value = a["value"]
|
|
threshold = a["thresh"]
|
|
break
|
|
else:
|
|
raise Exception(f"no Reallocated_Sector_Ct on ATA HDD")
|
|
case "NVMe": # ? NVMe HDDs are very rare, if they even exist
|
|
raise NotImplementedError
|
|
|
|
return WearoutReading(indicator, current_reading=value, threshold_reading=threshold)
|
|
|
|
|
|
def get_wearout_reading(disk: str) -> WearoutReading:
    """Query ``smartctl -ja`` for *disk* and return its wearout reading.

    The command's standard output is decoded as UTF-8, parsed as JSON and
    handed to ``_get_wearout_reading_from_smartctl_output``. The exit status
    of ``smartctl`` is not inspected.
    """
    completed = subprocess.run(["smartctl", "-ja", disk], capture_output=True)
    raw_json = completed.stdout.decode("utf-8")
    return _get_wearout_reading_from_smartctl_output(json.loads(raw_json))
|