# Copyright Red Hat
#
# snapm/fsdiff/engine.py - Snapshot Manager fs diff engine
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
File system diff engine
"""
from typing import Any, ClassVar, Dict, Iterator, List, Optional, Tuple
from collections import defaultdict
from datetime import datetime
from math import floor
import logging
import json
import os
import re
from snapm import SNAPM_SUBSYSTEM_FSDIFF
from snapm.progress import ProgressFactory, TermControl
from .changes import ChangeDetector, ChangeType, FileChange
from .contentdiff import ContentDiff, ContentDifferManager
from .difftypes import DiffType
from .options import DiffOptions
from .treewalk import FsEntry
from .tree import DiffTree
# Module-level logger, plus short aliases bound to its level-specific methods.
_log = logging.getLogger(__name__)
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
# NOTE(review): presumably enables extra-verbose fsdiff debug logging when
# set to True -- confirm against the code that consumes it.
ENGINE_LOG_ME_HARDER = False
def _log_debug_fsdiff(msg, *args, **kwargs):
    """
    Emit a debug-level log record tagged with the fsdiff subsystem.

    :param msg: The log message (``%``-style format string).
    :param args: Positional format arguments passed through to the logger.
    :param kwargs: Additional keyword arguments passed through to the logger.
    """
    subsystem_tag = {"subsystem": SNAPM_SUBSYSTEM_FSDIFF}
    _log.debug(msg, *args, extra=subsystem_tag, **kwargs)
class FsDiffRecord:
    """
    Diff record compatible with snapm.report system

    Represents a single file/directory change: the path, the kind of
    difference, the old and new entries (when present) and a set of
    pre-computed reporting fields derived from them.
    """

    def __init__(
        self,
        path: str,
        diff_type: DiffType,
        old_entry: Optional[FsEntry] = None,
        new_entry: Optional[FsEntry] = None,
    ):
        """
        Initialise a new ``FsDiffRecord`` object.

        :param path: A string describing the path for this diff record.
        :type path: ``str``
        :param diff_type: The diff type for this diff record.
        :type diff_type: ``DiffType``
        :param old_entry: The original entry for the comparison.
        :param new_entry: The updated entry for the comparison.
        """
        self.path = path
        self.diff_type = diff_type
        self.old_entry = old_entry
        self.new_entry = new_entry
        self.changes = []  # List of FileChange objects
        self.content_diff = None  # ContentDiff object
        self.moved_from = None  # For move operations
        self.moved_to = None

        # Fields for reporting (compatible with snapm.report)
        self.file_path = path
        self.change_type = diff_type
        self.file_type = self._get_file_type()
        self.file_type_desc = self._get_file_type_desc()
        self.file_category = self._get_file_category()
        # A missing side contributes a size of zero so that the delta is
        # always a plain integer.
        self.size_old = old_entry.size if old_entry else 0
        self.size_new = new_entry.size if new_entry else 0
        self.size_delta = self.size_new - self.size_old
        self.mode_old = oct(old_entry.mode) if old_entry else None
        self.mode_new = oct(new_entry.mode) if new_entry else None
        self.owner_old = f"{old_entry.uid}:{old_entry.gid}" if old_entry else None
        self.owner_new = f"{new_entry.uid}:{new_entry.gid}" if new_entry else None
        self.mtime_old = old_entry.mtime if old_entry else None
        self.mtime_new = new_entry.mtime if new_entry else None
        self.content_changed = False
        self.metadata_changed = False
        self.has_content_diff = False
        self.content_diff_summary = ""

    def __str__(self) -> str:
        """
        Return a string representation of this ``FsDiffRecord`` object.

        :returns: A human readable representation of this ``FsDiffRecord``.
        :rtype: ``str``
        """

        def _format_mtime(mtime: Optional[float]) -> str:
            """
            Format a (possibly ``None``) mtime value.

            :param mtime: The mtime to format.
            :type mtime: ``Optional[float]``
            :returns: The formatted value or the empty string.
            :rtype: ``str``
            """
            return f"{'(' + str(mtime) + ')' if mtime is not None else ''}"

        # Possibly empty/missing fields
        nl = "\n"
        changes = (
            (" changes:\n" + f"{nl.join(str(chg) for chg in self.changes)}\n")
            if self.changes
            else ""
        )
        moved_from = f"moved_from: {self.moved_from}\n" if self.moved_from else ""
        moved_to = f"moved_to: {self.moved_to}\n" if self.moved_to else ""
        content_diff = (
            (" content_diff:\n" + f"{self.content_diff}\n")
            if self.content_diff is not None
            else ""
        )
        content_diff_summary = (
            f"\n content_diff_summary: {self.content_diff_summary}"
            if self.content_diff_summary
            else ""
        )

        # Pre-formatted fields
        mtime_old = (
            str(datetime.fromtimestamp(self.mtime_old)) + " "
            if self.mtime_old is not None
            else ""
        )
        mtime_new = (
            str(datetime.fromtimestamp(self.mtime_new)) + " "
            if self.mtime_new is not None
            else ""
        )

        fsd_str = (
            f"Path: {self.path}\n"
            f" diff_type: {self.diff_type.value}\n"
            f" old_entry:{nl + str(self.old_entry) if self.old_entry else ''}\n"
            f" new_entry:{nl + str(self.new_entry) if self.new_entry else ''}\n"
            f"{changes}"  # no newline (embedded if set)
            f"{content_diff}"  # no newline (embedded if set)
            f"{' ' + moved_from if moved_from else ''}"  # no newline (embedded if set)
            f"{' ' + moved_to if moved_to else ''}"  # no newline (embedded if set)
            f" file_path: {self.file_path}\n"
            f" file_type: {self.file_type}\n"
            f" file_type_desc: {self.file_type_desc}\n"
            f" file_category: {self.file_category}\n"
            f" size_old: {self.size_old}\n"
            f" size_new: {self.size_new}\n"
            f" size_delta: {self.size_delta}\n"
            f" mode_old: {self.mode_old if self.mode_old else ''}\n"
            f" mode_new: {self.mode_new if self.mode_new else ''}\n"
            f" owner_old: {self.owner_old if self.owner_old else ''}\n"
            f" owner_new: {self.owner_new if self.owner_new else ''}\n"
            f" mtime_old: {mtime_old}{_format_mtime(self.mtime_old)}\n"
            f" mtime_new: {mtime_new}{_format_mtime(self.mtime_new)}\n"
            f" content_changed: {self.content_changed}\n"
            f" metadata_changed: {self.metadata_changed}\n"
            f" has_content_diff: {self.has_content_diff}"  # no newline: end
            f"{content_diff_summary}"  # no newline (prefixed if set)
        )
        return fsd_str

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert this ``FsDiffRecord`` object into a dictionary representation
        suitable for encoding as JSON.

        Optional members (entries, changes, content diff, move end points and
        the content diff summary) are only included when they are set.

        :returns: A dictionary mapping this instance's keys to values.
        :rtype: ``Dict[str, Any]``
        """
        out = {
            "path": self.path,
            "diff_type": self.diff_type.value,
            "file_path": self.file_path,
            "file_type": self.file_type,
            "file_type_desc": self.file_type_desc,
            "file_category": self.file_category,
            "size_old": self.size_old,
            "size_new": self.size_new,
            "size_delta": self.size_delta,
            "mode_old": self.mode_old or "",
            "mode_new": self.mode_new or "",
            "owner_old": self.owner_old or "",
            "owner_new": self.owner_new or "",
            "mtime_old": self.mtime_old,
            "mtime_new": self.mtime_new,
            "content_changed": self.content_changed,
            "metadata_changed": self.metadata_changed,
            "has_content_diff": self.has_content_diff,
        }
        if self.old_entry:
            out["old_entry"] = self.old_entry.to_dict()
        if self.new_entry:
            out["new_entry"] = self.new_entry.to_dict()
        if self.changes:
            out["changes"] = [change.to_dict() for change in self.changes]
        if self.content_diff is not None:
            out["content_diff"] = self.content_diff.to_dict()
        if self.moved_from:
            out["moved_from"] = self.moved_from
        if self.moved_to:
            out["moved_to"] = self.moved_to
        if self.content_diff_summary:
            out["content_diff_summary"] = self.content_diff_summary
        return out

    def json(self, pretty: bool = False) -> str:
        """
        Return a string representation of this ``FsDiffRecord`` in JSON
        notation.

        :param pretty: Indent JSON to be human readable.
        :type pretty: ``bool``
        :returns: A JSON representation of this instance.
        :rtype: ``str``
        """
        return json.dumps(self.to_dict(), indent=4 if pretty else None)

    def _get_file_type(self) -> str:
        """
        Get file type for reporting.

        The new entry is preferred; the old entry is used when there is no
        new entry (e.g. for removed paths).

        :returns: A description of the file type for this diff record.
        :rtype: ``str``
        """
        entry = self.new_entry or self.old_entry
        if not entry:
            return "unknown"
        if entry.is_dir:
            return "directory"
        if entry.is_symlink:
            return "symlink"
        if entry.file_type_info:
            return entry.file_type_info.mime_type
        return "file"

    def _get_file_type_desc(self) -> str:
        """
        Get file type description for reporting.

        :returns: A detailed description of the file type for this diff record.
        :rtype: ``str``
        """
        entry = self.new_entry or self.old_entry
        if not entry:
            return "unknown"
        if entry.is_dir:
            return "filesystem directory"
        if entry.is_symlink:
            return "symbolic link"
        if entry.file_type_info:
            return entry.file_type_info.description
        return "unknown"

    def _get_file_category(self) -> str:
        """
        Get file category for reporting

        :returns: The file category for this diff record.
        :rtype: ``str``
        """
        entry = self.new_entry or self.old_entry
        if not entry or not entry.file_type_info:
            return "unknown"
        return entry.file_type_info.category.value

    def add_change(self, change: "FileChange"):
        """
        Add a detected change.

        Content changes set ``content_changed``; every other change type is
        treated as a metadata change.

        :param change: The file change to record.
        :type change: ``FileChange``
        """
        self.changes.append(change)
        if change.change_type == ChangeType.CONTENT:
            self.content_changed = True
        else:
            self.metadata_changed = True

    def set_content_diff(self, content_diff: ContentDiff):
        """
        Set content-level diff.

        :param content_diff: The content diff for this diff record.
        :type content_diff: ``ContentDiff``
        """
        self.content_diff = content_diff
        self.has_content_diff = True
        self.content_diff_summary = content_diff.summary

    def get_change_summary(self) -> str:
        """
        Get human-readable change summary

        :returns: A one-line description of the change represented by this
                  record.
        :rtype: ``str``
        """
        summary = "Modified"
        if self.diff_type == DiffType.ADDED:
            summary = f"Added {self.file_type}"
        elif self.diff_type == DiffType.REMOVED:
            summary = f"Removed {self.file_type}"
        elif self.diff_type == DiffType.MOVED:
            summary = f"Moved from {self.moved_from} to {self.moved_to}"
        elif self.diff_type == DiffType.TYPE_CHANGED:
            if not self.old_entry or not self.new_entry:
                summary = "Type changed"
            else:
                summary = (
                    f"Type changed from {self.old_entry.type_desc} to "
                    f"{self.new_entry.type_desc}"
                )
        elif self.changes:
            # De-duplicate while keeping first-seen order: joining a plain
            # set() would make the order of the listed change types vary
            # between runs (string hash randomisation).
            change_types = dict.fromkeys(c.change_type.value for c in self.changes)
            summary = f"Changed: {', '.join(change_types)}"
        return summary
