Source code for boom._boom

# Copyright Red Hat
#
# boom/_boom.py - Boom package initialisation
#
# SPDX-License-Identifier: Apache-2.0
"""This module provides the declarations, classes, and functions exposed
in the main ``boom`` module. Users of boom should not import this module
directly: it will be imported automatically with the top level module.
"""
from os.path import exists as path_exists, isabs, isdir, join as path_join
from os import listdir
import logging
import string
import errno
from argparse import Namespace
from configparser import ConfigParser
from typing import List, Optional, Set, Tuple, Type, Union, Any

#: The location of the system ``/boot`` directory.
DEFAULT_BOOT_PATH = "/boot"

#: The default name of the Boom configuration directory (joined to the boot path).
DEFAULT_BOOM_DIR = "boom"

#: The root directory for Boom configuration files.
DEFAULT_BOOM_PATH = path_join(DEFAULT_BOOT_PATH, DEFAULT_BOOM_DIR)

#: The default directory name for the Boom cache.
DEFAULT_CACHE_DIR = "cache"

#: The path to the root directory of the Boom cache.
DEFAULT_CACHE_PATH = path_join(DEFAULT_BOOM_PATH, DEFAULT_CACHE_DIR)

#: Configuration file mode
BOOT_CONFIG_MODE = 0o644

#: The default configuration file location
BOOM_CONFIG_FILE = "boom.conf"
DEFAULT_BOOM_CONFIG_PATH = path_join(DEFAULT_BOOM_PATH, BOOM_CONFIG_FILE)
__boom_config_path = DEFAULT_BOOM_CONFIG_PATH

#: Kernel version string, in ``uname -r`` format.
FMT_VERSION = "version"
#: LVM2 root logical volume in ``vg/lv`` format.
FMT_LVM_ROOT_LV = "lvm_root_lv"
#: LVM2 kernel command line options
FMT_LVM_ROOT_OPTS = "lvm_root_opts"
#: BTRFS subvolume specification.
FMT_BTRFS_SUBVOLUME = "btrfs_subvolume"
#: BTRFS subvolume ID specification.
FMT_BTRFS_SUBVOL_ID = "btrfs_subvol_id"
#: BTRFS subvolume path specification.
FMT_BTRFS_SUBVOL_PATH = "btrfs_subvol_path"
#: BTRFS kernel command line options
FMT_BTRFS_ROOT_OPTS = "btrfs_root_opts"
#: Stratis pool UUID.
FMT_STRATIS_POOL_UUID = "stratis_pool_uuid"
#: Stratis kernel command line options.
FMT_STRATIS_ROOT_OPTS = "stratis_root_opts"
#: Root device path.
FMT_ROOT_DEVICE = "root_device"
#: Root device options.
FMT_ROOT_OPTS = "root_opts"
#: Linux kernel image
FMT_KERNEL = "kernel"
#: Initramfs image
FMT_INITRAMFS = "initramfs"
#: OS Profile name
FMT_OS_NAME = "os_name"
#: OS Profile short name
FMT_OS_SHORT_NAME = "os_short_name"
#: OS Profile version
FMT_OS_VERSION = "os_version"
#: OS Profile version ID
FMT_OS_VERSION_ID = "os_version_id"

#: List of all possible format keys.
# NOTE(review): FMT_STRATIS_ROOT_OPTS is defined alongside the other FMT_*
# constants but is not listed here - confirm whether that is intentional.
FORMAT_KEYS = [
    FMT_VERSION,
    FMT_LVM_ROOT_LV,
    FMT_LVM_ROOT_OPTS,
    FMT_BTRFS_SUBVOL_ID,
    FMT_BTRFS_SUBVOL_PATH,
    FMT_BTRFS_SUBVOLUME,
    FMT_BTRFS_ROOT_OPTS,
    FMT_STRATIS_POOL_UUID,
    FMT_ROOT_DEVICE,
    FMT_ROOT_OPTS,
    FMT_KERNEL,
    FMT_INITRAMFS,
    FMT_OS_NAME,
    FMT_OS_SHORT_NAME,
    FMT_OS_VERSION,
    FMT_OS_VERSION_ID,
]

#: Root options for Stratis root file systems
ROOT_OPTS_STRATIS = "stratis.rootfs.pool_uuid=%{stratis_pool_uuid}"

# Root options for btrfs volumes

#: Volume specified by subvol path
ROOT_OPTS_BTRFS_PATH = "subvol=%{btrfs_subvol_path}"

#: Volume specified by subvol ID
ROOT_OPTS_BTRFS_ID = "subvolid=%{btrfs_subvol_id}"

#: Minimum width of SHA1-based identifiers
MIN_ID_WIDTH = 7

#
# Logging
#

# Package-level logger: all module logging goes through the "boom" logger.
_log = logging.getLogger("boom")

# Short aliases for the logger methods used throughout this module.
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error

