Source code for snapm.manager._mounts

# Copyright Red Hat
#
# snapm/manager/_mounts.py - Snapshot Manager mount support
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Mount integration for snapshot manager
"""
from subprocess import run, CalledProcessError, TimeoutExpired
from typing import Dict, Iterable, List, Optional, Union
from abc import ABC, abstractmethod
import collections
import logging
import shlex
import os.path
import os

from snapm import (
    SNAPM_SUBSYSTEM_MOUNTS,
    SnapmError,
    SnapmNotFoundError,
    SnapmCalloutError,
    SnapmPathError,
    SnapmArgumentError,
    SnapmMountError,
    SnapmUmountError,
    Selection,
    SnapshotSet,
    select_snapshot_set,
    FsTabReader,
    get_device_fstype,
    get_device_path,
    find_snapset_root,
    build_snapset_mount_list,
)

_log = logging.getLogger(__name__)

_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error


def _log_debug_mounts(msg, *args, **kwargs):
    """A wrapper for mounts subsystem debug logs."""
    _log.debug(msg, *args, extra={"subsystem": SNAPM_SUBSYSTEM_MOUNTS}, **kwargs)


#: Path to /proc/self/mounts
PROC_MOUNTS = "/proc/self/mounts"

#: List of API file systems to rbind if present and a mount point.
_BIND_MOUNTS: List[str] = [
    "/dev",
    "/proc",
    "/run",
    "/sys",
]

#: Timeout for mount helper programs
_SNAPM_MOUNT_HELPER_TIMEOUT = int(os.getenv("SNAPM_MOUNT_TIMEOUT", "60"))

#: Path to container overlay bind mount
_CONTAINERS_OVERLAY = "/var/lib/containers/storage/overlay"

#: Taken from kernel source: fs/xfs/libxfs/xfs_log_format.h:
_XFS_UQUOTA_ACCT = 0x0001  # user quota accounting ON
_XFS_UQUOTA_ENFD = 0x0002  # user quota limits enforced
_XFS_GQUOTA_ACCT = 0x0040  # group quota accounting ON
_XFS_GQUOTA_ENFD = 0x0080  # group quota limits enforced
_XFS_PQUOTA_ACCT = 0x0008  # project quota accounting ON
_XFS_PQUOTA_ENFD = 0x0200  # project quota limits enforced

#: Prefix for the output of the "print qflags" command.
_QFLAGS_PREFIX = "qflags = "


def _sanitize_environment():
    """
    Sanitize the environment for callouts.

    :returns: An environment dictionary stripped of locale related keys
              and with "LC_ALL=C".
    :rtype: ``Dict[str, str]``
    """
    _env_filter_keys = (
        "LANG",
        "LC_CTYPE",
        "LC_NUMERIC",
        "LC_TIME",
        "LC_COLLATE",
        "LC_MONETARY",
        "LC_MESSAGES",
        "LC_PAPER",
        "LC_NAME",
        "LC_ADDRESS",
        "LC_TELEPHONE",
        "LC_MEASUREMENT",
        "LC_IDENTIFICATION",
        "LC_ALL",
    )
    return {"LC_ALL": "C"} | {
        k: v for k, v in os.environ.items() if k not in _env_filter_keys
    }


#: Environment for mount callouts
_CALLOUT_ENV = _sanitize_environment()


# pylint: disable=too-many-branches
def _get_xfs_quota_options(devpath: str) -> str:
    """
    Return appropriate XFS quota mount options for the file system contained
    on the block device at `devpath`.

    These must be passed to avoid destroying any existing quota information:

        https://github.com/lvmteam/lvm2/issues/182
        https://github.com/storaged-project/libblockdev/pull/1132

    :param devpath: The path to the block device to examine.
    :returns: A comma-separated string list of mount options.
    :rtype: `str`
    """
    xfs_db_cmd = ["xfs_db", "-r", devpath, "-c", "sb 0", "-c", "print qflags"]
    try:
        result = run(
            xfs_db_cmd,
            env=_CALLOUT_ENV,
            check=True,
            capture_output=True,
            encoding="utf8",
            timeout=_SNAPM_MOUNT_HELPER_TIMEOUT,
        )
    except FileNotFoundError as err:
        raise SnapmCalloutError(
            f"xfs_db not found while examining '{devpath}': {err}"
        ) from err
    except TimeoutExpired as err:
        raise SnapmCalloutError(
            f"Timed out obtaining XFS quota flags for '{devpath}': {err}"
        ) from err
    except CalledProcessError as err:
        raise SnapmCalloutError(
            f"Failed to obtain XFS quota flags value for '{devpath}': {err.stderr}"
        ) from err

    qflags_str = result.stdout.strip()

    def _malformed_qflags():
        return SnapmCalloutError(
            f"Malformed qflags value from xfs_db for '{devpath}': {qflags_str}"
        )

    if not qflags_str.startswith(_QFLAGS_PREFIX):
        raise _malformed_qflags()

    try:
        qflags = int(qflags_str.removeprefix(_QFLAGS_PREFIX), 0)
    except ValueError as err:
        raise _malformed_qflags() from err

    options = []

    if qflags & _XFS_UQUOTA_ACCT:
        if qflags & _XFS_UQUOTA_ENFD:
            options.append("uquota")
        else:
            options.append("uqnoenforce")
    if qflags & _XFS_GQUOTA_ACCT:
        if qflags & _XFS_GQUOTA_ENFD:
            options.append("gquota")
        else:
            options.append("gqnoenforce")
    if qflags & _XFS_PQUOTA_ACCT:
        if qflags & _XFS_PQUOTA_ENFD:
            options.append("pquota")
        else:
            options.append("pqnoenforce")
    return ",".join(options)


def _merge_options(opts_a, opts_b):
    """
    Merge two comma-separated mount options strings.

    :param opts_a: The first set of options.
    :param opts_b: The second set of options.
    :returns: Merged "opts_a,opts_b"
    :rtype: ``str``
    """
    return ",".join(filter(None, [opts_a, opts_b]))


def _get_xfs_options(devpath: str) -> str:
    """
    Return appropriate options for mounting an XFS file system, including
    specifying "nouuid" to avoid UUID clashes, and appropriate quota flags
    for the current file system state.

    :param devpath: The path to the block device to examine.
    :returns: A comma-separated string list of mount options.
    :rtype: `str`
    """

    # Always use "nouuid" when mounting XFS.
    options = "nouuid"

    return _merge_options(options, _get_xfs_quota_options(devpath))


def _resolve_device(device: str) -> str:
    """
    Resolve a device that may be in the form of a LABEL=... or UUID=...
    expression into a device path. If the device is neither a UUID nor
    a label reference it is returned unmodified.

    :param device: The device string to evaluate; '/dev/...', 'LABEL=...',
                   or 'UUID=...'.
    :returns: The device path.
    :rtype: ``str``
    """
    if device.startswith("UUID="):
        uuid = device.split("=", maxsplit=1)[1]
        resolved = get_device_path("uuid", uuid)
        if resolved is None:
            raise SnapmNotFoundError(f"Device with UUID '{uuid}' not found")
        device = resolved
    elif device.startswith("LABEL="):
        label = device.split("=", maxsplit=1)[1]
        resolved = get_device_path("label", label)
        if resolved is None:
            raise SnapmNotFoundError(f"Device with label '{label}' not found")
        device = resolved
    elif device.startswith("PARTUUID="):
        ident = device.split("=", maxsplit=1)[1]
        cand = f"/dev/disk/by-partuuid/{ident}"
        if not os.path.exists(cand):
            raise SnapmNotFoundError(f"Device with PARTUUID '{ident}' not found")
        device = cand
    elif device.startswith("PARTLABEL="):
        ident = device.split("=", maxsplit=1)[1]
        cand = f"/dev/disk/by-partlabel/{ident}"
        if not os.path.exists(cand):
            raise SnapmNotFoundError(f"Device with PARTLABEL '{ident}' not found")
        device = cand
    return device


def _mount(
    what: str,
    where: str,
    fstype: Optional[str] = None,
    options: str = "defaults",
    readonly: bool = False,
    rbind: bool = False,
    rprivate: bool = False,
    unbindable: bool = False,
):
    """
    Call the mount program to mount a file system.

    :param what: The source for the mount operation.
    :param where: The path to the mount point.
    :param fstype: An optional file system type.
    :param options: Options to pass to the mount program.
    :param readonly: Mount the filesystem read-only.
    :param rbind: Perform a recursive bind mount.
    :param rprivate: Make the mount point private recursively.
    :param unbindable: Make the mount point unbindable.
    """
    mount_cmd = ["mount"]

    if readonly:
        if options in ("defaults", ""):
            options = "ro"
        else:
            options = _merge_options(options, "ro")

    what = _resolve_device(what)

    if not rbind and not fstype:
        fstype = get_device_fstype(what)

    if fstype and fstype == "xfs":
        options = ",".join(filter(None, [options, _get_xfs_options(what)]))
    if fstype:
        mount_cmd.extend(["--type", fstype])
    if rbind:
        mount_cmd.append("--rbind")
    if rprivate:
        mount_cmd.append("--make-rprivate")
    if unbindable:
        mount_cmd.append("--make-unbindable")

    mount_cmd.extend(["--options", options, what, where])
    _log_debug_mounts("Calling %s", " ".join(mount_cmd))

    try:
        run(
            mount_cmd,
            env=_CALLOUT_ENV,
            check=True,
            capture_output=True,
            encoding="utf8",
            timeout=_SNAPM_MOUNT_HELPER_TIMEOUT,
        )
    except TimeoutExpired as err:
        raise SnapmCalloutError(
            f"Timed out calling mount for {what} -> {where}: {err}"
        ) from err
    except CalledProcessError as err:
        raise SnapmMountError(what, where, err.returncode, err.stderr.strip()) from err


def _umount(where: str):
    """
    Call the umount program to unmount a file system.

    :param where: The mount point to be unmounted.
    """
    umount_cmd = ["umount", where]
    _log_debug_mounts("Calling %s", " ".join(umount_cmd))
    try:
        run(
            umount_cmd,
            env=_CALLOUT_ENV,
            check=True,
            capture_output=True,
            encoding="utf8",
            timeout=_SNAPM_MOUNT_HELPER_TIMEOUT,
        )
    except TimeoutExpired as err:
        raise SnapmCalloutError(f"Timed out calling umount for {where}: {err}") from err
    except CalledProcessError as err:
        raise SnapmUmountError(where, err.returncode, err.stderr.strip()) from err


class ProcMountsReader:
    """Reader for /proc/mounts format files."""

    # Define a named tuple to give structure to each /proc/mounts entry.
    MountsEntry = collections.namedtuple(
        "MountsEntry", ["what", "where", "fstype", "options", "freq", "passno"]
    )

    def __init__(self, path=PROC_MOUNTS):
        """Initialize with the path to a mounts file.

        :param path: Path to the mounts file (e.g., '/proc/mounts')
        """
        self.path = path

    def submounts(self, root):
        """Iterate over submounts under the given mount point root.

        :param root: The mount point root (e.g., '/run/snapm/mounts/before-upgrade')
        :returns: Yields ``MountsEntry`` objects for submounts under root.
        """
        with open(self.path, "r", encoding="utf8") as fp:
            for line in fp:
                line = line.strip()
                if not line:
                    continue

                parts = line.split()
                if len(parts) == 6:
                    entry = self.MountsEntry(*parts)
                    mount_point = entry.where
                    root_prefix = root.rstrip("/") + "/"
                    if mount_point.startswith(root_prefix):
                        yield entry
                else:
                    _log_warn("Skipping malformed %s line: %s", self.path, line)


class MountBase(ABC):
    """
    An abstract base class representing a file system mount.
    """

    TYPE = "Base"  #: Description of the type of mount object.

    def __init__(self, mount_root: str, name: str):
        """
        Initialise base mount state.

        :param mount_root: The absolute path to the root directory of this
                           mount.
        :type mount_root: ``str``
        :raises SnapmPathError: If mount_root is not a directory.
        """
        if not os.path.isdir(mount_root):
            raise SnapmPathError(f"Mount path {mount_root} is not a directory.")

        self.root: str = mount_root
        self.name: str = name
        self.snapset: Optional[SnapshotSet] = None

    def _check_submounts(
        self,
        mount_points: Iterable[str],
        submounts: List[str],
        # pmr: Optional[ProcMountsReader] = None,
        name_map: Optional[Dict[str, str]] = None,
    ):
        """
        Verify that the expected submounts (both device file systems and API
        file system mounts) are present under ``self.root``.

        :param mount_points: The list of expected device mount points to check.
        :type mount_points: ``Iterable[str]``
        :param submounts: A list of present submounts for this mount to be
                          checked against the expected device and API file
                          system mount points.
        :type submounts: ``List[str]``
        :param name_map: A map of mount points to device names for logging.
        :type name_map: ``Dict[str, str]``
        """
        # Check device and API file system submounts
        checked = {mp: False for mp in mount_points if mp != "/"}
        bind_checked = {mp: False for mp in _BIND_MOUNTS}

        # Don't try to check the overlay bind mount
        checked.pop(_CONTAINERS_OVERLAY, None)

        for abs_mount_path in submounts:
            rel_mount_path = (
                abs_mount_path.removeprefix(self.root)
                if self.root != "/"
                else abs_mount_path
            )
            path_is_mount = os.path.ismount(abs_mount_path)
            if rel_mount_path in checked:
                checked[rel_mount_path] = path_is_mount
            elif rel_mount_path in bind_checked:
                bind_checked[rel_mount_path] = path_is_mount

        if not all(checked.values()):
            _log_warn("Missing snapshot set submounts for %s:", self.name)
            for mount_point in [mp for mp in checked if not checked[mp]]:
                _log_warn(
                    "  Missing: %s (%s)",
                    mount_point,
                    name_map.get(mount_point, "??") if name_map else "??",
                )

        if not all(bind_checked.values()):
            _log_warn("Missing API file system submounts for %s:", self.name)
            for mount_point in [mp for mp in bind_checked if not bind_checked[mp]]:
                _log_warn("  Missing: %s", mount_point)

    @property
    def mounted(self):
        """
        Determine whether this snapshot set mount is currently mounted.
        """
        return os.path.ismount(self.root)

    def mount(self):
        """
        Mount the configured file system and its submounts at `self.root`.
        """
        if self.mounted:
            _log_info("%s %s already mounted at '%s'", self.TYPE, self.name, self.root)
            return
        self._do_mount()

    @abstractmethod
    def _do_mount(self):
        """
        Hook invoked to mount a tree.

        Subclasses implement mount behaviour such as mounting snapshots and
        bind mounting API and runtime file systems.
        """

    def umount(self):
        """
        Unmount the configured file system and its submounts from `self.root`.
        """
        if not self.mounted:
            _log_warn("%s mount at '%s' is not mounted", self.TYPE, self.root)
            return
        self._do_umount()

    @abstractmethod
    def _do_umount(self):
        """
        Hook invoked to unmount a tree.

        Subclasses implement unmount behaviour such as umounting snapshots and
        umounting API and runtime file systems.
        """

    @abstractmethod
    def _chroot(self):
        """
        Hook invoked to obtain an appropriate chroot command list for this
        mount.

        :returns: A command list including an appropriate chroot invocation
                  if necessary.
        :rtype: ``List[str]``
        """

    def exec(self, command: Union[str, Iterable[str]]) -> int:
        """
        Execute a ``command`` chrooted inside this mount namespace.

        We explicitly trust the caller-provided command since snapm requires
        root privileges for all operations.

        :param command: A command and its arguments, suitable for passing to
                        ``shlex.split()``.
        """
        if not self.mounted:
            raise SnapmPathError(
                f"{self.TYPE} {self.name} is not mounted at {self.root}"
            )
        if isinstance(command, str):
            try:
                cmd_args = shlex.split(command)
            except ValueError as err:
                raise SnapmArgumentError(
                    f"Cannot parse command string: {command}"
                ) from err
        else:
            cmd_args = list(command)

        chroot_cmd = list(self._chroot())

        chroot_cmd.extend(cmd_args)
        _log_debug_mounts(
            "Invoking snapshot set chroot command: %s", " ".join(chroot_cmd)
        )
        try:
            status = run(chroot_cmd, check=False)
        except FileNotFoundError as err:  # pragma: no cover
            _log_error("Failed to execute chroot command: %s", err)
            return 127
        return status.returncode


[docs]class Mount(MountBase): """ Representation of a mounted snapshot set, including snapshot set mounts, non-snapshot mounts from /etc/fstab, and bind mounts for API file systems. """ TYPE = "Snapshot set"
[docs] def __init__( self, snapset: SnapshotSet, mount_root: str, discover: bool = False, ): """ Initialize a new Mount instance for the given snapshot set. If ``discover`` is ``True`` validate that the ``mount_root`` is an existing mount point for the snapshot set and check for the presence of expected submounts. :param snapset: The snapshot set to mount or represent. :type snapset: ``SnapshotSet`` :param mount_root: The absolute path to the root directory where the snapshot set will be mounted or is already mounted. :type mount_root: ``str`` :param discover: If True, validate that mount_root is an existing mount point and discover its submounts. If False, prepare a new mount object without validation. :type discover: ``bool`` :raises SnapmPathError: If mount_root is not a directory, or if discover is True and mount_root is not a mount point. """ super().__init__(mount_root, snapset.name) _log_debug("Building snapshot set Mount instance for %s", snapset.name) # The snapshot set that this mount belongs to. self.snapset: Optional[SnapshotSet] = snapset if discover: if not os.path.ismount(mount_root): raise SnapmPathError(f"Mount path {mount_root} is not a mount point.") pmr = ProcMountsReader() submounts = [mnt.where for mnt in pmr.submounts(self.root)] submounts.sort(key=lambda mnt: mnt.count("/"), reverse=True) _log_debug_mounts("Submounts by depth (%s)", self.snapset.name) for submount in submounts: _log_debug_mounts(submount) self.snapset.mount_root = mount_root name_map = { mp: self.snapset.snapshot_by_source(mp).name for mp in self.snapset.mount_points } self._check_submounts( self.snapset.mount_points, submounts, name_map=name_map )
[docs] def _do_mount(self): """ Mount the configured snapshot set and its submounts at `self.root`. """ try: # 0. Initialise FsTabReader fstab = FsTabReader() # 1. Find and mount the snapshot set root filesystem and make it unbindable. root_device = find_snapset_root(self.snapset, fstab) try: root_opts = next(fstab.lookup("where", "/")).options except StopIteration: _log_warn("No entry matching mount point '/' found in fstab") root_opts = "defaults" _mount(root_device, self.root, options=root_opts, unbindable=True) # 2. Build an auxiliary mount list from the SnapshotSet/FsTabReader. mount_list = build_snapset_mount_list(self.snapset, fstab) # 3. Mount the discovered auxiliary mount points. for mount_spec in mount_list: what, where, fstype, options = mount_spec where = os.path.join(self.root, where.lstrip("/")) if not os.path.isdir(where): _log_warn( "Mount point %s does not exist in snapshot set %s, skipping mount", where, self.snapset.name, ) continue _mount(what, where, fstype=fstype, options=options) # 4. Recursively bind mount API file systems from the host. for what in _BIND_MOUNTS: if os.path.exists(what) and os.path.ismount(what): where = os.path.join(self.root, what.lstrip("/")) if not os.path.isdir(where): _log_warn( "Bind mount point %s does not exist in snapshot set %s, skipping mount", where, self.snapset.name, ) continue _mount(what, where, rbind=True, rprivate=True) except (ValueError, SnapmError, StopIteration) as err: # Rollback: discover all submounts and unmount in depth-first order _log_warn( "Mount operation failed for %s at %s: %s. Rolling back.", self.snapset.name, self.root, err, ) pmr = ProcMountsReader() rollback_mounts = [submount.where for submount in pmr.submounts(self.root)] rollback_mounts.append(self.root) # Include root itself rollback_mounts.sort(key=lambda mp: mp.count("/"), reverse=True) for mount_path in rollback_mounts: if not os.path.ismount(mount_path): continue try: _umount(mount_path) except SnapmUmountError as cleanup_err: _log_warn( "Failed to roll back mount '%s': %s", mount_path, cleanup_err ) raise
[docs] def _do_umount(self): """ Unmount the configured snapshot set and its submounts from `self.root`. """ # 1. Discover and unmount each submount under self.root pmr = ProcMountsReader() submounts = [mnt.where for mnt in pmr.submounts(self.root)] submounts.sort(key=lambda mnt: mnt.count("/"), reverse=True) # Work around #586 failed_netns_mounts = [] for submount in submounts: _log_debug_mounts( "Attempting to unmount %s (%s)", submount, self.snapset.name ) try: if submount == os.path.join(self.root, "run") and failed_netns_mounts: for netns_mount in failed_netns_mounts.copy(): try: _umount(netns_mount) failed_netns_mounts.remove(netns_mount) except SnapmUmountError as err: _log_warn( "Unable to unmount %s after retry: %s", netns_mount, err ) _umount(submount) except SnapmUmountError as err: if "/run/netns/netns-" in submount and "not mounted" in err.stderr: failed_netns_mounts.append(submount) else: raise # 1b. Warn if any netns submounts failed to unmount if failed_netns_mounts: _log_warn( "Failed to remove all netns submounts, manual cleanup required: %s", ", ".join(failed_netns_mounts), ) # pragma: no cover # 2. Unmount the root mount at self.root _log_debug_mounts("Attempting to unmount snapshot set root: %s", self.root) _umount(self.root)
[docs] def _chroot(self): """ Return a chrooted command list for this snapshot set mount. :returns: A command list including an appropriate chroot invocation if necessary. :rtype: ``List[str]`` """ return ["chroot", self.root]
[docs]class SysMount(MountBase): """ A class representing the system root file system tree mounted at '/'. """ TYPE = "System root"
[docs] def __init__(self): """ Initialize a new SysMount instance for the root file system. :raises SnapmPathError: If root is not a directory, or not a mount point. This should never happen for the root file system. """ super().__init__("/", "System Root") self.snapset: Optional[SnapshotSet] = None _log_debug("Building SysMount instance") if not os.path.ismount(self.root): raise SnapmPathError(f"Mount path {self.root} is not a mount point.") pmr = ProcMountsReader() submounts = [mnt.where for mnt in pmr.submounts(self.root)] submounts.sort(key=lambda mnt: mnt.count("/"), reverse=True) _log_debug_mounts("Submounts by depth (%s)", self.name) for submount in submounts: _log_debug_mounts(submount) name_map = { mnt.where: os.path.basename(mnt.what) for mnt in pmr.submounts("/") if mnt.what.startswith(os.sep) } mount_points = name_map.keys() self._check_submounts(mount_points, submounts, name_map=name_map)
[docs] def _do_mount(self): """ Dummy template mount method hook for the system root file system. """ # Unreachable sanity check: ``MountBase.mount()`` will already reject # an attempt to mount a mounted file system. raise SnapmPathError("Root file system is already mounted")
[docs] def _do_umount(self): """ Template unmount method hook. Always rejects attempts to unmount the root file system. """ raise SnapmPathError("Root file system cannot be unmounted")
[docs] def _chroot(self): """ Return a non-chrooted command list for the root file system mount. :returns: A command list including an appropriate chroot invocation if necessary. :rtype: ``List[str]`` """ return []
[docs]class Mounts: """ A high-level interface for mounting, unmounting and enumerating snapshot set mounts in the snapm runtime directory (normally /run/snapm/mounts). """
[docs] def __init__(self, manager, mounts_dir: str): """ Initialise a new `Mounts` object. :param manager: The `Manager` object that this instance belongs to. :param mounts_dir: The directory under which mounts are managed. """ self._manager = manager self._root = mounts_dir self._mounts = [] self._mounts_by_name = {} self._sys_mount = SysMount() self.discover_mounts()
[docs] def discover_mounts(self): """ Discover and validate existing snapset mounts in the configured mount path. """ _log_info("Discovering snapshot set mounts under '%s'", self._root) self._mounts.clear() self._mounts_by_name.clear() mounts = [] for dirent in os.listdir(self._root): if dirent not in self._manager.by_name: _log_info("Skipping non-snapshot set path: '%s'", dirent) continue snapset = self._manager.by_name[dirent] try: mount_path = os.path.join(self._root, dirent) mount = Mount(snapset, mount_path, discover=True) mounts.append(mount) snapset.mount_root = mount.root _log_info( "Found snapshot set mount at '%s' (mounted=%s)", mount.root, mount.mounted, ) except SnapmPathError as err: _log_info("Ignoring invalid mount path: '%s' (%s)", mount_path, err) continue self._mounts.extend(mounts) self._mounts_by_name.update({mount.snapset.name: mount for mount in mounts}) _log_info("Found %d snapshot set mounts", len(self._mounts))
[docs] def mount(self, snapset: SnapshotSet) -> Mount: """ Mount the snapshot set `snapset`. :param snapset: The snapshot set to operate on. """ if snapset.name in self._mounts_by_name: existing = self._mounts_by_name[snapset.name] if existing.mounted: _log_info( "Snapshot set %s already mounted at %s", snapset.name, existing.root, ) return existing try: self._mounts.remove(existing) except ValueError: pass self._mounts_by_name.pop(snapset.name, None) # Ensure the snapshot set's volumes are active snapset.activate() mount_path = os.path.join(self._root, snapset.name) os.makedirs(mount_path, exist_ok=True) mount = Mount(snapset, mount_path) try: mount.mount() except (SnapmError, ValueError): try: os.rmdir(mount_path) except OSError: pass # pragma: no cover raise self._mounts.append(mount) self._mounts_by_name[snapset.name] = mount # Set snapset mount_root snapset.mount_root = mount.root return mount
[docs] def umount(self, snapset: SnapshotSet): """ Unmount the snapshot set `snapset`. :param snapset: The snapshot set to operate on. """ # Unmount and clean up root first mount = self._mounts_by_name.get(snapset.name) if not mount: raise SnapmNotFoundError(f"Mount for snapshot set {snapset.name} not found") mount.umount() os.rmdir(mount.root) # Update registries on success self._mounts_by_name.pop(snapset.name, None) try: self._mounts.remove(mount) except ValueError: # pragma: no cover pass # Clear snapset mount_root snapset.mount_root = ""
[docs] def find_mounts(self, selection: Optional[Selection] = None) -> List[Mount]: """ Return a list of `Mount` objects describing the currently mounted snapshot sets managed by this `Mounts` instance. """ selection = selection or Selection() selection.check_valid_selection(snapshot_set=True) return [m for m in self._mounts if select_snapshot_set(selection, m.snapset)]
[docs] def get_sys_mount(self) -> SysMount: """ Return an object representing the system root file system mounts. :returns: A ``SysMount`` instance representing '/'. :rtype: ``SysMount`` """ return self._sys_mount
__all__ = [ "Mount", "Mounts", "SysMount", ]