def render_unified_diff(record: FsDiffRecord, tc: Optional[TermControl]) -> str:
    """
    Render a unified diff for a modified file.

    The first two lines of the record's diff data are replaced by headers
    built from the record itself (paths, modes and timestamps); the
    remaining lines are emitted with their trailing newline removed and,
    when ``tc`` is given, wrapped in color control sequences.

    :param record: The diff record to render.
    :type record: ``FsDiffRecord``
    :param tc: An optional ``TermControl`` instance to use for rendering color
               output.
    :type tc: ``Optional[TermControl]``
    :returns: Rendered unified diff string, or the empty string if the
              record has no renderable content diff.
    :rtype: ``str``
    """

    def _format_timestamp(timestamp: Optional[float]) -> str:
        """
        Format human-readable timestamp.

        :param timestamp: A UNIX epoch timestamp.
        :type timestamp: ``Optional[float]``
        :returns: Human readable datetime string.
        :rtype: ``str``
        """
        return str(datetime.fromtimestamp(timestamp)) if timestamp is not None else ""

    def _rstrip_nl(s: str) -> str:
        """
        Remove a single trailing newline from ``s``, if present.
        """
        return s[:-1] if s.endswith("\n") else s

    def _hunk_header(header: str) -> str:
        """
        Format a chunk header with colored output.

        Only the trailing newline is removed, so the visible text matches
        the uncolored rendering exactly (including the space that separates
        the closing ``@@`` from any trailing section text).

        :param header: The header to format.
        :type header: ``str``
        :returns: Colorized header string.
        :rtype: ``str``
        """
        before, sep, after = header.partition(" @@")
        if not sep:  # No closing @@
            return _rstrip_nl(header)
        return tc.CYAN + before + " @@" + tc.NORMAL + _rstrip_nl(after)

    def _colorize(line: str) -> str:
        """
        Apply color to a single diff line according to its prefix.
        """
        if line.startswith("-"):
            return tc.RED + _rstrip_nl(line) + tc.WHITE
        if line.startswith("+"):
            return tc.GREEN + _rstrip_nl(line) + tc.WHITE
        if line.startswith("@@"):
            return _hunk_header(line)
        return _rstrip_nl(line)

    if not record.has_content_diff:
        return ""

    if record.content_diff.diff_type not in ("unified", "json"):
        return ""

    # Nothing to render unless the diff data has at least the two header
    # lines that are replaced below.
    diff_data = record.content_diff.diff_data
    if not diff_data or len(diff_data) < 2:
        return ""

    added = record.new_entry and not record.old_entry
    deleted = record.old_entry and not record.new_entry

    from_path = f"a{record.file_path}"
    to_path = f"b{record.file_path}"

    header_lines = [f"diff {from_path} {to_path}"]
    if added:
        header_lines.append(f"new file mode {record.mode_new}")
    if deleted:
        header_lines.append(f"deleted file mode {record.mode_old}")

    # The missing side of an addition/removal is shown as /dev/null.
    from_path = from_path if not added else "/dev/null"
    to_path = to_path if not deleted else "/dev/null"

    lines = [
        *header_lines,
        f"--- {from_path}\t{_format_timestamp(record.mtime_old)}",
        f"+++ {to_path}\t{_format_timestamp(record.mtime_new)}",
    ]

    # Skip the two header lines carried in the diff data.
    body = diff_data[2:]
    if tc:
        lines.extend(_colorize(line) for line in body)
    else:
        lines.extend(_rstrip_nl(line) for line in body)

    # Preserve content exactly; only trailing newlines were stripped from
    # the content-diff input via _rstrip_nl().
    return "\n".join(lines)