# Boom debugging subsystem mask (legacy interface)
# Each subsystem is a distinct bit so that values may be combined with "|".
BOOM_DEBUG_PROFILE = 1
BOOM_DEBUG_ENTRY = 2
BOOM_DEBUG_REPORT = 4
BOOM_DEBUG_COMMAND = 8
BOOM_DEBUG_CACHE = 16
BOOM_DEBUG_STRATIS = 32
# Mask with every subsystem bit set; also the largest value accepted by
# set_debug_mask().
BOOM_DEBUG_ALL = (
    BOOM_DEBUG_PROFILE
    | BOOM_DEBUG_ENTRY
    | BOOM_DEBUG_REPORT
    | BOOM_DEBUG_COMMAND
    | BOOM_DEBUG_CACHE
    | BOOM_DEBUG_STRATIS
)

# Boom debugging subsystem names
BOOM_SUBSYSTEM_PROFILE = "boom.profile"
BOOM_SUBSYSTEM_ENTRY = "boom.entry"
BOOM_SUBSYSTEM_REPORT = "boom.report"
BOOM_SUBSYSTEM_COMMAND = "boom.command"
BOOM_SUBSYSTEM_CACHE = "boom.cache"
BOOM_SUBSYSTEM_STRATIS = "boom.stratis"

# Maps each legacy BOOM_DEBUG_* bit to the matching subsystem name.
_DEBUG_MASK_TO_SUBSYSTEM = {
    BOOM_DEBUG_PROFILE: BOOM_SUBSYSTEM_PROFILE,
    BOOM_DEBUG_ENTRY: BOOM_SUBSYSTEM_ENTRY,
    BOOM_DEBUG_REPORT: BOOM_SUBSYSTEM_REPORT,
    BOOM_DEBUG_COMMAND: BOOM_SUBSYSTEM_COMMAND,
    BOOM_DEBUG_CACHE: BOOM_SUBSYSTEM_CACHE,
    BOOM_DEBUG_STRATIS: BOOM_SUBSYSTEM_STRATIS,
}

# Names of the currently enabled debug subsystems. Rebound by
# set_debug_mask() and copied by each new SubsystemFilter instance.
_debug_subsystems = set()


class BoomError(Exception):
    """Common base class from which Boom exceptions derive."""
class SubsystemFilter(logging.Filter):
    """Logging filter that gates DEBUG records by subsystem name.

    Only DEBUG records that carry a ``subsystem`` attribute are ever
    rejected: they pass only when that subsystem is currently enabled.
    Every other record is passed through unchanged.
    """

    def __init__(self, name=""):
        """Create a filter seeded with the module's enabled subsystems.

        :param name: the filter name handed to ``logging.Filter``.
        """
        super().__init__(name)
        # Take a private copy so later changes to the module-level set do
        # not alter this filter behind its back.
        self.enabled_subsystems = set(_debug_subsystems)

    def filter(self, record):
        """Decide whether ``record`` should be emitted.

        :param record: the log record to test.
        :returns: ``True`` if the record should be logged.
        :rtype: bool
        """
        subsystem_debug = record.levelno == logging.DEBUG and hasattr(
            record, "subsystem"
        )
        if subsystem_debug:
            return record.subsystem in self.enabled_subsystems
        # Non-DEBUG records and untagged DEBUG records always pass.
        return True

    def set_debug_subsystems(self, subsystems):
        """Replace the set of subsystem names allowed by this filter.

        :param subsystems: an iterable of subsystem names to allow.
        """
        self.enabled_subsystems = set(subsystems)
def get_debug_mask() -> int:
    """Compute the legacy debug mask from the enabled subsystem names.

    The enabled names are the union of the module-level set and the sets
    held by every ``SubsystemFilter`` attached to a handler of the
    ``boom`` logger. Names with no ``BOOM_DEBUG_*`` bit are ignored.

    :returns: The current debug mask value
    :rtype: int
    """
    active = set(_debug_subsystems)
    for handler in logging.getLogger("boom").handlers:
        for flt in handler.filters:
            if isinstance(flt, SubsystemFilter):
                active.update(flt.enabled_subsystems)

    mask = 0
    for flag, subsystem_name in _DEBUG_MASK_TO_SUBSYSTEM.items():
        if subsystem_name in active:
            mask |= flag
    return mask
def set_debug_mask(mask):
    """Enable the debug subsystems selected by a legacy bit mask.

    The mask is translated to subsystem names, pushed to every
    ``SubsystemFilter`` attached to a handler of the ``boom`` logger, and
    stored in the module-level set of enabled subsystems.

    :param mask: the logical OR of the ``BOOM_DEBUG_*`` values to log.
    :rtype: None
    :raises: ValueError if ``mask`` is negative or larger than
             ``BOOM_DEBUG_ALL``.
    """
    global _debug_subsystems

    if mask < 0 or mask > BOOM_DEBUG_ALL:
        raise ValueError(f"Invalid boom debug mask: {mask}")

    selected = [
        subsystem_name
        for flag, subsystem_name in _DEBUG_MASK_TO_SUBSYSTEM.items()
        if mask & flag
    ]

    for handler in logging.getLogger("boom").handlers:
        for flt in handler.filters:
            if isinstance(flt, SubsystemFilter):
                flt.set_debug_subsystems(selected)

    _debug_subsystems = set(selected)
