# Copyright Red Hat
#
# snapm/progress.py - Snapshot Manager terminal progress indicator
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Terminal control and progress indicator
"""
from typing import ClassVar, Dict, List, Optional, TextIO, Union
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import curses
import sys
import os
import re
from snapm import register_progress, unregister_progress
#: Default number of columns if not detected from terminal.
DEFAULT_COLUMNS = 80
#: Minimum width of a progress bar.
PROGRESS_MIN_WIDTH = 10
#: Minimum budget to reserve for status messages
MIN_BUDGET = 10
#: Default width of a progress bar as a fraction of the terminal size.
DEFAULT_WIDTH_FRAC = 0.5
#: Default frames-per-second for Throbber classes
DEFAULT_FPS = 10
#: Microseconds per second
_USECS_PER_SEC = 1000000
[docs]class TermControl:
"""
A class for portable terminal control and output.
Uses the curses package to set up appropriate terminal control
sequences for the current terminal.
Inspired by and adapted from:
https://code.activestate.com/recipes/475116-using-terminfo-for-portable-color-output-cursor-co/
Copyright Edward Loper and released under the PSF license.
`TermControl` defines a set of instance variables whose
values are initialized to the control sequence necessary to
perform a given action. These can be simply included in normal
output to the terminal:
>>> term = TermControl()
>>> print 'This is '+term.GREEN+'green'+term.NORMAL
Alternatively, the `render()` method can used, which replaces
'${action}' with the string required to perform 'action':
>>> term = TermControl()
>>> print term.render('This is ${GREEN}green${NORMAL}')
If the terminal doesn't support a given action, then the value of
the corresponding instance variable will be set to ''. As a
result, the above code will still work on terminals that do not
support color, except that their output will not be colored.
Also, this means that you can test whether the terminal supports a
given action by simply testing the truth value of the
corresponding instance variable:
>>> term = TermControl()
>>> if term.CLEAR_SCREEN:
... print 'This terminal supports clearning the screen.'
Finally, if the width and height of the terminal are known, then
they will be stored in the `columns` and `lines` attributes.
"""
# Cursor movement:
BOL: str = "" #: Move the cursor to the beginning of the line
UP: str = "" #: Move the cursor up one line
DOWN: str = "" #: Move the cursor down one line
LEFT: str = "" #: Move the cursor left one char
RIGHT: str = "" #: Move the cursor right one char
# Deletion:
CLEAR_SCREEN: str = "" #: Clear the screen and move to home position
CLEAR_EOL: str = "" #: Clear to the end of the line.
CLEAR_BOL: str = "" #: Clear to the beginning of the line.
CLEAR_EOS: str = "" #: Clear to the end of the screen
# Output modes:
BOLD: str = "" #: Turn on bold mode
BLINK: str = "" #: Turn on blink mode
DIM: str = "" #: Turn on half-bright mode
REVERSE: str = "" #: Turn on reverse-video mode
NORMAL: str = "" #: Turn off all modes
# Terminal bell:
BELL: str = "" #: Ring the terminal bell
# Cursor display:
HIDE_CURSOR: str = "" #: Make the cursor invisible
SHOW_CURSOR: str = "" #: Make the cursor visible
# Foreground colors:
BLACK: str = "" #: Black foreground color
BLUE: str = "" #: Blue foreground color
GREEN: str = "" #: Green foreground color
CYAN: str = "" #: Cyan foreground color
RED: str = "" #: Red foreground color
MAGENTA: str = "" #: Magenta foreground color
YELLOW: str = "" #: Yellow foreground color
WHITE: str = "" #: White foreground color
# Background colors:
BG_BLACK: str = "" #: Black background color
BG_BLUE: str = "" #: Blue background color
BG_GREEN: str = "" #: Green background color
BG_CYAN: str = "" #: Cyan background color
BG_RED: str = "" #: Red background color
BG_MAGENTA: str = "" #: Magenta background color
BG_YELLOW: str = "" #: Yellow background color
BG_WHITE: str = "" #: White background color
# Terminal size:
columns: Optional[int] = None #: Terminal width
lines: Optional[int] = None #: Terminal height
_STRING_CAPABILITIES: List[str] = (
"""
BOL:cr UP:cuu1 DOWN:cud1 LEFT:cub1 RIGHT:cuf1
CLEAR_SCREEN:clear CLEAR_EOL:el CLEAR_BOL:el1 CLEAR_EOS:ed BOLD:bold
BLINK:blink DIM:dim REVERSE:rev UNDERLINE:smul NORMAL:sgr0 BELL:bel
HIDE_CURSOR:civis SHOW_CURSOR:cnorm""".split()
)
_LEGACY_COLORS: List[str] = (
"""BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE""".split()
)
_ANSI_COLORS: List[str] = "BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split()
def _force_ansi(self):
ansi_codes = {
"BLACK": "\033[0;30m",
"RED": "\033[0;31m",
"GREEN": "\033[0;32m",
"YELLOW": "\033[0;33m",
"BLUE": "\033[0;34m",
"MAGENTA": "\033[0;35m",
"CYAN": "\033[0;36m",
"WHITE": "\033[0;37m",
}
for color, code in ansi_codes.items():
setattr(self, color, code)
# Work around `less -R` not liking "\033[0m" (ANSI reset)
setattr(self, "NORMAL", ansi_codes["WHITE"])
def _init_colors(self):
"""
Initialize terminal color codes.
"""
set_fg = self._tigetstr("setf")
if set_fg:
set_fg = set_fg.encode("utf8")
for i, color in enumerate(self._LEGACY_COLORS):
setattr(self, color, curses.tparm(set_fg, i).decode("utf8") or "")
set_fg_ansi = self._tigetstr("setaf")
if set_fg_ansi:
set_fg_ansi = set_fg_ansi.encode("utf8")
for i, color in enumerate(self._ANSI_COLORS):
setattr(self, color, curses.tparm(set_fg_ansi, i).decode("utf8") or "")
set_bg = self._tigetstr("setb")
if set_bg:
set_bg = set_bg.encode("utf8")
for i, color in enumerate(self._LEGACY_COLORS):
setattr(
self, "BG_" + color, curses.tparm(set_bg, i).decode("utf8") or ""
) # pragma: no cover
set_bg_ansi = self._tigetstr("setab")
if set_bg_ansi:
set_bg_ansi = set_bg_ansi.encode("utf8")
for i, color in enumerate(self._ANSI_COLORS):
setattr(
self,
"BG_" + color,
curses.tparm(set_bg_ansi, i).decode("utf8") or "",
)
[docs] def __init__(self, term_stream: Optional[TextIO] = None, color: str = "auto"):
"""
Initialize terminal capabilities and size information.
If the output stream is not a tty or terminal setup fails,
the instance will have no terminal capabilities (all control
attributes remain empty strings or None).
:param term_stream: Output stream to probe for capabilities.
:type term_stream: ``Optional[TextIO]``
:param color: A string to control color rendering: "auto", "always", or
"never".
:type color: ``str``
"""
# Default to stdout
if term_stream is None:
term_stream = sys.stdout
self.term_stream = term_stream
# If the stream isn't a tty, then assume it has no capabilities.
if color != "always":
if not hasattr(term_stream, "isatty") or not term_stream.isatty():
return
# Check the terminal type. If we fail, then assume that the
# terminal has no capabilities.
try:
curses.setupterm()
# Attempting to catch curses.error raises 'TypeError: catching classes
# that do not inherit from BaseException is not allowed' even though
# it claims to inherit from builtins.Exception:
#
# Help on class error in module _curses:
# class error(builtins.Exception)
except BaseException as err: # pylint: disable=broad-exception-caught
# Preserve normal interruption/termination semantics.
if isinstance(err, (KeyboardInterrupt, SystemExit)): # pragma: no cover
raise
if color == "always":
self._force_ansi()
return # pragma: no cover
# Look up numeric capabilities.
self.columns = curses.tigetnum("cols")
self.lines = curses.tigetnum("lines")
# Look up string capabilities.
for capability in self._STRING_CAPABILITIES:
(attr, cap_name) = capability.split(":")
setattr(self, attr, self._tigetstr(cap_name) or "")
if color != "never":
# Colors
self._init_colors()
def _tigetstr(self, cap_name):
# String capabilities can include "delays" of the form "$<2>".
# For any modern terminal, we should be able to just ignore
# these, so strip them out.
cap = curses.tigetstr(cap_name)
cap = cap.decode(encoding="utf8") if cap else ""
return cap.split("$", maxsplit=1)[0]
[docs] def render(self, template):
"""
Replace each $-substitution with the corresponding control.
Replace each $-substitution in the template string with the
corresponding terminal control string (if defined) or an
empty string (if not defined).
:param template: Template string containing ${NAME} patterns.
:type template: ``str``
:returns: Rendered string with substitutions applied.
:rtype: ``str``
"""
return re.sub(r"\$\$|\${\w+}", self._render_sub, template)
def _render_sub(self, match):
s = match.group()
if s == "$$":
return "$"
return getattr(self, s[2:-1], "")
def _flush_with_broken_pipe_guard(stream: TextIO) -> None:
"""
Handle ``BrokenPipeError`` when attempting to flush output streams.
:param stream: The stream to flush.
:type stream: TextIO
"""
if stream is None or not hasattr(stream, "flush"):
return
try:
stream.flush()
except BrokenPipeError as err:
devnull = os.open(os.devnull, os.O_WRONLY)
try:
if hasattr(stream, "fileno"):
os.dup2(devnull, stream.fileno())
finally:
os.close(devnull)
raise SystemExit() from err
[docs]class ProgressBase(ABC):
"""
An abstract progress reporting class.
"""
FIXED = -1
[docs] def __init__(self, register: bool = True):
"""
Initialize base progress state.
Sets default state for lifecycle and rendering configuration.
:param register: Register this ``ProgressBase`` for log callbacks.
:type register: ``bool``
"""
self.total: int = 0
self.header: Optional[str] = None
self.term: Optional[TermControl] = None
self.stream: Optional[TextIO] = None
self.width: int = -1
self.first_update: bool = True
self.skip_line: bool = False
self.registered: bool = False
self.register: bool = register
[docs] def reset_position(self):
"""Mark progress bar as displaced by external output."""
self._check_in_progress(0, "reset_position")
self.skip_line = True
self._do_reset_position()
@abstractmethod
def _do_reset_position(self):
"""
Do any work necessary when resetting position due to interleaved log
messages.
"""
def _calculate_width(
self, width: Optional[int] = None, width_frac: Optional[float] = None
) -> int:
"""
Calculate the width for the progress bar using values defined by
child classes. The ``header`` and (optionally, for classes that use it)
``term`` members must be initialised before calling this method.
:type term: ``Optional[TermControl]``
:param width: An optional width value in characters. If specified
the progress bar will occupy this width. Cannot be used
with ``width_frac``.
:type width: ``int``
:param width_frac: An optional width fraction between [0..1]. If
specified the terminal width is multiplied by this
value and rounded to the nearest integer to
determine the width of the progress bar. Cannot be
used with ``width``.
:type width_frac: ``Optional[float]``
:returns: The calculated progress bar width in characters.
:rtype: ``int``
:raises ``ValueError``: If FIXED is negative, header is unset,
or both width and width_frac are specified.
"""
if self.FIXED < 0:
raise ValueError(
f"{self.__class__.__name__}: self.FIXED must be initialised "
"before calling self._calculate_width()"
)
if not hasattr(self, "header") or self.header is None:
raise ValueError(
f"{self.__class__.__name__}: self.header must be initialised "
"before calling self._calculate_width()"
)
if width is not None and width_frac is not None:
raise ValueError("width and width_frac are mutually exclusive")
if hasattr(self, "term") and hasattr(self.term, "columns"):
columns = self.term.columns
else:
columns = DEFAULT_COLUMNS
if width is None:
width_frac = width_frac or DEFAULT_WIDTH_FRAC
fixed = self.FIXED + len(self.header)
width = round((columns - fixed) * width_frac)
# Ensure a reasonable minimum width for the bar body.
width = max(PROGRESS_MIN_WIDTH, width)
return width
[docs] def start(self, total: int):
"""
Begin a progress run with the specified ``total``.
:param total: The total number of expected progress items.
:type total: ``int``
"""
if total <= 0:
raise ValueError("total must be positive.")
self.total = total
if self.register:
register_progress(self)
self._do_start()
@abstractmethod
def _do_start(self):
"""
Hook invoked when progress begins.
Subclasses implement startup behavior such as rendering an
initial progress display.
"""
def _check_in_progress(self, done: int, step: str):
"""
Validate that progress is active and ``done`` is in range.
:param done: The number of completed progress items.
:type done: ``int``
:param step: The progress step (method name) that is active.
:type step: ``str``
:raises ``ValueError``: If progress has not started, if done is
negative, or if done exceeds total.
"""
theclass = self.__class__.__name__
if self.total == 0:
raise ValueError(f"{theclass}.{step}() called before start()")
if done < 0:
raise ValueError(f"{theclass}.{step}() done cannot be negative.")
if done > self.total:
raise ValueError(f"{theclass}.{step}() done cannot be > total.")
[docs] def progress(self, done: int, message: Optional[str] = None):
"""
Advance the progress indicator to the specified ``done`` count.
:param done: The number of completed progress items.
:type done: ``int``
:param message: An optional progress message.
:type message: ``Optional[str]``
"""
self._check_in_progress(done, "progress")
self._do_progress(done, message)
@abstractmethod
def _do_progress(self, done: int, message: Optional[str] = None):
"""
Hook for subclasses to update the progress display.
:param done: The number of completed progress items.
:type done: ``int``
:param message: An optional progress message.
:type message: ``Optional[str]``
"""
[docs] def end(self, message: Optional[str] = None):
"""
End the progress run and finalize the display.
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
self._check_in_progress(self.total, "end")
self.progress(self.total, "")
self._do_end(message)
self.total = 0
if self.registered:
unregister_progress(self)
@abstractmethod
def _do_end(self, message: Optional[str] = None):
"""
Perform final end-of-progress handling.
This hook is called for both normal completion (via ``end()``)
and error termination (via ``cancel()``/``_do_cancel()``).
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
[docs] def cancel(self, message: Optional[str] = None):
"""
End the progress run with error and finalize the display.
:param message: An optional error message.
:type message: ``Optional[str]``
"""
self._check_in_progress(self.total, "cancel")
self._do_cancel(message=message)
self.total = 0
if self.registered:
unregister_progress(self)
def _do_cancel(self, message: Optional[str] = None):
"""
Perform error case final end-of-progress handling.
:param message: An optional error message.
:type message: ``Optional[str]``
"""
self._do_end(message=message)
[docs]class Progress(ProgressBase):
"""
A 2-line Unicode or ASCII progress bar, which looks like:
Header: 20% [███████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]
progress message
or:
Header: 20% [===========----------------------------------]
progress message
The progress bar is colored, if the terminal supports color
output and adjusts to the width of the terminal.
"""
BAR = (
"${BOLD}${CYAN}%s${NORMAL}: %3d%% "
"${GREEN}[${BOLD}%s%s${NORMAL}${GREEN}]${NORMAL}\n"
) #: Progress bar format string
FIXED = 9 #: Length of fixed characters in BAR.
[docs] def __init__(
self,
header,
register: bool = True,
term_stream: Optional[TextIO] = None,
width: Optional[int] = None,
width_frac: Optional[float] = None,
no_clear: bool = False,
tc: Optional[TermControl] = None,
):
"""
Initialise a two-line terminal progress renderer.
:param header: The progress header to display.
:type header: ``str``
:param register: Register this ``Progress`` for log callbacks.
:type register: ``bool``
:param term_stream: The terminal stream to write to.
:type term_stream: ``TextIO``
:param width: An optional width value in characters. If specified
the progress bar will occupy this width. Cannot be used
with ``width_frac``.
:type width: ``int``
:param width_frac: An optional width fraction between [0..1]. If
specified the terminal width is multiplied by this
value and rounded to the nearest integer to
determine the width of the progress bar. Cannot be
used with ``width``.
:type width_frac: ``Optional[float]``
:param no_clear: For progress indicators that clear and re-draw the
content on progress, do not erase the progress bar
when ``ProgressBase.end()`` is called, leaving it
on the terminal for the user to refer to. Ignored
by ``ProgressBase`` child classes that do not use
``TerminalControl``.
:type no_clear: ``bool``
:param tc: An optional ``TermControl`` object already initialised with
a ``term_stream`` value. If this argument is set
it will override any ``term_stream`` argument.
:type tc: ``Optional[TermControl]``
:raises ValueError: If terminal lacks required capabilities.
"""
super().__init__(register=register)
if tc is not None:
term_stream = tc.term_stream
self.header: Optional[str] = header
self.term: Optional[TermControl] = tc or TermControl(term_stream=term_stream)
self.stream: Optional[TextIO] = term_stream or sys.stdout
if not (self.term.CLEAR_EOL and self.term.UP and self.term.BOL):
raise ValueError("Terminal does not support required control characters.")
self.width: int = self._calculate_width(width=width, width_frac=width_frac)
self.no_clear = no_clear
self.pbar: Optional[str] = None
self.first_update: bool = False
self.skip_line: bool = False
self.did_erase: bool = False
columns = self.term.columns or DEFAULT_COLUMNS
self.budget: int = max(MIN_BUDGET, columns - 10)
self.fps: int = DEFAULT_FPS
self._interval_us: int = round((1.0 / self.fps) * _USECS_PER_SEC)
self._last: Optional[datetime] = None
self._last_message: Optional[str] = None
self.done = 0
encoding = getattr(self.stream, "encoding", None)
if not encoding:
# Stream has no usable encoding; fall back to ASCII bar.
self.did = "="
self.todo = "-"
else:
try:
"█░".encode(encoding)
self.did = "█"
self.todo = "░"
except UnicodeEncodeError:
self.did = "="
self.todo = "-"
def _do_reset_position(self):
"""
Erase our last update before the logging subsystem outputs its message.
"""
# Only do this *once* - otherwise we end up erasing log messages if
# multiple log messages notify us of a reset between progress updates.
if not self.did_erase:
erase = 2 * (self.term.BOL + self.term.UP + self.term.CLEAR_EOL)
print(erase, end="", file=self.stream)
_flush_with_broken_pipe_guard(self.stream)
self.did_erase = True
def _do_start(self):
"""
Prepare rendering state for the progress bar.
Sets up the rendered bar template and marks the progress as ready
for its first update.
"""
self.pbar = self.term.render(self.BAR)
self.first_update = True
self._last = datetime.now() - timedelta(microseconds=self._interval_us)
def _do_progress(self, done: int, message: Optional[str] = None):
"""
Update the two-line progress bar.
:param done: The number of completed progress items.
:type done: ``int``
:param message: An optional progress message.
:type message: ``Optional[str]``
"""
message = message or ""
percent = float(done) / float(self.total)
n = int((self.width - 10) * percent)
must_update = False
# Used by cancel()
self.done = done
# Stash last message in case we need it later.
self._last_message = message
if self.skip_line:
must_update = True
prefix = ""
self.skip_line = False
elif self.first_update:
prefix = self.term.HIDE_CURSOR + self.term.BOL
self.first_update = False
else:
prefix = 2 * (self.term.BOL + self.term.UP + self.term.CLEAR_EOL)
if len(message) > self.budget:
message = message[0 : self.budget - 3] + "..."
now = datetime.now()
past = (now - self._last).total_seconds() * _USECS_PER_SEC >= self._interval_us
if must_update or past:
self._last = now
print(
prefix
+ (
self.pbar
% (
self.header,
percent * 100,
self.did * n,
self.todo * (self.width - 10 - n),
)
)
+ self.term.CLEAR_EOL
+ message
+ "\n",
file=self.stream,
end="",
)
self.did_erase = False
_flush_with_broken_pipe_guard(self.stream)
def _do_end(self, message: Optional[str] = None):
"""
End progress reporting on this ``Progress`` instance.
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
if not self.no_clear:
print(
2 * (self.term.BOL + self.term.UP + self.term.CLEAR_EOL),
file=self.stream,
)
print(
(
self.term.BOL
+ self.term.CLEAR_EOL
+ self.term.UP
+ self.term.CLEAR_EOL
+ self.term.SHOW_CURSOR
),
file=self.stream,
end="",
)
else:
print(
(
self.term.UP
+ self.term.BOL
+ self.term.CLEAR_EOL
+ self.term.SHOW_CURSOR
),
file=self.stream,
end="",
)
print(self.term.NORMAL, end="", file=self.stream)
if message:
print(message, file=self.stream)
_flush_with_broken_pipe_guard(self.stream)
def _do_cancel(self, message: Optional[str] = None):
"""
Perform error case final end-of-progress handling.
:param message: An optional error message.
:type message: ``Optional[str]``
"""
# Force an update regardless of FPS limits
self._last = datetime.now() - timedelta(microseconds=self._interval_us)
self.progress(self.done, message=self._last_message)
# Print a newline to prevent erasure of last message.
print(file=self.stream)
self._do_end(message=message)
[docs]class SimpleProgress(ProgressBase):
"""
A simple progress bar that does not rely on terminal capabilities.
"""
BAR = "%s: %3d%% [%s%s] (%s)" #: Progress bar format string
DID = "=" #: Bar character for completed work.
TODO = "-" #: Bar character for uncompleted work.
FIXED = 12 #: Length of fixed characters in BAR.
[docs] def __init__(
self,
header,
register: bool = True,
term_stream: Optional[TextIO] = None,
width: Optional[int] = None,
width_frac: Optional[float] = None,
):
"""
Initialise a new ``SimpleProgress`` object.
:param header: The progress header to display.
:type header: ``str``
:param register: Register this ``SimpleProgress`` for log callbacks.
:type register: ``bool``
:param term_stream: The terminal stream to write to.
:type term_stream: ``Optional[TextIO]``
:param width: An optional width value in characters. If specified
the progress bar will occupy this width. Cannot be used
with ``width_frac``.
:type width: ``int``
:param width_frac: An optional width fraction between [0..1]. If
specified the terminal width is multiplied by this
value and rounded to the nearest integer to
determine the width of the progress bar. Cannot be
used with ``width``.
:type width_frac: ``Optional[float]``
"""
super().__init__(register=register)
self.header: Optional[str] = header
self.stream: Optional[TextIO] = term_stream or sys.stdout
self.width: int = self._calculate_width(width=width, width_frac=width_frac)
def _do_reset_position(self):
"""
Respond to progress displacement on this ``SimpleProgress`` object.
No-op for ``SimpleProgress`` instances.
"""
return
def _do_start(self):
"""
Start reporting progress on this ``SimpleProgress`` object.
No-op for ``SimpleProgress`` instances.
"""
return
def _do_progress(self, done: int, message: Optional[str] = None):
"""
Report progress on this ``SimpleProgress`` instance.
:param done: The number of completed progress items.
:type done: ``int``
:param message: An optional progress message.
:type message: ``Optional[str]``
"""
message = message or ""
percent = float(done) / float(self.total)
n = int(self.width * percent)
print(
self.BAR
% (
self.header,
percent * 100,
self.DID * n,
self.TODO * (self.width - n),
message,
),
file=self.stream,
)
_flush_with_broken_pipe_guard(self.stream)
def _do_end(self, message: Optional[str] = None):
"""
End progress reporting on this ``SimpleProgress`` instance.
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
if message:
print(message, file=self.stream)
_flush_with_broken_pipe_guard(self.stream)
[docs]class NullProgress(ProgressBase):
"""
A progress class that produces no output.
"""
def _do_reset_position(self):
"""
Respond to progress displacement on this ``NullProgress`` object.
No-op for ``NullProgress`` instances.
"""
return
def _do_start(self):
"""
Start reporting progress on this ``NullProgress`` object.
No-op for ``NullProgress`` instances.
"""
return # pragma: no cover
# pylint: disable=unused-argument
def _do_progress(self, done: int, message: Optional[str] = None):
"""
Report progress on this ``NullProgress`` instance.
No-op for ``NullProgress`` instances.
:param done: The number of completed progress items.
:type done: ``int``
:param message: An optional progress message (unused).
:type message: ``Optional[str]``
"""
return # pragma: no cover
# pylint: disable=unused-argument
def _do_end(self, message: Optional[str] = None):
"""
End progress reporting on this ``NullProgress`` instance.
No-op for ``NullProgress`` instances.
:param message: An optional completion message (unused).
:type message: ``Optional[str]``
"""
return # pragma: no cover
[docs]class ThrobberBase(ABC):
"""
An abstract busy indicator class. Unlike ``ProgressBase`` classes
the throbber reports progress of a time consuming task where the
total number of items is unknown (for instance, generating tree
walk lists).
"""
[docs] def __init__(self, header, register: bool = True):
"""
Initialize base throbber state.
Sets default state for lifecycle and rendering configuration.
:param register: Register this ``ThrobberBase`` for log callbacks.
:type register: ``bool``
"""
self.header: str = header
self.frames: str = r"."
self.term: Optional[TermControl] = None
self.stream: Optional[TextIO] = None
self.started: bool = False
self.first_update: bool = True
self.nr_frames: int = len(self.frames)
self.fps: int = DEFAULT_FPS
self._frame_index: int = 0
self._interval_us: int = round((1.0 / self.fps) * _USECS_PER_SEC)
self._last: Optional[datetime] = None
self.registered: bool = False
self.register: bool = register
[docs] def reset_position(self):
"""Mark throbber as displaced by external output."""
self.first_update = True
[docs] def start(self):
"""
Begin a throbber run.
"""
self.started = True
self._last = datetime.now() - timedelta(microseconds=self._interval_us)
if self.register:
register_progress(self)
self._do_start()
self.throb()
def _do_start(self):
"""
Hook invoked when throbber begins.
Subclasses implement startup behavior such as rendering an
initial throbber display.
"""
print(f"{self.header}: ..", end="", file=self.stream)
def _check_started(self, step: str):
"""
Validate that throbber is active.
:param step: The throbber step (method name) that is active.
:type step: ``str``
:raises ``ValueError``: If throbber has not started, or if the throbber
has started but ``self._last`` is ``None``.
"""
theclass = self.__class__.__name__
if not self.started:
raise ValueError(f"{theclass}.{step}() called before start()")
if self._last is None:
raise ValueError(
f"{theclass}.{step}() invalid throbber state:"
"self.started=True but self._last=None"
)
[docs] def throb(self):
"""
Maintain liveness for this throbber and output frame if required.
"""
self._check_started("throb")
now = datetime.now()
if (now - self._last).total_seconds() * _USECS_PER_SEC >= self._interval_us:
self._do_throb()
_flush_with_broken_pipe_guard(self.stream)
self._last = now
self._frame_index = (self._frame_index + 1) % self.nr_frames
self.first_update = False
@abstractmethod
def _do_throb(self):
"""
Hook for subclasses to update the throbber display.
"""
[docs] def end(self, message: Optional[str] = None):
"""
End the throbber run and finalize the display.
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
self._check_started("end")
self._do_end(message=message)
_flush_with_broken_pipe_guard(self.stream)
self.started = False
self._last = None
if self.registered:
unregister_progress(self)
def _do_end(self, message: Optional[str] = None):
"""
Perform final end-of-throbber handling.
Subclasses that implement fancy throbbers with ``TermControl``
should override this method to perform clearing/redrawing the
throbber frame.
:param message: An optional completion message.
:type message: ``Optional[str]``
"""
print(f"\n{message}" if message else "", file=self.stream)
[docs]class Throbber(ThrobberBase):
"""
A ``ThrobberBase`` subclass to display a one line Unicode or ASCII
throbber for capable terminals.
"""
STYLES: ClassVar[Dict[str, Union[str, List[str]]]] = {
"ascii": r"-\|/",
"wave": "⠁⠂⠄⡀⢀⠠⠐⠈",
"braillewave": "⣾⣽⣻⢿⡿⣟⣯⣷",
"horizontalbars": "█▉▊▋▌▍▎▏▎▍▌▋▊▉█",
"verticalbars": "▁▂▃▄▅▆▇█▇▆▅▄▃▂▁",
"arrowspinner": "←↖↑↗→↘↓↙",
"braillecircle": ["⢎⡰", "⢎⡡", "⢎⡑", "⢎⠱", "⠎⡱", "⢊⡱", "⢌⡱", "⢆⡱"],
"bouncingball": [
"[● ]",
"[ ● ]",
"[ ● ]",
"[ ● ]",
"[ ● ]",
"[ ●]",
"[ ● ]",
"[ ● ]",
"[ ● ]",
"[ ● ]",
"[● ]",
],
"bouncingbar": [
"[| ]",
"[ | ]",
"[ | ]",
"[ | ]",
"[ | ]",
"[ |]",
"[ | ]",
"[ | ]",
"[ | ]",
"[ | ]",
"[| ]",
],
}
[docs] def __init__(
self,
header: str,
register: bool = True,
style: Optional[str] = None,
term_stream: Optional[TextIO] = None,
no_clear: bool = False,
tc: Optional[TermControl] = None,
):
"""
Initialise a new one line throbber instance.
:param header: The header string to print.
:type header: ``str``
:param register: Register this ``Throbber`` for log callbacks.
:type register: ``bool``
:param style: A Unicode throbber style string. Ignored if the terminal
does not support Unicode encoding. Defaults to "wave" if
unset.
:type style: ``Optional[str]``
:param term_stream: The terminal stream to write to.
:type term_stream: ``TextIO``
:param no_clear: For throbber indicators that clear and re-draw the
content on progress, do not erase the throbber frame
when ``ThrobberBase.end()`` is called, leaving it
on the terminal for the user to refer to. Ignored
by ``ThrobberBase`` child classes that do not use
``TerminalControl``.
:type no_clear: ``bool``
:param tc: An optional ``TermControl`` object already initialised with
a ``term_stream`` value. If this argument is set
it will override any ``term_stream`` argument.
:type tc: ``Optional[TermControl]``
:raises ValueError: If terminal lacks required capabilities.
"""
super().__init__(header, register=register)
if style is not None and style not in Throbber.STYLES:
raise ValueError(f"Unknown Throbber style: {style}")
style = style or "wave"
if tc is not None:
term_stream = tc.term_stream
self.term: Optional[TermControl] = tc or TermControl(term_stream=term_stream)
self.stream: Optional[TextIO] = term_stream or sys.stdout
self.no_clear = no_clear
# Override default frames
ascii_frames = self.STYLES["ascii"]
encoding = getattr(self.stream, "encoding", None)
if not encoding:
# Stream has no usable encoding; fall back to ASCII frames.
self.frames = ascii_frames
else:
try:
unicode_frames = self.STYLES[style]
if isinstance(unicode_frames, str):
unicode_frames.encode(encoding)
else:
for frame in unicode_frames:
frame.encode(encoding)
self.frames = unicode_frames
except UnicodeEncodeError:
self.frames = ascii_frames
self.nr_frames = len(self.frames)
def _do_start(self):
"""
Just turn off the cursor: ``_do_throb()`` will print the header.
"""
print(f"{self.term.HIDE_CURSOR}", end="", file=self.stream)
def _do_throb(self):
"""
Update the throbber frame.
"""
if not self.first_update:
# Erase previous throb frame.
print(
self.term.BOL + self.term.UP + self.term.CLEAR_EOL,
end="",
file=self.stream,
)
# Draw current throb frame.
print(f"{self.header}: ", end="", file=self.stream)
print(
(
f"{self.term.GREEN}"
f"{self.frames[self._frame_index]}"
f"{self.term.NORMAL}\n"
),
end="",
file=self.stream,
)
def _do_end(self, message: Optional[str] = None):
"""
Finalise the throbber.
"""
if not self.first_update and not self.no_clear:
# Header length plus ": ".
header_width = len(self.header) + 2
# Erase previous throb frame.
print(
self.term.BOL
+ self.term.UP
+ header_width * self.term.RIGHT
+ self.term.CLEAR_EOL,
end="",
file=self.stream,
)
print(self.term.SHOW_CURSOR, end="", file=self.stream)
print(f"{message}\n" if message else "", end="", file=self.stream)
[docs]class SimpleThrobber(ThrobberBase):
"""
A simple throbber that does not rely on terminal capabilities.
"""
[docs] def __init__(
self, header: str, register: bool = True, term_stream: Optional[TextIO] = None
):
"""
Initialise a simple ascii throbber.
:param header: The throbber header.
:type header: ``str``
:param register: Register this ``SimpleThrobber`` for log callbacks.
:type register: ``bool``
:param term_stream: The terminal stream to write to.
:type term_stream: ``Optional[TextIO]``
"""
super().__init__(header, register=register)
self.stream = term_stream or sys.stdout
def _do_throb(self):
"""
Output simple throb frame.
"""
print(self.frames[self._frame_index], end="", file=self.stream)
[docs]class NullThrobber(ThrobberBase):
"""
A throbber class that produces no output.
"""
def _do_start(self):
"""No-op start for NullThrobber."""
def _do_throb(self):
"""No-op throb hook for NullThrobber."""
def _do_end(self, message: Optional[str] = None):
"""No-op end for NullThrobber."""
[docs] def throb(self):
"""Silent throb for NullThrobber."""
self._check_started("throb")
[docs] def end(self, message: Optional[str] = None):
"""Silent end for NullThrobber."""
self._check_started("end")
self.started = False
if self.registered:
unregister_progress(self)
[docs]class ProgressFactory:
"""
A factory for constructing progress objects.
"""
[docs] @staticmethod
def get_progress(
header: str,
quiet: bool = False,
term_stream: Optional[TextIO] = None,
term_control: Optional[TermControl] = None,
width: Optional[int] = None,
width_frac: Optional[float] = None,
no_clear: bool = False,
register: bool = True,
) -> ProgressBase:
"""
Return an appropriate ProgressBase implementation.
:param header: The progress report header.
:type header: ``str``
:param quiet: Suppress all output.
:type quiet: ``bool``
:param term_stream: An optional ``TextIO`` output object.
Defaults to ``sys.stdout`` if unspecified.
:type term_stream: ``Optional[TextIO]``
:param term_control: An optional ``TermControl`` object to use
for the progress report.
:type term_control: ``Optional[TermControl]``
:param width: An optional width value in characters. If specified
the progress bar will occupy this width. Cannot be used
with ``width_frac``.
:type width: ``int``
:param width_frac: An optional width fraction between [0..1]. If
specified the terminal width is multiplied by this
value and rounded to the nearest integer to
determine the width of the progress bar. Cannot be
used with ``width``.
:type width_frac: ``Optional[float]``
:param no_clear: For progress indicators that clear and re-draw the
content on progress, do not erase the progress bar
when ``ProgressBase.end()`` is called, leaving it
on the terminal for the user to refer to. Ignored
by ``ProgressBase`` child classes that do not use
``TerminalControl``.
:type no_clear: ``bool``
:param register: Register the new object with the log system for
notification callbacks.
:type register: ``bool``
:returns: An appropriate progress implementation.
:rtype: ``ProgressBase``
"""
if term_control:
term_stream = term_control.term_stream
term_stream = term_stream or sys.stdout
if quiet:
progress = NullProgress(register=register)
elif not hasattr(term_stream, "isatty") or not term_stream.isatty():
progress = SimpleProgress(
header,
register=register,
term_stream=term_stream,
width=width,
width_frac=width_frac,
)
else:
progress = Progress(
header,
register=register,
term_stream=term_stream,
width=width,
width_frac=width_frac,
no_clear=no_clear,
tc=term_control,
)
return progress
[docs] @staticmethod
def get_throbber_styles() -> List[str]:
"""
Return a list of known ``Throbber`` style strings.
:returns: A list of throbber styles.
:rtype: ``List[str]``
"""
return list(Throbber.STYLES.keys())
[docs] @staticmethod
def get_throbber(
header: str,
quiet: bool = False,
style: Optional[str] = None,
term_stream: Optional[TextIO] = None,
term_control: Optional[TermControl] = None,
no_clear: bool = False,
register: bool = True,
) -> ThrobberBase:
"""
Return an appropriate ThrobberBase implementation.
:param header: The throbber header.
:type header: ``str``
:param quiet: Suppress all output.
:type quiet: ``bool``
:param style: An optional Unicode throbber style string. Ignored if the
terminal does not support Unicode encoding.
:type style: ``Optional[str]``
:param term_stream: An optional ``TextIO`` output object.
Defaults to ``sys.stdout`` if unspecified.
:type term_stream: ``Optional[TextIO]``
:param term_control: An optional ``TermControl`` object to use
for the throbber.
:type term_control: ``Optional[TermControl]``
:param no_clear: For throbber indicators that clear and re-draw the
content on throbber, do not erase the throbber bar
when ``ThrobberBase.end()`` is called, leaving it
on the terminal for the user to refer to. Ignored
by ``ThrobberBase`` child classes that do not use
``TerminalControl``.
:type no_clear: ``bool``
:param register: Register the new object with the log system for
notification callbacks.
:type register: ``bool``:
:returns: An appropriate throbber implementation.
:rtype: ``ThrobberBase``
"""
if term_control:
term_stream = term_control.term_stream
term_stream = term_stream or sys.stdout
if quiet:
throbber = NullThrobber(header, register=register)
elif not hasattr(term_stream, "isatty") or not term_stream.isatty():
throbber = SimpleThrobber(
header,
register=register,
term_stream=term_stream,
)
else:
throbber = Throbber(
header,
register=register,
style=style,
term_stream=term_stream,
no_clear=no_clear,
tc=term_control,
)
return throbber
__all__ = [
"DEFAULT_FPS",
"NullProgress",
"NullThrobber",
"Progress",
"ProgressBase",
"ProgressFactory",
"SimpleProgress",
"SimpleThrobber",
"TermControl",
"ThrobberBase",
"Throbber",
]