def render_diff_stat(records: List[FsDiffRecord], term_control: TermControl) -> str:
    """
    Render a diffstat-style summary for the given records.

    One line is produced per record, giving the path, the number of changed
    lines and a ``+``/``-`` histogram, followed by a trailer with the totals.

    :param records: Diff records with content diffs.
    :type records: ``List[FsDiffRecord]``
    :param term_control: A ``TermControl`` instance to use for rendering color
                         output.
    :type term_control: ``TermControl``
    :returns: Diffstat-style summary string, or the empty string if no
              record carries diff data.
    :rtype: ``str``
    """

    def _count_changed_lines(diff_data) -> Tuple[int, int]:
        """
        Count added and removed lines in ``diff_data``.

        ``---``/``+++`` lines are only treated as file headers when they are
        one of the first two lines: later lines with those prefixes are real
        content changes (for example a removed ``--`` comment or an added
        ``++`` line) and must be counted.

        :returns: An ``(added, removed)`` tuple.
        """
        added = 0
        removed = 0
        for index, line in enumerate(diff_data):
            if index < 2 and line.startswith(("---", "+++")):
                continue
            if line.startswith("+"):
                added += 1
            elif line.startswith("-"):
                removed += 1
        return added, removed

    # Filter to only records with line-based diff data
    records = [r for r in records if r.content_diff and r.content_diff.diff_data]
    if not records:
        return ""

    plus = f"{term_control.GREEN}+{term_control.NORMAL}"
    minus = f"{term_control.RED}-{term_control.NORMAL}"

    # Paths are displayed without leading separators; size the column from
    # the displayed form so alignment does not depend on how many leading
    # separators each path carries.
    shown_paths = [record.path.lstrip(os.sep) for record in records]
    path_width = max(len(shown) for shown in shown_paths)

    adds = 0
    dels = 0
    stat_lines = []
    for record, shown in zip(records, shown_paths):
        added, removed = _count_changed_lines(record.content_diff.diff_data)
        adds += added
        dels += removed
        pad = path_width - len(shown)
        header = f" {shown}{pad * ' '} | "
        stat_lines.append(
            header + f"{added + removed:4} {plus * added}{minus * removed}"
        )

    count = len(records)
    trailer = (
        f"\n {count} file{'s' if count > 1 else ''} changed, "
        f"{adds} insertions(+), {dels} deletions(-)"
    )
    return "\n".join(stat_lines) + trailer
