Source code for snapm.fsdiff.treewalk

# Copyright Red Hat
#
# snapm/fsdiff/treewalk.py - Snapshot Manager fs differ tree walk
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Tree walking support for fsdiff.
"""
from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
from hashlib import md5, sha1, sha256, sha512
from fnmatch import fnmatch
from pathlib import Path
from datetime import datetime
import itertools
import logging
import errno
import stat
import os

from snapm import SNAPM_SUBSYSTEM_FSDIFF

from snapm.progress import (
    ProgressFactory,
    TermControl,
)

from .filetypes import FileTypeDetector, FileTypeInfo
from .options import DiffOptions

if TYPE_CHECKING:
    from snapm.manager._mounts import Mount

_log = logging.getLogger(__name__)

_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error


[docs]def _log_debug_fsdiff(msg, *args, **kwargs): """A wrapper for fsdiff subsystem debug logs.""" _log.debug(msg, *args, extra={"subsystem": SNAPM_SUBSYSTEM_FSDIFF}, **kwargs)
_HASH_TYPES = { "md5": md5, "sha1": sha1, "sha256": sha256, "sha512": sha512, } #: Paths that are always excluded, no matter what the user says. _ALWAYS_EXCLUDE_PATTERNS = ( # --- /proc (Process Information & Kernel Interfaces) --- "*/proc/kcore", # Virtual alias for physical RAM (Can be Terabytes in size) "*/proc/kmsg", # Blocks indefinitely waiting for kernel log messages "*/proc/*/mem", # Raw memory of processes (Access errors / security alerts) "*/proc/*/pagemap", # Page mappings: blocks on read. "*/proc/*/fd/*", # Recursion hazards: reading your own open file descriptors "*/proc/*/task/*/mem", # Thread specific memory "*/proc/*/clear_refs", # Clear page bits API. "*/proc/sys/fs/binfmt_misc/register", # binfmt registration API "*/proc/sys/net/ipv*/route/flush", # IPv[46] route flush API "*/proc/sysrq-trigger", # Write-only trigger, but unsafe to touch "*/proc/acpi/event", # Deprecated blocking event stream # --- /dev (Device Nodes) --- # Infinite Data Streams (Will hang 'read()' loops or fill buffers) "/dev/zero", "/dev/full", "/dev/random", "/dev/urandom", "/dev/kmsg", # Kernel message stream # Hardware & System Memory (Access violations or bus locking) "/dev/mem", "/dev/kmem", "/dev/port", "/dev/nvram", # Blocking Input/Output Devices (Will hang waiting for user/hardware input) "/dev/console", "/dev/tty*", "/dev/pts/*", "/dev/ptmx", "/dev/input/*", # Keyboards/Mice (Keylogging / blocking) "/dev/uinput", # Watchdogs (Opening/Closing can trigger system reboot!) "/dev/watchdog*", # Tape Drives (Access can trigger physical rewind/seek) "/dev/st*", "/dev/nst*", # --- /sys (Kernel Objects, DebugFS, TraceFS) --- # These contain dynamic infinite streams for kernel debugging "*/sys/kernel/debug/*", # DebugFS: Generally unsafe for automated scanning "*/sys/kernel/tracing/trace_pipe", # TraceFS: Blocks waiting for trace data "*/sys/kernel/tracing/*/*/*_raw", # Tracing snapshots: hangs on read. "*/sys/kernel/tracing/*/*/trace_pipe", # Tracing pipe: hangs on read. "*/sys/fs/cgroup/*", # Control Groups: Complex hierarchy, recursion risks "*/sys/*/power/autosuspend_delay_ms", # IOError on read. "*/sys/devices/pci*/rescan", # PCI rescan API. "*/sys/*/bind", # Driver binding API (write only). "*/sys/*/unbind", # Driver unbinding API (write only). "*/sys/*/uevent", # Driver model uevent channel (write only). ) #: Paths where we should avoid trying to read content in chunks. _NO_CHUNK_PATHS = ( "/proc/sys/fs/binfmt_misc/*", "/proc/sys/net/ipv*/conf/*", "/proc/irq/*/*", "/sys/*", "/dev/*", ) #: Default system directories to exclude _EXCLUDE_SYSTEM_DIRS = ( "/proc/*", "/sys/*", "/dev/*", "/tmp/*", "/run/*", "/var/run/*", "/var/lock/*", )
[docs]def _stat_to_dict(st: os.stat_result) -> Dict[str, Any]: """ Convert an `os.stat_result` into a dictionary. :param st: The stat result to convert. :type st: ``os.stat_result`` :returns: A dictionary mapping stat fields to values. :rtype: ``Dict[str, Any]`` """ return { "st_atime": st.st_atime, "st_atime_ns": st.st_atime_ns, "st_blksize": st.st_blksize, "st_blocks": st.st_blocks, "st_ctime": st.st_ctime, "st_ctime_ns": st.st_ctime_ns, "st_dev": st.st_dev, "st_gid": st.st_gid, "st_ino": st.st_ino, "st_mode": st.st_mode, "st_mtime": st.st_mtime, "st_mtime_ns": st.st_mtime_ns, "st_nlink": st.st_nlink, "st_rdev": st.st_rdev, "st_size": st.st_size, "st_uid": st.st_uid, }
[docs]class FsEntry: """ Representation of a single file system entry for comparison. """
[docs] def __init__( self, path: str, strip_prefix: str, stat_info: os.stat_result, content_hash: Optional[str] = None, file_type_info: Optional[FileTypeInfo] = None, ): """ Initialise a new ``FsEntry`` object. :param path: The full path to this ``FsEntry`` represents. :type path: ``str`` :param strip_prefix: An optional leading path prefix (mount root) to strip from ``path`` before storing. :type strip_prefix: ``str`` :param stat_info: An ``os.stat_result`` for this ``FsEntry``. :type stat_info: ``os.stat_result`` :param content_hash: A hash of the content of this ``FsEntry``. :type content_hash: ``str`` :param file_type_info: A ``FileTypeInfo`` object for this ``FsEntry``. :type file_type_info: ``FileTypeInfo`` """ def _get_xattrs(path, follow_symlinks): """Helper to retrieve all xattrs for ``path``.""" checker = os.path.exists if follow_symlinks else os.path.lexists if not checker(path): return {} try: return { xattr: os.getxattr(path, xattr, follow_symlinks=follow_symlinks) for xattr in os.listxattr(path, follow_symlinks=follow_symlinks) } except OSError: return {} #: The path relative to this tree's mount point self.path: Path = Path(path.removeprefix(strip_prefix)) #: The full path from the host perspective self.full_path: Path = Path(path) #: An ``os.stat_result`` for this path self.stat: os.stat_result = stat_info #: File mode returned by ``stat()`` self.mode: int = stat_info.st_mode #: File size returned by ``stat()`` self.size: int = stat_info.st_size #: File modification time returned by ``stat()`` self.mtime: float = stat_info.st_mtime #: File owner returned by ``stat()`` self.uid: int = stat_info.st_uid #: File group returned by ``stat()`` self.gid: int = stat_info.st_gid #: Calculated content hash or ``None`` if not available. self.content_hash: Optional[str] = content_hash if stat.S_ISLNK(stat_info.st_mode): if os.path.lexists(path): #: The symbolic link target try: self.symlink_target = os.readlink(path) except FileNotFoundError: # Magic kernel links in /proc self.symlink_target = "" self.broken_symlink = True self.xattrs: Dict[str, bytes] = {} self.file_type_info = file_type_info return if os.path.isabs(self.symlink_target): target_for_check = self.symlink_target else: target_for_check = os.path.join( os.path.dirname(path), self.symlink_target ) #: True if symlink target not found self.broken_symlink = not os.path.exists(target_for_check) #: Extended attributes as ``Dict[str, bytes]`` self.xattrs: Dict[str, bytes] = _get_xattrs(path, False) else: #: The symbolic link target self.symlink_target = None #: True if symlink target not found or link vanished self.broken_symlink = True #: Extended attributes as ``Dict[str, bytes]`` self.xattrs = {} else: #: The symbolic link target: ``None`` for non-symlinks self.symlink_target = None #: Not a broken symlink self.broken_symlink = False #: Extended attributes as ``Dict[str, bytes]`` self.xattrs: Dict[str, bytes] = _get_xattrs(path, True) #: An optional instance of ``FileTypeInfo`` describing this path. self.file_type_info = file_type_info
[docs] def __str__(self): """ Return a string representation of this ``FsEntry`` object. :returns: A human readable representation of this ``FsEntry``. :rtype: ``str`` """ indent = 4 * " " stat_str = "\n".join( f"{indent} {k}={v}" for k, v in _stat_to_dict(self.stat).items() ) fse_str = ( f"{indent}path: {self.path}\n" f"{indent}stat:\n{stat_str}\n" f"{indent}hash: {self.content_hash}" ) if self.symlink_target: fse_str += f"\n{indent}symlink_target: {self.symlink_target}" if self.xattrs: fse_str += f"\n{indent}extended_attributes:\n" xattr_strs = [ f"{indent} {xattr}={value}" for xattr, value in self.xattrs.items() ] fse_str += "\n".join(xattr_strs) return fse_str
[docs] def to_dict(self) -> Dict[str, Any]: """ Convert this ``FsEntry`` object into a dictionary representation suitable for encoding as JSON. :returns: A dictionary mapping this instance's keys to values. :rtype: ``Dict[str, Any]`` """ def _decode_xattr_value(value: bytes) -> str: """ Decode extended attribute value to string. Try UTF-8 decode first (most xattrs are text), fall back to hex representation for binary data. :param value: Raw xattr value bytes :type value: ``bytes`` :returns: Decoded string or hex representation """ try: # Try UTF-8 decode, strip trailing nulls (common in SELinux labels) return value.rstrip(b"\x00").decode("utf-8") except UnicodeDecodeError: # Binary data - represent as hex string return value.hex() return { "path": str(self.path), "full_path": str(self.full_path), "stat": _stat_to_dict(self.stat), "mode": self.stat.st_mode, "size": self.stat.st_size, "mtime": self.stat.st_mtime, "uid": self.stat.st_uid, "gid": self.stat.st_gid, "content_hash": self.content_hash, "symlink_target": self.symlink_target, "broken_symlink": self.broken_symlink, "xattrs": {k: _decode_xattr_value(v) for k, v in self.xattrs.items()}, }
@property def is_file(self) -> bool: """ True if this ``FsEntry`` is a regular file. :returns: ``True`` if this ``FsEntry`` corresponds to a regular file or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISREG(self.mode) @property def is_dir(self) -> bool: """ True if this ``FsEntry`` is a directory. :returns: ``True`` if this ``FsEntry`` corresponds to a directory or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISDIR(self.mode) @property def is_symlink(self) -> bool: """ True if this ``FsEntry`` is a symlink. :returns: ``True`` if this ``FsEntry`` corresponds to a symlink or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISLNK(self.mode) @property def is_block(self) -> bool: """ True if this ``FsEntry`` is a block special file. :returns: ``True`` if this ``FsEntry`` corresponds to a block special file or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISBLK(self.mode) @property def is_char(self) -> bool: """ True if this ``FsEntry`` is a character special file. :returns: ``True`` if this ``FsEntry`` corresponds to a character special file or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISCHR(self.mode) @property def is_sock(self) -> bool: """ True if this ``FsEntry`` is a socket. :returns: ``True`` if this ``FsEntry`` corresponds to a socket or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISSOCK(self.mode) @property def is_fifo(self) -> bool: """ True if this ``FsEntry`` is a FIFO special file. :returns: ``True`` if this ``FsEntry`` corresponds to a FIFO special file or ``False`` otherwise. :rtype: ``bool`` """ return stat.S_ISFIFO(self.mode) @property def is_text_like(self) -> bool: """ True if this ``FsEntry`` is a text-like file. :returns: ``True`` if this ``FsEntry`` corresponds to a text-like file or ``False`` otherwise. :rtype: ``bool`` """ if not self.is_file: return False return bool(self.file_type_info and self.file_type_info.is_text_like) @property def type_desc(self): """ Return a string description of the entry type: "file", "dir", or "symlink". :returns: A string description of the entry type. :rtype: ``str`` """ desc = "other" if self.is_file: desc = "file" elif self.is_dir: desc = "directory" elif self.is_symlink: desc = "symbolic link" elif self.is_block: desc = "block device" elif self.is_char: desc = "char device" elif self.is_sock: desc = "socket" elif self.is_fifo: desc = "FIFO" return desc
[docs]class TreeWalker: """ Simple file system tree walker for comparisons. """
[docs] def __init__(self, options, hash_algorithm: str = "sha256"): """ Initialise a new ``TreeWalker`` object. :param options: Options to control this ``TreeWalker`` instance. :type options: ``DiffOptions`` :param hash_algorithm: A string describing the hash algorithm to be used for this ``TreeWalker`` instance. :type hash_algorithm: ``str`` """ if hash_algorithm not in _HASH_TYPES: raise ValueError(f"Unknown hash algorithm: {hash_algorithm}") self.options: DiffOptions = options self.hash_algorithm: str = hash_algorithm self.file_type_detector: FileTypeDetector = FileTypeDetector() if options.exclude_patterns and not options.include_system_dirs: self.exclude_patterns: Tuple[str, ...] = ( options.exclude_patterns + _EXCLUDE_SYSTEM_DIRS ) elif not options.include_system_dirs: self.exclude_patterns: Tuple[str, ...] = _EXCLUDE_SYSTEM_DIRS else: self.exclude_patterns: Tuple[str, ...] = options.exclude_patterns or () self.file_patterns: Optional[Tuple[str, ...]] = options.file_patterns self.hasher = _HASH_TYPES[hash_algorithm]
[docs] def _process_dir( self, dir_path: str, dir_stat: os.stat_result, strip_prefix: str ) -> FsEntry: """ Process a single directory at ``dir_path`` with stat data ``dir_stat``. :param dir_path: The path to examine. :type dir_path: ``str`` :param dir_stat: ``os.stat()`` result for ``dir_path``. :type dir_stat: ``os.stat_result`` :returns: A new ``FsEntry`` representing the directory. :rtype: ``FsEntry`` """ fti = self.file_type_detector.detect_file_type( Path(dir_path), strip_prefix=strip_prefix, use_magic=self.options.use_magic_file_type, ) return FsEntry(dir_path, strip_prefix, dir_stat, file_type_info=fti)
[docs] def _process_file( self, file_path: str, file_stat: os.stat_result, strip_prefix: str ) -> FsEntry: """ Process a single file at ``file_path`` with stat data ``file_stat``. :param file_path: The path to examine. :type file_path: ``str`` :param file_stat: ``os.stat()`` result for ``file_path``. :type file_stat: ``os.stat_result`` :returns: A new ``FsEntry`` representing the file. :rtype: ``FsEntry`` """ fti = self.file_type_detector.detect_file_type( Path(file_path), strip_prefix=strip_prefix, use_magic=self.options.use_magic_file_type, ) content_hash = None if stat.S_ISREG(file_stat.st_mode) and getattr( self.options, "include_content_hash", True ): if file_stat.st_size <= self.options.max_content_hash_size: content_hash = self._calculate_content_hash(file_path, strip_prefix) return FsEntry( file_path, strip_prefix, file_stat, content_hash=content_hash, file_type_info=fti, )
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
[docs] def walk_tree( self, mount: "Mount", strip_prefix: str = "", quiet: bool = False, term_control: Optional[TermControl] = None, ) -> Dict[str, FsEntry]: """ Walk filesystem tree starting from mount.root and return indexed entries. :param mount: A ``snapm.manager._mounts.Mount`` object representing the file system to walk. :type mount: ``snapm.manager._mounts.Mount`` :param strip_prefix: An optional leading path prefix to strip from stored path values. :type strip_prefix: ``str`` :param quiet: Suppress progress output. :type quiet: ``bool`` :param term_control: Optional pre-initialised terminal control object. :type term_control: ``Optional[TermControl]`` :returns: A dictionary mapping path strings to ``FsEntry`` objects. :rtype: ``Dict[str, FsEntry]`` """ if self.options.from_path is not None: if isinstance(self.options.from_path, str): from_path = [self.options.from_path.lstrip(os.sep)] else: from_path = [path.lstrip(os.sep) for path in self.options.from_path] start = [os.path.join(mount.root, path) for path in from_path] else: from_path = [""] start = [mount.root] target_paths = ", ".join(os.sep + path for path in from_path) target = f"{mount.name} {target_paths}" _log_info( "Gathering paths to scan from %s (%s)", mount.name, self.options.from_path ) throbber = ProgressFactory.get_throbber( f"Gathering paths from {target}", style="braillecircle", quiet=quiet, term_control=term_control, no_clear=False, ) def _throb(obj: object) -> object: """ Dummy function to throb the throbber. """ throbber.throb() return obj throbber.start() try: to_visit = start.copy() for start_path in start: to_visit.extend( [ _throb(os.path.join(root, name)) for root, dirs, files in os.walk(start_path) for name in itertools.chain(files, dirs) ] ) except (KeyboardInterrupt, SystemExit): throbber.end("Quit!") raise total = len(to_visit) throbber.end(message=f"found {total} paths") # map of path: FsEntry objects tree = {} progress = ProgressFactory.get_progress( f"Walking {target}", quiet=quiet, term_control=term_control, ) start_time = datetime.now() progress.start(total) follow_symlinks = self.options.follow_symlinks exclude_patterns = _ALWAYS_EXCLUDE_PATTERNS + self.exclude_patterns excluded = 0 try: for i, pathname in enumerate(to_visit): stripped_pathname = pathname.removeprefix(strip_prefix) or "/" if any(fnmatch(stripped_pathname, pat) for pat in exclude_patterns): excluded += 1 continue if self.file_patterns and not any( fnmatch(stripped_pathname, pat) for pat in self.file_patterns ): excluded += 1 continue progress.progress(i, f"Scanning {stripped_pathname}") if follow_symlinks: try: if os.path.exists(pathname): path_stat = os.stat(pathname) else: _log_debug_fsdiff( "Found dangling symbolic link '%s'", pathname ) path_stat = os.lstat(pathname) tree[stripped_pathname] = FsEntry( pathname, strip_prefix, path_stat, ) continue except FileNotFoundError: # Path vanished between discovery and stat; skip it. continue else: try: path_stat = os.lstat(pathname) except FileNotFoundError: # Not a broken symlink: start does not exist in this root. continue exists = os.path.exists(pathname) is_lnk = stat.S_ISLNK(path_stat.st_mode) if not exists and not is_lnk: continue if is_lnk: if not exists: _log_debug_fsdiff("Found dangling symbolic link '%s'", pathname) tree[stripped_pathname] = FsEntry( pathname, strip_prefix, path_stat ) else: tree[stripped_pathname] = self._process_file( pathname, path_stat, strip_prefix ) elif stat.S_ISDIR(path_stat.st_mode): tree[stripped_pathname] = self._process_dir( pathname, path_stat, strip_prefix ) else: tree[stripped_pathname] = self._process_file( pathname, path_stat, strip_prefix ) except (KeyboardInterrupt, SystemExit): progress.cancel("Quit!") raise end_time = datetime.now() progress.end( f"Scanned {total - excluded} paths in {end_time - start_time} (excluded {excluded})" ) return tree
[docs] def _calculate_content_hash(self, file_path: str, strip_prefix: str = "") -> str: """ Calculate content hash for regular files :param file_path: The path to the file to hash. :type file_path: ``str`` :param strip_prefix: An optional leading path prefix to strip from stored path values. :type strip_prefix: ``str`` :returns: A string representation of the hash of the file content using the configured hash algorithm. :rtype: ``str`` """ stripped_file_path = file_path.removeprefix(strip_prefix) or "/" no_chunk = any(fnmatch(stripped_file_path, pat) for pat in _NO_CHUNK_PATHS) hasher = self.hasher(usedforsecurity=False) try: with open(file_path, "rb") as fp: if no_chunk: # Path may not support the size argument to read(): slurp it. try: hasher.update(fp.read()) except OSError as err: _log_debug_fsdiff( "Exception reading %s: %s (from %s)", file_path, err, fp ) return "" else: try: for chunk in iter(lambda: fp.read(65536), b""): hasher.update(chunk) except OSError as err: _log_debug_fsdiff( "Exception reading %s: %s (from %s)", file_path, err, fp ) return "" except PermissionError as perr: _log_debug_fsdiff("Permission denied reading %s: %s", file_path, perr) return "" except ProcessLookupError: # Kernel thread - silently ignored. return "" except OSError as oserr: # Ignore probable zombies. if not ("/proc" in file_path and oserr.errno == errno.EINVAL): _log_debug_fsdiff("Exception opening %s: %s", file_path, oserr) return "" return hasher.hexdigest()