class BoomConfig:
    """Class representing boom persistent configuration values.

    The class attributes hold the default for each setting; ``__init__``
    stores a per-instance value for every setting, falling back to the
    class default for any argument that is not supplied.
    """

    # Initialise members from global defaults
    boot_path = DEFAULT_BOOT_PATH
    boom_path = DEFAULT_BOOM_PATH
    legacy_enable = False
    legacy_format = "grub1"
    legacy_sync = True
    cache_enable = True
    cache_auto_clean = True
    cache_path = DEFAULT_CACHE_PATH

    def __str__(self) -> str:
        """Return a string representation of this ``BoomConfig`` in
        boom.conf (INI) notation.

        :returns: the configuration as ``[global]``, ``[legacy]`` and
                  ``[cache]`` sections.
        :rtype: str
        """
        return (
            "[global]\n"
            f"boot_root = {self.boot_path}\n"
            f"boom_root = {self.boom_path}\n\n"
            "[legacy]\n"
            f"enable = {self.legacy_enable}\n"
            f"format = {self.legacy_format}\n"
            f"sync = {self.legacy_sync}\n\n"
            "[cache]\n"
            f"enable = {self.cache_enable}\n"
            f"auto_clean = {self.cache_auto_clean}\n"
            f"cache_path = {self.cache_path}\n"
        )

    def __repr__(self) -> str:
        """Return a string representation of this ``BoomConfig`` in
        BoomConfig initialiser-like notation.

        The ``enable_legacy`` and ``auto_clean`` keywords in the output do
        not match the ``legacy_enable`` and ``cache_auto_clean`` parameter
        names of ``__init__``; the text is kept as-is so that existing
        output does not change.

        :rtype: str
        """
        return (
            f'BoomConfig(boot_path="{self.boot_path}", boom_path="{self.boom_path}", '
            f'enable_legacy={self.legacy_enable}, legacy_format="{self.legacy_format}", '
            f"legacy_sync={self.legacy_sync}, "
            f"cache_enable={self.cache_enable}, "
            f"auto_clean={self.cache_auto_clean}, "
            f'cache_path="{self.cache_path}")'
        )

    def __init__(
        self,
        boot_path: Optional[str] = None,
        boom_path: Optional[str] = None,
        legacy_enable: Optional[bool] = None,
        legacy_format: Optional[str] = None,
        legacy_sync: Optional[bool] = None,
        cache_enable: Optional[bool] = None,
        cache_auto_clean: Optional[bool] = None,
        cache_path: Optional[str] = None,
    ):
        """Initialise a new ``BoomConfig`` object with the supplied
        configuration values, or defaults for any unset arguments.

        :param boot_path: the path to the system /boot volume
        :param boom_path: the path to the boom configuration dir
        :param legacy_enable: enable legacy bootloader support
        :param legacy_format: the legacy bootloader format to write
        :param legacy_sync: the legacy sync mode
        :param cache_enable: enable boot image cache
        :param cache_auto_clean: automatically clean up unused boot
                                 images
        :param cache_path: the path to the boot image cache
        """
        # String settings: an empty string is treated like "unset".
        self.boot_path = boot_path or self.boot_path
        self.boom_path = boom_path or self.boom_path
        self.legacy_format = legacy_format or self.legacy_format
        self.cache_path = cache_path or self.cache_path

        # Boolean settings: only ``None`` means "unset". Using ``or`` here
        # would discard an explicit ``False`` whenever the default is
        # ``True`` (legacy_sync, cache_enable, cache_auto_clean).
        self.legacy_enable = (
            legacy_enable if legacy_enable is not None else self.legacy_enable
        )
        self.legacy_sync = (
            legacy_sync if legacy_sync is not None else self.legacy_sync
        )
        self.cache_enable = (
            cache_enable if cache_enable is not None else self.cache_enable
        )
        self.cache_auto_clean = (
            cache_auto_clean
            if cache_auto_clean is not None
            else self.cache_auto_clean
        )

        self._cfg: Optional[ConfigParser] = None
# The active configuration object: returned by get_boom_config() and
# replaced by set_boom_config().
__config = BoomConfig()
def set_boom_config(config: BoomConfig):
    """Install ``config`` as the active configuration.

    Any object is accepted as long as it provides non-``None``
    ``boot_path`` and ``boom_path`` attributes.

    :param config: a configuration object
    :returns: None
    :raises: TypeError if ``config`` does not appear to have the
             correct attributes.
    """
    global __config

    required = ("boot_path", "boom_path")
    # A missing attribute and an attribute set to None are both rejected.
    if any(getattr(config, attr, None) is None for attr in required):
        raise TypeError("config does not appear to be a BoomConfig object.")

    __config = config
def get_boom_config():
    """Fetch the configuration object that is currently active.

    :rtype: BoomConfig
    :returns: the active configuration object
    """
    active = __config
    return active