class FsDiffResults:
    """
    Container for filesystem diff results with formatting methods.

    Wraps a list of ``FsDiffRecord`` objects, exposes it through a list-like
    interface and renders it in each of the formats named in
    ``DIFF_FORMATS``.
    """

    #: Constant for the names of the string diff formats
    DIFF_FORMATS: ClassVar[List[str]] = [
        "paths",
        "full",
        "short",
        "json",
        "diff",
        "summary",
        "tree",
    ]

    def __init__(
        self,
        records: List[FsDiffRecord],
        options: DiffOptions,
        timestamp: int,
        count: int = 0,
    ):
        """
        Initialise a new ``FsDiffResults`` container.

        :param records: The diff records to hold.
        :type records: ``List[FsDiffRecord]``
        :param options: The options the diff was generated with.
        :type options: ``DiffOptions``
        :param timestamp: The timestamp to associate with these results.
        :type timestamp: ``int``
        :param count: An explicit record count; a zero value falls back to
                      ``len(records)``.
        :type count: ``int``
        """
        self._records = records
        self.options = options
        self.timestamp = timestamp
        self.count = count if count else len(records)

    def __repr__(self) -> str:
        """
        Return a machine-readable representation of this instance.

        :returns: ``FsDiffResults`` constructor style string.
        :rtype: ``str``
        """
        return f"FsDiffResults([...], {self.options!r}, {self.timestamp})"

    # List-like interface
    def __iter__(self) -> Iterator[FsDiffRecord]:
        """
        Implement iter(self).
        """
        return iter(self._records)

    def __len__(self):
        """
        Implement len(self).
        """
        return len(self._records)

    def __getitem__(self, index: int) -> FsDiffRecord:
        """
        Return self[index]

        :param index: The index to return.
        :type index: ``int``
        """
        return self._records[index]

    def _with_type(self, wanted: DiffType) -> List[FsDiffRecord]:
        """
        Return the records whose ``diff_type`` equals ``wanted``.

        :param wanted: The diff type to select.
        :type wanted: ``DiffType``
        :returns: Matching records, in their stored order.
        :rtype: ``List[FsDiffRecord]``
        """
        return [rec for rec in self._records if rec.diff_type == wanted]

    # Summary properties
    @property
    def total_changes(self) -> int:
        """
        Return the total number of changes in this ``FsDiffResults`` instance:
        equivalent to ``len(self)``.

        :returns: Count of changes.
        :rtype: ``int``
        """
        return len(self)

    @property
    def content_changes(self) -> int:
        """
        Return the number of content changes in this ``FsDiffResults`` instance.

        :returns: Count of changes with content diff.
        :rtype: ``int``
        """
        return sum(1 for rec in self._records if rec.has_content_diff)

    @property
    def added(self) -> List[FsDiffRecord]:
        """
        Return added changes in this ``FsDiffResults`` instance.

        :returns: Changes with ``DiffType.ADDED`` type.
        :rtype: ``List[FsDiffRecord]``
        """
        return self._with_type(DiffType.ADDED)

    @property
    def removed(self) -> List[FsDiffRecord]:
        """
        Return removed changes in this ``FsDiffResults`` instance.

        :returns: Changes with ``DiffType.REMOVED`` type.
        :rtype: ``List[FsDiffRecord]``
        """
        return self._with_type(DiffType.REMOVED)

    @property
    def modified(self) -> List[FsDiffRecord]:
        """
        Return modified changes in this ``FsDiffResults`` instance.

        :returns: Changes with ``DiffType.MODIFIED`` type.
        :rtype: ``List[FsDiffRecord]``
        """
        return self._with_type(DiffType.MODIFIED)

    @property
    def moved(self) -> List[FsDiffRecord]:
        """
        Return moved changes in this ``FsDiffResults`` instance.

        :returns: Changes with ``DiffType.MOVED`` type.
        :rtype: ``List[FsDiffRecord]``
        """
        return self._with_type(DiffType.MOVED)

    @property
    def type_changed(self) -> List[FsDiffRecord]:
        """
        Return type_changed changes in this ``FsDiffResults`` instance.

        :returns: Changes with ``DiffType.TYPE_CHANGED`` type.
        :rtype: ``List[FsDiffRecord]``
        """
        return self._with_type(DiffType.TYPE_CHANGED)

    # Output formats
    def paths(self) -> List[str]:
        """
        Return a list of paths that changed in this ``FsDiffResults``.

        :returns: Path list.
        :rtype: ``List[str]``
        """
        return [rec.path for rec in self._records]

    def full(self) -> str:
        """
        Return a string with full ``FsDiffRecord`` content for this instance.

        :returns: String description of file system changes.
        :rtype: ``str``
        """
        return "\n\n".join(str(rec) for rec in self._records)

    def short(self) -> str:
        """
        Return brief summary of ``FsDiffRecord`` content for this instance.

        :returns: Brief string description of file system changes.
        :rtype: ``str``
        """

        def _brief(rec: FsDiffRecord) -> str:
            """
            Format the brief description of a single record.
            """
            described = ", ".join(chg.description for chg in rec.changes)
            changes_part = f"\n changes: {described}" if described else ""
            if rec.content_diff_summary:
                summary_part = f"\n content_diff_summary: {rec.content_diff_summary}"
            else:
                summary_part = ""
            return (
                f"Path: {rec.path}\n"
                f" diff_type: {rec.diff_type.value}\n"
                f" file_type: {rec.file_type}\n"
                f" file_type_desc: {rec.file_type_desc}"
                f"{changes_part}{summary_part}"
            )

        return "\n".join(_brief(rec) for rec in self._records)

    def json(self, pretty: bool = False) -> str:
        """
        Return JSON representation of ``FsDiffRecord`` content for this
        instance.

        :param pretty: Indent JSON to be human readable.
        :type pretty: ``bool``
        :returns: JSON string description of file system changes.
        :rtype: ``str``
        """
        indent = 4 if pretty else None
        return json.dumps([rec.to_dict() for rec in self._records], indent=indent)

    def diff(
        self,
        diffstat: bool = False,
        color: str = "auto",
        term_control: Optional[TermControl] = None,
    ) -> str:
        """
        Return unified diff representation of content changes for this
        instance.

        :param diffstat: Include "diffstat"-like change summary.
        :type diffstat: ``bool``
        :param color: A string to control color diff rendering: "auto",
                      "always", or "never".
        :type color: ``str``
        :param term_control: An optional ``TermControl`` instance to use for
                             formatting. The supplied instance overrides any
                             ``color`` argument if set.
        :type term_control: ``Optional[TermControl]``
        :returns: unified diff string description of file system changes.
        :rtype: ``str``
        """
        tc = term_control or TermControl(color=color)
        with_content = [rec for rec in self._records if rec.has_content_diff]

        # Records that render to an empty string are left out entirely.
        rendered = []
        for rec in with_content:
            text = render_unified_diff(rec, tc)
            if text:
                rendered.append(text)
        body = "\n".join(rendered)

        if diffstat and with_content:
            return render_diff_stat(with_content, tc) + "\n\n" + body
        return body

    def summary(
        self,
        diffstat: bool = False,
        color: str = "auto",
        term_control: Optional[TermControl] = None,
    ) -> str:
        """
        Return a summary of this ``FsDiffResults`` instance.

        :param diffstat: Include "diffstat"-like change summary.
        :type diffstat: ``bool``
        :param color: A string to control color rendering: "auto", "always", or
                      "never".
        :type color: ``str``
        :param term_control: An optional ``TermControl`` instance to use for
                             formatting. The supplied instance overrides any
                             ``color`` argument if set.
        :type term_control: ``Optional[TermControl]``
        :returns: A string summarizing this instance.
        :rtype: ``str``
        """
        tc = term_control or TermControl(color=color)
        with_content = [rec for rec in self._records if rec.has_content_diff]

        # (color, label, count) for each summary row, in display order.
        rows = [
            (tc.GREEN, "added: ", len(self.added)),
            (tc.RED, "removed: ", len(self.removed)),
            (tc.YELLOW, "modified: ", len(self.modified)),
            (tc.MAGENTA, "withdiff: ", len(with_content)),
            (tc.CYAN, "moved: ", len(self.moved)),
            (tc.BLUE, "different:", len(self.type_changed)),
        ]
        out_lines = [f"Total changes: {len(self)}"]
        for row_color, label, row_count in rows:
            out_lines.append(f" Paths {row_color + label + tc.NORMAL} {row_count}")
        text = "\n".join(out_lines)

        if diffstat and with_content:
            text += "\n\n" + render_diff_stat(with_content, tc)
        return text

    def tree(
        self,
        color: str = "auto",
        desc: str = "none",
        term_control: Optional[TermControl] = None,
    ) -> str:
        """
        Render a ``DiffTree`` of this ``FsDiffResults`` instance.

        :param color: A string to control color tree rendering: "auto",
                      "always", or "never".
        :type color: ``str``
        :param desc: Include descriptions: "short" for brief description, "full"
                     for complete change descriptions or "none" to omit.
        :type desc: ``str``
        :param term_control: An optional ``TermControl`` instance to use for
                             formatting. The supplied instance overrides any
                             ``color`` argument if set.
        :type term_control: ``Optional[TermControl]``
        :returns: A string representation of a difference tree
        :rtype: ``str``
        """
        diff_tree = DiffTree.build_tree(
            self, color=color, quiet=self.options.quiet, term_control=term_control
        )
        return diff_tree.render(desc=desc)
class DiffEngine:
    """
    Core class for generating fsdiff comparisons.

    Compares two trees of file system entries, producing one
    ``FsDiffRecord`` per differing path, and then folds matching
    removed/added pairs with identical content hashes into move records.
    """

    def __init__(self):
        """
        Initialise a new ``DiffEngine`` instance.
        """
        self.change_detector = ChangeDetector()
        self.content_differ = ContentDifferManager()

    def _effective_changes(
        self, changes: List[FileChange], options: DiffOptions
    ) -> List[FileChange]:
        """
        Elide ignored change types in ``changes``.

        :param changes: The list of changes to examine.
        :type changes: ``List[FileChange]``
        :param options: Effective diff options.
        :type options: ``DiffOptions``
        :returns: Changes pruned according to options.
        :rtype: ``List[FileChange]``
        """
        if not options.content_only:
            return changes
        return [c for c in changes if c.change_type == ChangeType.CONTENT]

    def _attach_content_diff(
        self,
        diff_record: FsDiffRecord,
        entry_a: Optional[FsEntry],
        entry_b: Optional[FsEntry],
        options: DiffOptions,
    ) -> None:
        """
        Generate a content diff for ``entry_a``/``entry_b`` and attach it to
        ``diff_record``, provided the entries are within the size limit.

        A non-positive ``options.max_content_diff_size`` disables the limit.
        At least one of the two entries must be given.

        :param diff_record: The record to receive the content diff.
        :type diff_record: ``FsDiffRecord``
        :param entry_a: The original entry, or ``None`` for an added path.
        :type entry_a: ``Optional[FsEntry]``
        :param entry_b: The updated entry, or ``None`` for a removed path.
        :type entry_b: ``Optional[FsEntry]``
        :param options: Effective diff options.
        :type options: ``DiffOptions``
        """
        limit = options.max_content_diff_size
        largest = max(
            entry.size for entry in (entry_a, entry_b) if entry is not None
        )
        if limit > 0 and largest > limit:
            return
        content_diff = self.content_differ.generate_content_diff(
            entry_a.full_path if entry_a is not None else None,
            entry_b.full_path if entry_b is not None else None,
            entry_a,
            entry_b,
        )
        if content_diff:
            diff_record.set_content_diff(content_diff)

    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements
    def compute_diff(
        self,
        tree_a: Dict[str, FsEntry],
        tree_b: Dict[str, FsEntry],
        options: "DiffOptions" = None,
        term_control: Optional[TermControl] = None,
    ) -> FsDiffResults:
        """
        Main diff computation logic.

        :param tree_a: The first file system tree to compare: a dictionary of
                       path name -> file system entry mappings.
        :type tree_a: ``Dict[str, FsEntry]``
        :param tree_b: The second file system tree to compare: a dictionary of
                       path name -> file system entry mappings.
        :type tree_b: ``Dict[str, FsEntry]``
        :param options: Options to apply to the diff generation.
        :type options: ``DiffOptions``
        :param term_control: A ``TermControl`` instance to use for rendering color
                             output.
        :type term_control: ``TermControl``
        :returns: An ``FsDiffResults`` instance containing ``FsDiffRecord``
                  objects.
        :rtype: ``FsDiffResults``
        """
        if options is None:
            options = DiffOptions()

        diffs: List[FsDiffRecord] = []
        all_paths = set(tree_a.keys()) | set(tree_b.keys())

        _log_debug("Starting compute_diff with %d paths", len(all_paths))
        start_time = datetime.now()

        if not all_paths:
            _log_info("No paths to diff; returning empty FsDiffResults")
            return FsDiffResults(diffs, options, floor(start_time.timestamp()))

        if options.ignore_timestamps:
            _log_debug("Ignoring timestamp changes")
        if options.ignore_permissions:
            _log_debug("Ignoring permission changes")
        if options.ignore_ownership:
            _log_debug("Ignoring ownership changes")
        if options.content_only:
            _log_debug("Checking content changes only")

        term_control = term_control or TermControl()
        progress = ProgressFactory.get_progress(
            "Computing diffs",
            quiet=options.quiet,
            term_control=term_control,
        )

        progress.start(len(all_paths))
        try:
            for i, path in enumerate(sorted(all_paths)):
                entry_a = tree_a.get(path)
                entry_b = tree_b.get(path)

                _log_debug_fsdiff(
                    "Comparing path '%s' (A:%s // B:%s)", path, entry_a, entry_b
                )
                progress.progress(i, f"Comparing trees for '{path}'")

                if entry_a is None:
                    # File added in tree_b
                    diff_record = FsDiffRecord(path, DiffType.ADDED, new_entry=entry_b)
                    changes = self.change_detector.detect_added(entry_b, options)

                    # Optionally restrict to content-only changes.
                    for change in self._effective_changes(changes, options):
                        diff_record.add_change(change)

                    # Generate content diff if requested and appropriate
                    if options.include_content_diffs and entry_b.is_file:
                        self._attach_content_diff(diff_record, None, entry_b, options)

                    diffs.append(diff_record)

                elif entry_b is None:
                    # File removed from tree_a
                    diff_record = FsDiffRecord(
                        path, DiffType.REMOVED, old_entry=entry_a
                    )
                    changes = self.change_detector.detect_removed(entry_a, options)

                    # Optionally restrict to content-only changes.
                    for change in self._effective_changes(changes, options):
                        diff_record.add_change(change)

                    # Generate content diff if requested and appropriate
                    if options.include_content_diffs and entry_a.is_file:
                        self._attach_content_diff(diff_record, entry_a, None, options)

                    diffs.append(diff_record)

                else:
                    # File exists in both; first, detect any type changes.
                    if (
                        # pylint: disable=too-many-boolean-expressions
                        entry_a.is_file != entry_b.is_file
                        or entry_a.is_dir != entry_b.is_dir
                        or entry_a.is_symlink != entry_b.is_symlink
                        or entry_a.is_block != entry_b.is_block
                        or entry_a.is_char != entry_b.is_char
                        or entry_a.is_sock != entry_b.is_sock
                        or entry_a.is_fifo != entry_b.is_fifo
                    ):
                        diffs.append(
                            FsDiffRecord(path, DiffType.TYPE_CHANGED, entry_a, entry_b)
                        )
                        continue

                    # Otherwise, check for metadata/content changes.
                    changes = self.change_detector.detect_changes(
                        entry_a, entry_b, options
                    )

                    # Optionally restrict to content-only changes.
                    effective_changes = self._effective_changes(changes, options)

                    # If there are no effective changes (e.g. only metadata
                    # changes in content-only mode), skip this path entirely.
                    if not effective_changes:
                        continue

                    diff_record = FsDiffRecord(
                        path, DiffType.MODIFIED, entry_a, entry_b
                    )
                    for change in effective_changes:
                        diff_record.add_change(change)

                    has_content_change = any(
                        change.change_type == ChangeType.CONTENT
                        for change in effective_changes
                    )

                    # Generate content diff if requested and appropriate
                    if (
                        has_content_change
                        and options.include_content_diffs
                        and entry_a.is_file
                        and entry_b.is_file
                    ):
                        self._attach_content_diff(
                            diff_record, entry_a, entry_b, options
                        )

                    diffs.append(diff_record)
            end_time = datetime.now()
        except KeyboardInterrupt:
            progress.cancel("Quit!")
            raise
        except SystemExit:
            progress.cancel("Exiting.")
            raise

        progress.end(f"Found {len(diffs)} differences in {end_time - start_time}")

        # Detect moves/renames
        diffs = self._detect_moves(
            diffs, tree_a, tree_b, options, term_control=term_control
        )

        return FsDiffResults(diffs, options, floor(start_time.timestamp()))

    @staticmethod
    def _is_move_diff(diff, src_path, dest_path):
        """
        Return ``True`` if ``diff`` reflects a move diff type for the
        current ``src_path``/``dest_path`` or ``False`` otherwise.

        A record qualifies if it is the ``REMOVED`` record for ``src_path``
        or the ``ADDED`` record for ``dest_path``.

        :param diff: The ``FsDiffRecord`` to inspect.
        :param src_path: The original path to check.
        :param dest_path: The destination path to check.
        """
        if diff.path == src_path and diff.diff_type == DiffType.REMOVED:
            return True
        if diff.path == dest_path and diff.diff_type == DiffType.ADDED:
            return True
        return False

    @staticmethod
    def _best_sibling_proximity(
        orig_path: str, candidates: List[Tuple[str, FsEntry]]
    ) -> Tuple[str, FsEntry]:
        """
        Sibling proximity heuristic: choose the "best" move destination
        when multiple candidates exist.

        The ``candidates`` list is not modified. When several candidates
        score equally the one appearing first in ``candidates`` wins.

        :param orig_path: The original path before the move.
        :type orig_path: ``str``
        :param candidates: Possible candidates for the destination, as
                           a list of (path, entry) pairs.
        :type candidates: ``List[Tuple[str, FsEntry]]``
        :returns: The winning (path, entry) pair.
        :rtype: ``Tuple[str, FsEntry]``
        :raises IndexError: If ``candidates`` is empty.
        """

        def filename_similarity(path_a: str, path_b: str) -> float:
            """
            Quick and cheap similarity metric based on string matching and
            token overlap.

            :param path_a: The first path to compare.
            :type path_a: ``str``
            :param path_b: The second path to compare.
            :type path_b: ``str``
            :returns: A value 0..1 indicating the approximate similarity of the paths.
            :rtype: ``float``
            """
            if not path_a or not path_b:
                return 0.0

            # Same directory?
            if os.path.dirname(path_a) == os.path.dirname(path_b):
                return 1.0

            # Quick wins first
            if path_a.startswith(path_b) or path_b.startswith(path_a):
                return 1.0
            if path_a.endswith(path_b) or path_b.endswith(path_a):
                return 1.0

            # Token overlap for compound names
            tokens_a = set(re.split(r"[-_.,:@ ]", path_a))
            tokens_b = set(re.split(r"[-_.,:@ ]", path_b))
            return len(tokens_a & tokens_b) / len(tokens_a | tokens_b)

        if len(candidates) <= 1:
            # Indexing keeps the IndexError for an empty list.
            return candidates[0]

        # max() returns the first of several equally-scored candidates and,
        # unlike an in-place sort, leaves the caller's list untouched.
        return max(candidates, key=lambda cand: filename_similarity(orig_path, cand[0]))

    def _detect_moves(
        self,
        diffs: List[FsDiffRecord],
        tree_a: Dict[str, FsEntry],
        tree_b: Dict[str, FsEntry],
        options: DiffOptions,
        term_control: Optional[TermControl] = None,
    ) -> List[FsDiffRecord]:
        """
        Detect file moves/renames by matching content hashes

        A move is recorded when a removed (or modified) file in ``tree_a``
        has the same content hash as an added (or modified) file at a
        different path in ``tree_b``. Each destination is used for at most
        one move. The ``REMOVED``/``ADDED`` records that a move replaces are
        pruned from the result.

        :param diffs: A list of file system diff records to inspect.
        :type diffs: ``List[FsDiffRecord]``
        :param tree_a: The first file system tree to compare: a dictionary of
                       path name -> file system entry mappings.
        :type tree_a: ``Dict[str, FsEntry]``
        :param tree_b: The second file system tree to compare: a dictionary of
                       path name -> file system entry mappings.
        :type tree_b: ``Dict[str, FsEntry]``
        :param options: Options to apply to the diff generation.
        :type options: ``DiffOptions``
        :param term_control: A ``TermControl`` instance to use for rendering color
                             output.
        :type term_control: ``TermControl``
        :returns: Updated list of diff records with moves detected.
        :rtype: ``List[FsDiffRecord]``
        """
        # Pre-index existing added/removed records so we only treat genuine
        # remove+add pairs as moves (and not simple copies).
        added_paths = {
            diff.path
            for diff in diffs
            if diff.diff_type == DiffType.ADDED
            and diff.new_entry
            and diff.new_entry.is_file
            and diff.new_entry.content_hash
        }
        removed_paths = {
            diff.path
            for diff in diffs
            if diff.diff_type == DiffType.REMOVED
            and diff.old_entry
            and diff.old_entry.is_file
            and diff.old_entry.content_hash
        }
        changed_paths = {
            diff.path
            for diff in diffs
            if diff.diff_type == DiffType.MODIFIED
            and diff.old_entry
            and diff.old_entry.is_file
            and diff.old_entry.content_hash
        }

        _log_debug_fsdiff(
            "Initialising move detection: "
            "added_paths=%s, removed_paths=%s, changed_paths=%s",
            ", ".join(sorted(added_paths)),
            ", ".join(sorted(removed_paths)),
            ", ".join(sorted(changed_paths)),
        )

        # Extra-verbose output is gated explicitly so that the (potentially
        # very large) record dump is only built when it is wanted.
        if ENGINE_LOG_ME_HARDER:
            _log_debug_fsdiff(
                "Diff records: %s", ",\n\n".join(str(diff) for diff in diffs)
            )

        # Paths eligible as the source / destination of a move. Built once:
        # these do not change while scanning tree_a.
        move_sources = removed_paths | changed_paths
        move_dests = added_paths | changed_paths

        # Map of paths to their corresponding FsDiffRecord list
        diff_map = defaultdict(list)
        for diff in diffs:
            diff_map[diff.path].append(diff)

        # Set of diff records to be pruned as a result of move detection.
        to_prune = set()

        # Index eligible destination files by content hash, ignoring entries
        # without a hash.
        dest_hashes = defaultdict(list)
        for path, entry in tree_b.items():
            if entry.is_file and entry.content_hash and path in move_dests:
                dest_hashes[entry.content_hash].append((path, entry))

        # Ensure each destination path is used as a move target at most once.
        used_dests = set()

        # Anything to do?
        if not tree_a:
            return diffs

        term_control = term_control or TermControl()
        progress = ProgressFactory.get_progress(
            "Detecting moves",
            quiet=options.quiet,
            term_control=term_control,
        )

        moves = 0  # Count of move records

        start_time = datetime.now()
        progress.start(len(tree_a))
        try:
            for i, (path, entry_a) in enumerate(tree_a.items()):
                _log_debug_fsdiff("Detecting moves for tree_a path '%s'", path)
                progress.progress(i, f"Checking moves for '{path}'")

                # Only consider files with a valid content hash for move detection.
                if not (entry_a.is_file and entry_a.content_hash):
                    continue

                if path in tree_b and entry_a.content_hash == tree_b[path].content_hash:
                    continue

                # Only treat as a move if we have the corresponding
                # REMOVED/MODIFIED record; otherwise this is more likely a
                # copy/duplicate.
                if path not in move_sources:
                    continue

                # Drop destinations that cannot be used *before* choosing the
                # best one: otherwise an ineligible or already-claimed
                # candidate that scores highest would hide a valid move.
                candidates = [
                    (cand_path, cand_entry)
                    for cand_path, cand_entry in dest_hashes.get(
                        entry_a.content_hash, ()
                    )
                    if cand_path != path and cand_path not in used_dests
                ]
                if not candidates:
                    continue

                if ENGINE_LOG_ME_HARDER:
                    _log_debug_fsdiff(
                        "Checking candidate destinations: %s",
                        ", ".join(cand_path for cand_path, _entry in candidates),
                    )

                # If multiple possible move destination candidates exist (i.e.
                # content with the hash of entry_a now exists at multiple file
                # system paths) we use a "sibling proximity heuristic" to select
                # the best available candidate. This prefers moves that are in the
                # same parent directory or that otherwise constitute "more similar"
                # strings (using a cheap string similarity metric approximation).
                dest_path, entry_b = self._best_sibling_proximity(path, candidates)

                _log_debug_fsdiff("Selected candidate %s: %s", dest_path, entry_b)

                # Found a move: count it
                moves += 1

                #: Add to cache of used destinations
                used_dests.add(dest_path)

                diff_record = FsDiffRecord(path, DiffType.MOVED, entry_a, entry_b)
                changes = self.change_detector.detect_changes(entry_a, entry_b, options)
                for change in self._effective_changes(changes, options):
                    diff_record.add_change(change)

                to_prune.update(
                    diff
                    for diff in diff_map[path] + diff_map[dest_path]
                    if self._is_move_diff(diff, path, dest_path)
                )

                diff_record.moved_from = path
                diff_record.moved_to = dest_path
                diffs.append(diff_record)
        except KeyboardInterrupt:
            progress.cancel("Quit!")
            raise
        except SystemExit:
            progress.cancel("Exiting.")
            raise

        # Prune ADDED/REMOVED diff records for detected moves
        _log_debug_fsdiff("Pruning %d diff records for detected moves", len(to_prune))
        diffs = [diff for diff in diffs if diff not in to_prune]

        end_time = datetime.now()
        progress.end(f"Found {moves} moves in {end_time - start_time}")

        return diffs