Skip to content

Commit

Permalink
New: encoding passed for file manipulation functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
petersulyok committed Mar 10, 2024
1 parent b437634 commit 902be59
Showing 1 changed file with 29 additions and 21 deletions.
50 changes: 29 additions & 21 deletions src/diskinfo/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Disk:
Based on the specified input parameter the disk will be identified and its attributes will be collected and
stored. A :py:obj:`ValueError` exception will be raised in case of missing or invalid disk identifier.
Encoding parameter will be used for processing disk attribute strings.
Operators (``<``, ``>`` and ``==``) are also implemented for this class to compare different class instances,
they use the disk name for comparison.
Expand All @@ -39,6 +40,7 @@ class Disk:
wwn (str): wwn identifier of the disk
byid_name (str): by-id name of the disk
bypath_name (str): by-path name of the disk
encoding (str): encoding (default is `utf-8`)
Raises:
ValueError: in case of missing or invalid parameters
Expand Down Expand Up @@ -88,11 +90,15 @@ class Disk:
__part_table_type: str # Disk partition table type
__part_table_uuid: str # Disk partition table UUID
__hwmon_path: str # Path for the /sys/HWMON temperature file
__encoding: str # Character encoding

def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str = None,
byid_name: str = None, bypath_name: str = None,) -> None:
byid_name: str = None, bypath_name: str = None, encoding="utf-8") -> None:
"""See class definition docstring above."""

# Save encoding.
self.__encoding = encoding

# Initialization 1: disk name.
if disk_name:
self.__name = disk_name
Expand All @@ -101,9 +107,9 @@ def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str =
elif serial_number:
name = ""
for file in os.listdir("/sys/block/"):
self.__device_id = _read_file("/sys/block/" + file + "/dev")
self.__device_id = _read_file("/sys/block/" + file + "/dev", self.__encoding)
dev_path = "/run/udev/data/b" + self.__device_id
self.__serial_number = _read_udev_property(dev_path, "ID_SERIAL_SHORT=")
self.__serial_number = _read_udev_property(dev_path, "ID_SERIAL_SHORT=", self.__encoding)
if serial_number == self.__serial_number:
name = file
break
Expand All @@ -115,9 +121,9 @@ def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str =
elif wwn:
name = ""
for file in os.listdir("/sys/block/"):
self.__device_id = _read_file("/sys/block/" + file + "/dev")
self.__device_id = _read_file("/sys/block/" + file + "/dev", self.__encoding)
dev_path = "/run/udev/data/b" + self.__device_id
self.__wwn = _read_udev_property(dev_path, "ID_WWN=")
self.__wwn = _read_udev_property(dev_path, "ID_WWN=", self.__encoding)
if wwn in self.__wwn:
name = file
break
Expand Down Expand Up @@ -146,11 +152,13 @@ def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str =
raise ValueError(f"Disk path ({self.__path}) does not exist!")

# Read disk attributes from /sys filesystem.
self.__size = int(_read_file("/sys/block/" + self.__name + "/size"))
self.__model = _read_file("/sys/block/" + self.__name + "/device/model")
self.__device_id = _read_file("/sys/block/" + self.__name + "/dev")
self.__physical_block_size = int(_read_file("/sys/block/" + self.__name + "/queue/physical_block_size"))
self.__logical_block_size = int(_read_file("/sys/block/" + self.__name + "/queue/logical_block_size"))
self.__size = int(_read_file("/sys/block/" + self.__name + "/size", self.__encoding))
self.__model = _read_file("/sys/block/" + self.__name + "/device/model", self.__encoding)
self.__device_id = _read_file("/sys/block/" + self.__name + "/dev", self.__encoding)
self.__physical_block_size = int(_read_file("/sys/block/" + self.__name + "/queue/physical_block_size",
self.__encoding))
self.__logical_block_size = int(_read_file("/sys/block/" + self.__name + "/queue/logical_block_size",
self.__encoding))

# Determination of the disk type (HDD, SSD, NVME or LOOP)
# Type: LOOP
Expand All @@ -164,7 +172,7 @@ def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str =
# Type: SSD or HDD
else:
path = "/sys/block/" + self.__name + "/queue/rotational"
result = _read_file(path)
result = _read_file(path, self.__encoding)
if result == "1":
self.__type = DiskType.HDD
elif result == "0":
Expand All @@ -174,23 +182,23 @@ def __init__(self, disk_name: str = None, serial_number: str = None, wwn: str =

# Read attributes from udev data.
dev_path = "/run/udev/data/b" + self.__device_id
self.__serial_number = _read_udev_property(dev_path, "ID_SERIAL_SHORT=")
self.__firmware = _read_udev_property(dev_path, "ID_REVISION=")
self.__wwn = _read_udev_property(dev_path, "ID_WWN=")
self.__part_table_type = _read_udev_property(dev_path, "ID_PART_TABLE_TYPE=")
self.__part_table_uuid = _read_udev_property(dev_path, "ID_PART_TABLE_UUID=")
model = _read_udev_property(dev_path, "ID_MODEL_ENC=")
self.__serial_number = _read_udev_property(dev_path, "ID_SERIAL_SHORT=", self.__encoding)
self.__firmware = _read_udev_property(dev_path, "ID_REVISION=", self.__encoding)
self.__wwn = _read_udev_property(dev_path, "ID_WWN=", self.__encoding)
self.__part_table_type = _read_udev_property(dev_path, "ID_PART_TABLE_TYPE=", self.__encoding)
self.__part_table_uuid = _read_udev_property(dev_path, "ID_PART_TABLE_UUID=", self.__encoding)
model = _read_udev_property(dev_path, "ID_MODEL_ENC=", self.__encoding)
if model:
self.__model = model

# Read `/dev/disk/by-byid/` path elements from udev and check their existence.
self.__byid_path = _read_udev_path(dev_path, 0)
self.__byid_path = _read_udev_path(dev_path, 0, self.__encoding)
for file_name in self.__byid_path:
if not os.path.exists(file_name):
raise RuntimeError(f"Disk by-id path ({file_name}) does not exist!")

# Read `/dev/disk/by-path/` path elements from udev and check their existence.
self.__bypath_path = _read_udev_path(dev_path, 1)
self.__bypath_path = _read_udev_path(dev_path, 1, self.__encoding)
for file_name in self.__bypath_path:
if not os.path.exists(file_name):
raise RuntimeError(f"Disk by-path path ({file_name}) does not exist!")
Expand Down Expand Up @@ -605,7 +613,7 @@ def get_temperature(self, sudo: bool = False, smartctl_path: str = "/usr/sbin/sm
self.__hwmon_path and \
os.path.exists(self.__hwmon_path):
try:
return float(_read_file(self.__hwmon_path)) / 1000.0
return float(_read_file(self.__hwmon_path, self.__encoding)) / 1000.0
except ValueError:
pass

Expand Down Expand Up @@ -814,7 +822,7 @@ def get_partition_list(self) -> List[Partition]:
path += str(index)
if not os.path.exists(path):
break # If partition path dos not exists.
result.append(Partition(os.path.basename(path), _read_file(path + "/dev")))
result.append(Partition(os.path.basename(path), _read_file(path + "/dev", self.__encoding)))
index += 1
return result

Expand Down

0 comments on commit 902be59

Please sign in to comment.