def get_boot_path() -> str:
    """Report the boot file system path held in the active configuration.

    :returns: the ``boot_path`` value of the active configuration.
    :rtype: str
    """
    active = __config
    return active.boot_path
def get_boom_path() -> str:
    """Report the boom configuration directory held in the active
    configuration.

    :returns: the ``boom_path`` value of the active configuration.
    :rtype: str
    """
    active = __config
    return active.boom_path
def get_cache_path() -> str:
    """Report the image cache directory held in the active configuration.

    :returns: the ``cache_path`` value of the active configuration.
    :rtype: str
    """
    active = __config
    return active.cache_path
def set_boot_path(boot_path):
    """Set the location of the boot file system to ``boot_path``.

    Besides storing the boot path, this resets the configured boom path
    to the default boom sub-directory of the new boot path. If that
    directory exists it is then activated through ``set_boom_path()``,
    which applies its own validation. Otherwise the caller is expected
    to set a non-default boom path explicitly afterwards.

    :param boot_path: absolute path to the boot file system.
    :returns: ``None``
    :raises: ValueError if ``boot_path`` is not absolute or does not
             exist.
    """
    if not isabs(boot_path):
        raise ValueError(f"boot_path must be an absolute path: {boot_path}")
    if not path_exists(boot_path):
        raise ValueError(f"Path '{boot_path}' does not exist")

    __config.boot_path = boot_path
    _log_debug("Set boot path to: %s", boot_path)

    __config.boom_path = path_join(boot_path, DEFAULT_BOOM_DIR)

    # Only hand over to set_boom_path() when a boom/ directory is really
    # present; isdir() is False for paths that do not exist.
    candidate = path_join(boot_path, "boom")
    if isdir(candidate):
        set_boom_path(path_join(__config.boot_path, "boom"))
def set_boom_path(boom_path):
    """Set the location of the boom configuration directory.

    A relative ``boom_path`` is resolved against the configured boot
    path. The resolved directory must exist and contain a ``profiles``
    entry. On success the boom path is stored, the configuration file
    path is re-derived from it, and the cache path is updated when a
    ``cache`` entry exists inside the directory.

    :param boom_path: the path to the 'boom/' directory containing
                      boom profiles and configuration.
    :returns: ``None``
    :raises: ValueError if ``boom_path`` does not exist or has no
             ``profiles`` entry.
    """
    missing = f"Boom path {boom_path} does not exist"

    # Joining an absolute path onto the boot path yields the absolute
    # path itself, so one existence check covers both cases.
    resolved = boom_path if isabs(boom_path) else path_join(
        __config.boot_path, boom_path
    )
    if not path_exists(resolved):
        raise ValueError(missing)

    profiles_dir = path_join(resolved, "profiles")
    if not path_exists(profiles_dir):
        raise ValueError(
            "Path does not contain a valid boom configuration: "
            f"{profiles_dir}"
        )

    _log_debug("Set boom path to: %s", resolved)
    __config.boom_path = resolved
    set_boom_config_path(__config.boom_path)

    cache_dir = path_join(resolved, "cache")
    if path_exists(cache_dir):
        set_cache_path(cache_dir)
def set_cache_path(cache_path):
    """Set the location of the boom image cache directory.

    The resolved path must exist; it is then stored as the cache path
    of the active configuration.

    NOTE(review): a relative ``cache_path`` is resolved against the
    *currently configured cache path*, not the boot or boom path -
    confirm that this is the intended base directory.

    :param cache_path: the path to the 'cache/' directory containing
                       cached boot images.
    :returns: ``None``
    :raises: ValueError if ``cache_path`` does not exist.
    """
    missing = f"Cache path {cache_path} does not exist"

    resolved = cache_path if isabs(cache_path) else path_join(
        __config.cache_path, cache_path
    )
    if not path_exists(resolved):
        raise ValueError(missing)

    __config.cache_path = resolved
    _log_debug("Set cache path to: %s", resolved)
def get_boom_config_path() -> str:
    """Report the boom configuration file path currently in effect.

    :rtype: str
    :returns: the current boom configuration file path
    """
    current = __boom_config_path
    return current
def set_boom_config_path(path):
    """Set the boom configuration file path.

    :param path: the new configuration file path. A false value (``None``
                 or an empty string) keeps the current path. A relative
                 path is resolved against the configured boom path. A
                 directory has the default configuration file name
                 appended.
    :returns: ``None``
    """
    global __boom_config_path
    path = path or get_boom_config_path()
    if not isabs(path):
        # Resolve relative to the boom directory. The previous
        # single-argument join discarded ``path`` entirely.
        path = path_join(get_boom_path(), path)
    if isdir(path):
        path = path_join(path, BOOM_CONFIG_FILE)
    __boom_config_path = path
    _log_debug("set boom_config_path to '%s'", path)
