Skip to content
Snippets Groups Projects
Verified Commit 1ab210a3 authored by Martin Matějek's avatar Martin Matějek
Browse files

wip: refactor get_drives

parent cb687f8a
No related merge requests found
Pipeline #79150 failed with stage
in 1 minute and 53 seconds
import logging import logging
import os import os
import pathlib
import shlex import shlex
from foris_controller_backends.uci import UciBackend, get_option_named, parse_bool, store_bool from foris_controller_backends.uci import UciBackend, get_option_named, parse_bool, store_bool
...@@ -118,6 +119,9 @@ class SettingsUci(BaseCmdLine, BaseFile): ...@@ -118,6 +119,9 @@ class SettingsUci(BaseCmdLine, BaseFile):
class DriveManager(BaseCmdLine, BaseFile): class DriveManager(BaseCmdLine, BaseFile):
FORMATING_FILE = "/tmp/storage_plugin/formating"
DRIVES_DIR = "/sys/class/block"
def _find_blkid_bin(self) -> str: def _find_blkid_bin(self) -> str:
""" finds fullpath to blkid binary """ """ finds fullpath to blkid binary """
for dirpath in [e for e in os.environ.get("PATH", "").split(":") if e.startswith("/")] + [ for dirpath in [e for e in os.environ.get("PATH", "").split(":") if e.startswith("/")] + [
...@@ -129,30 +133,27 @@ class DriveManager(BaseCmdLine, BaseFile): ...@@ -129,30 +133,27 @@ class DriveManager(BaseCmdLine, BaseFile):
return "/usr/sbin/blkid" # default path return "/usr/sbin/blkid" # default path
def _is_mount_root(self, device: str) -> bool:
""" Check whether device is mounted as rootfs ('/') """
mount = ""
try:
mount = self._read_and_parse(
"/proc/mounts", r"^/dev/{} (/[^ ]*) .*".format(device)
)
except FailedToParseFileContent:
return False
return mount == "/"
def get_drives(self): def get_drives(self):
ret = [] ret = []
# Would block during formating # Would block during formating
if os.path.isfile(inject_file_root("/tmp/storage_plugin/formating")): if pathlib.Path(inject_file_root(self.FORMATING_FILE)).is_file():
return {"drives": ret} return {"drives": ret}
drive_dir = "/sys/class/block" for dev in pathlib.Path(inject_file_root(self.DRIVES_DIR)).glob("sd*"):
# skip device mounted as root
for dev in os.listdir(inject_file_root(drive_dir)): if self._is_mount_root(dev.name):
# skip some device
if not dev.startswith("sd"):
continue
# is device mounted somewhere
mount = ""
try:
mount = self._read_and_parse(
"/proc/mounts", r"^/dev/{} (/[^ ]*) .*".format(dev)
)
except FailedToParseFileContent:
pass
# skip devices mounted as root
if mount == "/":
continue continue
retval, stdout, _ = self._run_command(self._find_blkid_bin(), "/dev/%s" % dev) retval, stdout, _ = self._run_command(self._find_blkid_bin(), "/dev/%s" % dev)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment