lego-monitoring/archive-arch/misc/disks.py

191 lines
5.9 KiB
Python

import json
import subprocess
from dataclasses import dataclass
from enum import Enum, StrEnum
from typing import Optional, Self
@dataclass
class LVAttr:
"""https://man.archlinux.org/man/lvs.8#NOTES"""
class VolType(StrEnum):
CACHE = "C"
MIRRORED = "m"
MIRRORED_NOSYNC = "M"
ORIGIN = "o"
ORIGIN_MERGING_SNAPSHOT = "O"
INTEGRITY = "g"
RAID = "r"
RAID_NOSYNC = "R"
SNAPSHOT = "s"
MERGING_SNAPSHOT = "S"
PVMOVE = "p"
VIRTUAL = "v"
IMAGE = "i"
IMAGE_OUT_OF_SYNC = "I"
MIRROR_LOG = "l"
CONVERTING = "c"
THIN = "V"
THIN_POOL = "t"
THIN_POOL_DATA = "T"
VDO_POOL = "d"
VDO_POOL_DATA = "D"
METADATA = "e"
NORMAL = "-"
class Permissions(StrEnum):
WRITABLE = "w"
READONLY = "r"
READONLY_ACTIVATED = "R"
class AllocationPolicy(StrEnum):
ANYWHERE = "a"
ANYWHERE_LOCKED = "A"
CONTIGUOUS = "c"
CONTIGUOUS_LOCKED = "C"
INHERITED = "i"
INHERITED_LOCKED = "I"
CLING = "l"
CLING_LOCKED = "L"
NORMAL = "n"
NORMAL_LOCKED = "N"
class State(StrEnum):
ACTIVE = "a"
HISTORICAL = "h"
SUSPENDED = "s"
INVALID_SNAPSHOT = "I"
INVALID_SUSPENDED_SNAPSHOT = "S"
SNAPSHOT_MERGE_FAILED = "m"
SUSPENDED_SNAPSHOT_MERGE_FAILED = "M"
DEVICE_PRESENT_NO_TABLES = "d"
DEVICE_PRESENT_INACTIVE_TABLE = "i"
THIN_POOL_CHECK_NEEDED = "c"
SUSPENDED_THIN_POOL_CHECK_NEEDED = "C"
UNKNOWN = "X"
class IsOpen(StrEnum):
OPEN = "o"
CLOSED = "-"
UNKNOWN = "X"
class TargetType(StrEnum):
CACHE = "C"
MIRROR = "m"
RAID = "r"
SNAPSHOT = "s"
THIN = "t"
UNKNOWN = "u"
VIRTUAL = "v"
NORMAL = "-"
class Health(StrEnum):
# for all
PARTIAL = "p"
UNKNOWN = "X"
OK = "-"
# for RAID
REFRESH_NEEDED = "r"
MISMATCHES = "m"
WRITEMOSTLY = "w"
RESHAPING = "s"
REMOVE = "R"
# for thin pools and LVs
FAILED = "F"
OUT_OF_SPACE = "D"
METADATA_READ_ONLY = "M"
# for writecache
ERROR = "E"
vol_type: VolType
permissions: Permissions
allocation_policy: AllocationPolicy
fixed_minor: bool
state: State
is_open: IsOpen
target_type: TargetType
zero_before_use: bool
health: Health
skip_activation: bool
name: Optional[str] = None
@classmethod
def from_str(cls, attr_str: str, name: Optional[str] = None) -> Self:
kwargs = {}
kwargs["vol_type"] = cls.VolType(attr_str[0])
kwargs["permissions"] = cls.Permissions(attr_str[1])
kwargs["allocation_policy"] = cls.AllocationPolicy(attr_str[2])
kwargs["fixed_minor"] = True if attr_str[3] == "m" else False
kwargs["state"] = cls.State(attr_str[4])
kwargs["is_open"] = cls.IsOpen(attr_str[5])
kwargs["target_type"] = cls.TargetType(attr_str[6])
kwargs["zero_before_use"] = True if attr_str[7] == "z" else False
kwargs["health"] = cls.Health(attr_str[8])
kwargs["skip_activation"] = True if attr_str[9] == "k" else False
kwargs["name"] = name
return cls(**kwargs)
@classmethod
def from_cli(cls, name: str) -> Self:
json_obj = json.loads(subprocess.run(["lvs", "--reportformat=json", name], capture_output=True).stdout)
attr_str = json_obj["report"][0]["lv"][0]["lv_attr"]
return cls.from_str(attr_str, name)
class WearoutIndicator(Enum):
REALLOCATED_SECTORS = 0
SPARE_BLOCKS = 1
@dataclass
class WearoutReading:
indicator: WearoutIndicator
current_reading: int
threshold_reading: int
def _get_wearout_reading_from_smartctl_output(smartctl_output: dict) -> WearoutReading:
disk_protocol = smartctl_output["device"]["protocol"]
rotation_rate = smartctl_output.get("rotation_rate", 0)
match rotation_rate:
case 0: # assuming non-rotating media is an SSD
indicator = WearoutIndicator.SPARE_BLOCKS
match disk_protocol:
case "ATA":
attr_table = smartctl_output["ata_smart_attributes"]["table"]
for a in attr_table:
if a["name"] == "Available_Reservd_Space":
value = a["value"]
threshold = a["thresh"]
break
else:
raise Exception(f"no Available_Reservd_Space on ATA SSD")
case "NVMe":
value = smartctl_output["nvme_smart_health_information_log"]["available_spare"]
threshold = smartctl_output["nvme_smart_health_information_log"]["available_spare_threshold"]
case _:
indicator = WearoutIndicator.REALLOCATED_SECTORS
match disk_protocol:
case "ATA":
attr_table = smartctl_output["ata_smart_attributes"]["table"]
for a in attr_table:
if a["name"] == "Reallocated_Sector_Ct":
value = a["value"]
threshold = a["thresh"]
break
else:
raise Exception(f"no Reallocated_Sector_Ct on ATA HDD")
case "NVMe": # ? NVMe HDDs are very rare, if they even exist
raise NotImplementedError
return WearoutReading(indicator, current_reading=value, threshold_reading=threshold)
def get_wearout_reading(disk: str) -> WearoutReading:
smartctl_output = json.loads(subprocess.run(["smartctl", "-ja", disk], capture_output=True).stdout.decode("utf-8"))
wearout_reading = _get_wearout_reading_from_smartctl_output(smartctl_output)
return wearout_reading