def parse_btrfs_subvol(subvol: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Parse a BTRFS subvolume string.

    Classify a BTRFS subvolume specification as either a subvolume path
    or a subvolume identifier. A specification made up only of decimal
    digits is an identifier and is normalised through ``int()`` (so
    leading zeros are dropped); anything else is a path.

    :param subvol: The subvolume parameter to parse
    :returns: A ``(subvol_path, subvol_id)`` tuple in which at most one
              member is set; ``(None, None)`` if ``subvol`` is empty or
              ``None``.
    :rtype: ``(str, str)`` tuple
    """
    if not subvol:
        return (None, None)

    # isdecimal() accepts exactly what int() can convert. isnumeric() also
    # accepts characters such as superscripts and fractions, for which
    # int() raises ValueError.
    if subvol.isdecimal():
        return (None, str(int(subvol)))
    return (subvol, None)
#
# Selection criteria class
#
class Selection:
    """Selection()
    Selection criteria for boom BootEntry, OsProfile HostProfile,
    and BootParams.

    Selection criteria specified as a simple boolean AND of all
    criteria with a non-None value.
    """

    # BootEntry fields
    boot_id = None
    title = None
    version = None
    machine_id = None
    linux = None
    initrd = None
    efi = None
    options = None
    devicetree = None

    # BootParams fields
    root_device = None
    lvm_root_lv = None
    btrfs_subvol_path = None
    btrfs_subvol_id = None

    # OsProfile fields
    os_id = None
    os_name = None
    os_short_name = None
    os_version = None
    os_version_id = None
    os_uname_pattern = None
    os_kernel_pattern = None
    os_initramfs_pattern = None
    os_root_opts_lvm2 = None
    os_root_opts_btrfs = None
    os_options = None

    # Should results include the null profile?
    allow_null_profile = False

    # HostProfile fields
    host_id = None
    host_name = None
    host_label = None
    host_short_name = None
    host_add_opts = None
    host_del_opts = None

    # Cache fields
    path = None
    orig_path = None
    # "timestamp" is listed in cache_attrs, so give it a class-level
    # default like every other criterion.
    timestamp = None
    img_id = None

    #: Selection criteria applying to BootEntry objects
    entry_attrs = [
        "boot_id",
        "title",
        "version",
        "machine_id",
        "linux",
        "initrd",
        "efi",
        "options",
        "devicetree",
        "path",
    ]

    #: Selection criteria applying to BootParams objects
    params_attrs = [
        "root_device",
        "lvm_root_lv",
        "btrfs_subvol_path",
        "btrfs_subvol_id",
    ]

    #: Selection criteria applying to OsProfile objects
    profile_attrs = [
        "os_id",
        "os_name",
        "os_short_name",
        "os_version",
        "os_version_id",
        "os_uname_pattern",
        "os_kernel_pattern",
        "os_initramfs_pattern",
        "os_root_opts_lvm2",
        "os_root_opts_btrfs",
        "os_options",
    ]

    #: Selection criteria applying to HostProfile objects
    host_attrs = [
        "host_id",
        "host_name",
        "host_label",
        "host_short_name",
        "host_add_opts",
        "host_del_opts",
        "machine_id",
    ]

    #: Cache selection supports a subset of entry_attrs
    cache_attrs = [
        "version",
        "linux",
        "initrd",
        "path",
        "orig_path",
        "timestamp",
        "img_id",
    ]

    # Concatenation of every group. Some names (e.g. "version",
    # "machine_id") belong to more than one group and so appear twice.
    all_attrs = entry_attrs + params_attrs + profile_attrs + host_attrs + cache_attrs

    def __str__(self) -> str:
        """Format this ``Selection`` object as a human readable string.

        Criteria with a value are rendered as ``name='value'`` pairs
        joined by ``", "``, in the order in which the names first appear
        in ``all_attrs``.

        :returns: A human readable string representation of this
                  Selection object
        :rtype: string
        """
        # dict.fromkeys() removes the duplicate names in all_attrs while
        # keeping a stable order; a set would make the order vary between
        # interpreter runs.
        return ", ".join(
            f"{attr}='{getattr(self, attr)}'"
            for attr in dict.fromkeys(self.all_attrs)
            if self.__attr_has_value(attr)
        )

    def __repr__(self) -> str:
        """Format this ``Selection`` object as a machine readable string.

        The result is the ``str()`` form wrapped in ``Selection(...)``.

        :returns: A machine readable string representation of this
                  Selection object
        :rtype: string
        """
        return "Selection(" + str(self) + ")"

    def __init__(
        self,
        boot_id: Optional[str] = None,
        title: Optional[str] = None,
        version: Optional[str] = None,
        machine_id: Optional[str] = None,
        linux: Optional[str] = None,
        initrd: Optional[str] = None,
        efi: Optional[str] = None,
        root_device: Optional[str] = None,
        lvm_root_lv: Optional[str] = None,
        btrfs_subvol_path: Optional[str] = None,
        btrfs_subvol_id: Optional[str] = None,
        os_id: Optional[str] = None,
        os_name: Optional[str] = None,
        os_short_name: Optional[str] = None,
        os_version: Optional[str] = None,
        os_version_id: Optional[str] = None,
        os_options: Optional[str] = None,
        os_uname_pattern: Optional[str] = None,
        os_kernel_pattern: Optional[str] = None,
        os_initramfs_pattern: Optional[str] = None,
        allow_null: bool = False,
        host_id: Optional[str] = None,
        host_name: Optional[str] = None,
        host_label: Optional[str] = None,
        host_short_name: Optional[str] = None,
        host_add_opts: Optional[str] = None,
        host_del_opts: Optional[str] = None,
        path: Optional[str] = None,
        orig_path: Optional[str] = None,
        timestamp: Optional[Any] = None,
        img_id: Optional[str] = None,
    ):
        """Initialise a new Selection object.

        Initialise a new Selection object with the specified selection
        criteria.

        :param boot_id: The boot_id to match
        :param title: The title to match
        :param version: The version to match
        :param machine_id: The machine_id to match
        :param linux: The BootEntry kernel image to match
        :param initrd: The BootEntry initrd image to match
        :param efi: The BootEntry efi image to match
        :param root_device: The root_device to match
        :param lvm_root_lv: The lvm_root_lv to match
        :param btrfs_subvol_path: The btrfs_subvol_path to match
        :param btrfs_subvol_id: The btrfs_subvol_id to match
        :param os_id: The os_id to match
        :param os_name: The os_name to match
        :param os_short_name: The os_short_name to match
        :param os_version: The os_version to match
        :param os_version_id: The os_version_id to match
        :param os_options: The os_options to match
        :param os_uname_pattern: The os_uname_pattern to match
        :param os_kernel_pattern: The kernel_pattern to match
        :param os_initramfs_pattern: The initramfs_pattern to match
        :param allow_null: Allow selecting the null profile
        :param host_id: The host identifier to match
        :param host_name: The host name to match
        :param host_label: The host label to match
        :param host_short_name: The host short name to match
        :param host_add_opts: Host add options to match
        :param host_del_opts: Host del options to match
        :param path: A cache image path to match
        :param orig_path: A cache origin path to match
        :param timestamp: A cache entry timestamp to match
        :param img_id: A cache image identifier to match
        :returns: A new Selection instance
        :rtype: Selection
        """
        self.boot_id = boot_id
        self.title = title
        self.version = version
        self.machine_id = machine_id
        self.linux = linux
        self.initrd = initrd
        self.efi = efi
        self.root_device = root_device
        self.lvm_root_lv = lvm_root_lv
        self.btrfs_subvol_path = btrfs_subvol_path
        self.btrfs_subvol_id = btrfs_subvol_id
        self.os_id = os_id
        self.os_name = os_name
        self.os_short_name = os_short_name
        self.os_version = os_version
        self.os_version_id = os_version_id
        self.os_options = os_options
        self.os_uname_pattern = os_uname_pattern
        self.os_kernel_pattern = os_kernel_pattern
        self.os_initramfs_pattern = os_initramfs_pattern
        self.allow_null_profile = allow_null
        self.host_id = host_id
        self.host_name = host_name
        self.host_label = host_label
        self.host_short_name = host_short_name
        self.host_add_opts = host_add_opts
        self.host_del_opts = host_del_opts
        self.path = path
        self.orig_path = orig_path
        self.timestamp = timestamp
        self.img_id = img_id

    @classmethod
    def from_cmd_args(cls, args: Namespace) -> "Selection":
        """Initialise Selection from command line arguments.

        Construct a new ``Selection`` object from the command line
        arguments in ``args``. The ``btrfs_subvolume`` argument is split
        into a path or an ID by ``parse_btrfs_subvol()``; the remaining
        selection arguments are copied into the Selection.

        :param args: The command line selection arguments.
        :returns: A new Selection instance
        :rtype: Selection
        """
        (subvol_path, subvol_id) = parse_btrfs_subvol(args.btrfs_subvolume)

        # Build through ``cls`` so that subclasses get instances of their
        # own type from this alternate constructor.
        s = cls(
            boot_id=args.boot_id,
            title=args.title,
            version=args.version,
            machine_id=args.machine_id,
            linux=args.linux,
            initrd=args.initrd,
            efi=args.efi,
            root_device=args.root_device,
            lvm_root_lv=args.root_lv,
            btrfs_subvol_path=subvol_path,
            btrfs_subvol_id=subvol_id,
            os_id=args.profile,
            os_name=args.name,
            os_short_name=args.short_name,
            os_version=args.os_version,
            os_version_id=args.os_version_id,
            os_options=args.os_options,
            os_uname_pattern=args.uname_pattern,
            host_id=args.host_id,
        )

        _log_debug("Initialised %s from arguments", repr(s))
        return s

    def __attr_has_value(self, attr):
        """Test whether an attribute is defined.

        Return ``True`` if the specified attribute name exists on this
        object and is not ``None``, or ``False`` otherwise.

        :param attr: The name of the attribute to test
        :returns: ``True`` if ``attr`` is set or ``False`` otherwise
        :rtype: bool
        """
        return hasattr(self, attr) and getattr(self, attr) is not None

    def check_valid_selection(
        self,
        entry: bool = False,
        params: bool = False,
        profile: bool = False,
        host: bool = False,
        cache: bool = False,
    ):
        """Check a Selection for valid criteria.

        Check this ``Selection`` object to ensure it contains only
        criteria that are valid for the specified object type(s).

        ``entry`` also admits the BootParams criteria, and ``host`` also
        admits the OsProfile criteria.

        :param entry: ``Selection`` may include BootEntry data
        :param params: ``Selection`` may include BootParams data
        :param profile: ``Selection`` may include OsProfile data
        :param host: ``Selection`` may include Host data
        :param cache: ``Selection`` may include Cache data
        :returns: ``None`` on success
        :rtype: ``NoneType``
        :raises: ``ValueError`` if excluded criteria are present
        """
        valid_attrs = set()
        if entry:
            valid_attrs.update(self.entry_attrs)
        if entry or params:
            valid_attrs.update(self.params_attrs)
        if profile or host:
            valid_attrs.update(self.profile_attrs)
        if host:
            valid_attrs.update(self.host_attrs)
        if cache:
            valid_attrs.update(self.cache_attrs)

        # Walk the de-duplicated names so that a criterion belonging to
        # several groups is reported once, not once per group.
        invalid_attrs = [
            attr
            for attr in dict.fromkeys(self.all_attrs)
            if self.__attr_has_value(attr) and attr not in valid_attrs
        ]

        if invalid_attrs:
            invalid = ", ".join(invalid_attrs)
            raise ValueError(f"Invalid criteria for selection type: {invalid}")

    def is_null(self):
        """Test this Selection object for null selection criteria.

        Return ``True`` if none of the criteria named in ``all_attrs``
        has a value (so the Selection matches all objects), or ``False``
        otherwise.

        :returns: ``True`` if this Selection is null
        :rtype: bool
        """
        return not any(self.__attr_has_value(attr) for attr in self.all_attrs)
#
# Generic routines for parsing name-value pairs.
#
def blank_or_comment(line: str) -> bool:
    """Test whether line is empty or contains only a comment.

    A line counts as blank when it holds nothing but white space, and as
    a comment when its first non-blank character is ``#``.

    :param line: the line of text to be checked.
    :returns: ``True`` if the line is blank or a comment, and ``False``
              otherwise.
    :rtype: bool
    """
    content = line.strip()
    return content == "" or content.startswith("#")
def parse_name_value(
    nvp: str, separator: Optional[str] = "=", allow_empty: bool = False
) -> Tuple[str, Optional[str]]:
    """Parse a name value pair string.

    Split a ``name='value'`` style string at the first ``separator``,
    drop any in-line ``#`` comment from the value, strip surrounding
    quotes from a value that begins with one, and return the parts as a
    ``(name, value)`` tuple. An empty value is returned as ``None``.

    :param nvp: A name value pair optionally with an in-line comment.
    :param separator: The separator character used in this name value
                      pair, or ``None`` to split on white space.
    :param allow_empty: Accept a bare name with no separator, returning
                        ``None`` as its value.
    :returns: A ``(name, value)`` tuple.
    :rtype: (string, string) tuple.
    :raises: ValueError if the pair is malformed or the name contains
             invalid characters.
    """
    malformed = ValueError(f"Malformed name/value pair: {nvp}")

    try:
        # Only the newline is removed here: the value may contain
        # embedded white space anywhere within the string.
        name, value = nvp.rstrip("\n").split(separator, 1)
    except ValueError:
        if not (allow_empty and nvp):
            raise malformed
        name, value = nvp.strip(separator), None

    # Value cannot start with '='
    if value and value.startswith("="):
        raise malformed

    name = name.strip()
    value = value.lstrip() if value else None

    if value and "#" in value:
        value = value.split("#", 1)[0]

    permitted = string.ascii_letters + string.digits + "_-,.'\""
    bad_chars = [ch for ch in name if ch not in permitted]
    if bad_chars:
        raise ValueError(f"Invalid characters in name: {name} ({bad_chars})")

    # Quotes are removed from both ends only when the value opens with one.
    if value and value[0] in "\"'":
        value = value.strip("\"'")

    return (name, value)
def find_minimum_sha_prefix(shas: Set[str], min_prefix: int) -> int:
    """Find the minimum SHA prefix length guaranteeing uniqueness.

    Find the minimum unique prefix for the set of SHA IDs in the set
    ``shas``. After sorting, any two IDs sharing a prefix are adjacent,
    so only neighbouring pairs need to be compared.

    :param shas: A set of SHA IDs
    :param min_prefix: Initial minimum prefix value
    :returns: The minimum unique prefix length for the set; this is
              ``min_prefix`` if fewer than two distinct IDs are given.
    :rtype: int
    """
    # De-duplicate first: two equal IDs can never be told apart by a
    # prefix and would otherwise make the loop below run forever.
    ordered = sorted(set(shas))

    for current, following in zip(ordered, ordered[1:]):
        while current[:min_prefix] == following[:min_prefix]:
            min_prefix += 1

    return min_prefix
def min_id_width(min_prefix: int, objs: List[Any], attr: str) -> int:
    """Calculate the minimum unique width for id values.

    Collect the ``attr`` value of every object in ``objs`` and return the
    shortest prefix length, no smaller than ``min_prefix``, that keeps
    those values distinct.

    :param min_prefix: The minimum allowed unique prefix.
    :param objs: An iterable containing objects to check.
    :param attr: The attribute to compare.
    :returns: the minimum id width.
    :rtype: int
    """
    if not objs:
        return min_prefix

    id_values = {getattr(obj, attr) for obj in objs}
    return find_minimum_sha_prefix(id_values, min_prefix)
def load_profiles_for_class(
    profile_class: Type, profile_type: str, profiles_path: str, profile_ext: str
):
    """Load profiles from disk.

    Instantiate ``profile_class`` once for every file in
    ``profiles_path`` whose name ends in ``.<profile_ext>``, passing the
    file path as the ``profile_file`` keyword argument. The instances
    are not collected here.

    A profile that fails to load is logged as a warning and skipped,
    unless a debug mask is set, in which case the exception is re-raised.

    This function is intended for use by profile implementations
    that share common on-disk profile handling.

    :param profile_class: The profile class to instantiate.
    :param profile_type: A string description of the profile type.
    :param profiles_path: Path to the on-disk profile directory.
    :param profile_ext: Extension of profile files.
    :returns: None
    """
    dir_entries = listdir(profiles_path)
    _log_debug("Loading %s profiles from %s", profile_type, profiles_path)

    suffix = f".{profile_ext}"
    for file_name in dir_entries:
        if not file_name.endswith(suffix):
            continue

        file_path = path_join(profiles_path, file_name)
        try:
            profile_class(profile_file=file_path)
        except Exception as err:
            _log_warn(
                "Failed to load %s from '%s': %s",
                profile_class.__name__,
                file_path,
                err,
            )
            # Surface the failure while debugging instead of hiding it.
            if get_debug_mask():
                raise
# Public names exported by this module.
__all__ = [
    # boom module constants
    "DEFAULT_BOOT_PATH",
    "DEFAULT_BOOM_PATH",
    "BOOT_CONFIG_MODE",
    "BOOM_CONFIG_FILE",
    # Profile format keys
    "FMT_VERSION",
    "FMT_LVM_ROOT_LV",
    "FMT_LVM_ROOT_OPTS",
    "FMT_BTRFS_SUBVOLUME",
    "FMT_BTRFS_SUBVOL_ID",
    "FMT_BTRFS_SUBVOL_PATH",
    "FMT_BTRFS_ROOT_OPTS",
    "FMT_STRATIS_ROOT_OPTS",
    "FMT_STRATIS_POOL_UUID",
    "FMT_ROOT_DEVICE",
    "FMT_ROOT_OPTS",
    "FMT_KERNEL",
    "FMT_INITRAMFS",
    "FMT_OS_NAME",
    "FMT_OS_SHORT_NAME",
    "FMT_OS_VERSION",
    "FMT_OS_VERSION_ID",
    "FORMAT_KEYS",
    # Root fs option templates
    "ROOT_OPTS_BTRFS_PATH",
    "ROOT_OPTS_BTRFS_ID",
    "ROOT_OPTS_STRATIS",
    # API Classes
    "BoomConfig",
    "Selection",
    # Path configuration
    "get_boot_path",
    "get_boom_path",
    "get_cache_path",
    "set_boot_path",
    "set_boom_path",
    "set_cache_path",
    "set_boom_config_path",
    "get_boom_config_path",
    # Persistent configuration
    "set_boom_config",
    "get_boom_config",
    # boom exception base class
    "BoomError",
    # Debug logging - legacy interface
    "get_debug_mask",
    "set_debug_mask",
    "BOOM_DEBUG_PROFILE",
    "BOOM_DEBUG_ENTRY",
    "BOOM_DEBUG_REPORT",
    "BOOM_DEBUG_COMMAND",
    "BOOM_DEBUG_CACHE",
    "BOOM_DEBUG_STRATIS",
    "BOOM_DEBUG_ALL",
    # Debug logging - subsystem name interface
    "SubsystemFilter",
    "BOOM_SUBSYSTEM_PROFILE",
    "BOOM_SUBSYSTEM_ENTRY",
    "BOOM_SUBSYSTEM_REPORT",
    "BOOM_SUBSYSTEM_COMMAND",
    "BOOM_SUBSYSTEM_CACHE",
    "BOOM_SUBSYSTEM_STRATIS",
    # Utility routines
    "blank_or_comment",
    "parse_name_value",
    "parse_btrfs_subvol",
    "find_minimum_sha_prefix",
    "min_id_width",
    "load_profiles_for_class",
    "MIN_ID_WIDTH",
]

# vim: set et ts=4 sw=4