"""Provide the PipelinePBar and ProcPBar classes"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .utils import truncate_text
if TYPE_CHECKING: # pragma: no cover
import enlighten
# [12/02/20 12:44:06] I core
# pipeline: 100%|
# | desc_len |
PBAR_DESC_LEN = 23
class ProcPBar:DOCS
"""The progress bar for processes"""
__slots__ = (
"counter",
"proc_size",
"bar_format",
"queued_counter",
"submitted_counter",
"running_counter",
"success_counter",
"failure_counter",
)
def __init__(
self, manager: enlighten.Manager, proc_size: int, proc_name: str
) -> None:
"""Initialize the progress bar for a process"""
if proc_size > 1:
self.bar_format = None
bar_format = (
"{desc}{desc_pad}{percentage:3.0f}%|{bar}| "
"I:{count_0:<{len_total}d} "
"Q:{count_1:<{len_total}d} "
"S:{count_2:<{len_total}d} "
"R:{count_3:<{len_total}d} "
"D:{count_4:>{len_total}d}|{count_5:<{len_total}d} "
"[{rate:5.2f}{unit_pad}{unit}/s]"
)
else:
self.bar_format = (
"{{desc}}{{desc_pad}}{{percentage:3.0f}}%|{{bar}}| "
"{:^9} [{{rate:5.2f}}{{unit_pad}}{{unit}}/s]"
)
bar_format = self.bar_format.format("-----------")
self.counter: enlighten.Counter = manager.counter(
total=proc_size,
color="grey",
desc=proc_name,
unit="jobs ",
leave=False,
)
self.proc_size = proc_size
self.queued_counter: enlighten.SubCounter = self.counter.add_subcounter(
"lightblue"
)
self.submitted_counter: enlighten.SubCounter = self.counter.add_subcounter(
"cyan"
)
self.running_counter: enlighten.SubCounter = self.counter.add_subcounter(
"yellow"
)
self.success_counter: enlighten.SubCounter = self.counter.add_subcounter(
"green"
)
self.failure_counter: enlighten.SubCounter = self.counter.add_subcounter("red")
# defer setting the bar_format, in case self.counter is rendered too early
# ValueError: Reserve field 'count_0' specified in format,
# but no subcounters are configured
self.counter.bar_format = bar_format
def update_job_inited(self):DOCS
"""Update the progress bar when a job is init'ed"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Init'ed")
self.counter.update()
def update_job_queued(self):DOCS
"""Update the progress bar when a job is queued"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Queued")
try:
self.queued_counter.update_from(self.counter)
except ValueError: # pragma: no cover
pass
def update_job_submitted(self):DOCS
"""Update the progress bar when a job is init'ed"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Submitted")
try:
self.submitted_counter.update_from(self.queued_counter)
except ValueError: # pragma: no cover
pass
def update_job_retrying(self): # pragma: no coverDOCS
"""Update the progress bar when a job is retrying"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Retrying")
self.failure_counter.update(-1)
self.running_counter.update(0, force=True)
self.submitted_counter.update(0, force=True)
self.queued_counter.update(0, force=True)
self.counter.update(1, force=True)
def update_job_running(self):DOCS
"""Update the progress bar when a job is running"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Running")
try:
self.running_counter.update_from(self.submitted_counter)
except ValueError: # pragma: no cover
pass
def update_job_succeeded(self, cached: bool = False):DOCS
"""Update the progress bar when a job is succeeded"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format(
"Cached" if cached else "Succeeded"
)
try:
self.success_counter.update_from(self.running_counter)
except ValueError: # pragma: no cover
pass
def update_job_failed(self):DOCS
"""Update the progress bar when a job is failed"""
if self.bar_format:
self.counter.bar_format = self.bar_format.format("Failed")
try:
self.failure_counter.update_from(self.running_counter)
except ValueError: # pragma: no cover
pass
def done(self):DOCS
"""The process is done"""
try:
self.counter.close()
except: # noqa: E722 # pragma: no cover
pass
class PipelinePBar:DOCS
"""Progress bar for the pipeline"""
__slots__ = (
"manager",
"running_counter",
"success_counter",
"failure_counter",
"desc_len",
)
def __init__(self, n_procs: int, ppln_name: str) -> None:
"""Initialize progress bar for pipeline"""
import enlighten
desc_len = PBAR_DESC_LEN
ppln_name = truncate_text(ppln_name, desc_len)
self.manager = enlighten.get_manager()
self.running_counter = self.manager.counter(
total=n_procs,
color="yellow",
desc=f"{ppln_name:>{desc_len}}:",
unit="procs",
bar_format=(
"{desc}{desc_pad}{percentage:3.0f}%|{bar}| "
f"{{count:{{len_total}}d}}/{n_procs} "
"[{rate:5.2f}{unit_pad}{unit}/s]"
),
)
self.success_counter = self.running_counter.add_subcounter("green")
self.failure_counter = self.running_counter.add_subcounter("red")
self.desc_len = desc_len
def proc_bar(self, proc_size: int, proc_name: str) -> ProcPBar:DOCS
"""Get the progress bar for a process
Args:
proc_size: The size of the process
proc_name: The name of the process
Returns:
The progress bar for the given process
"""
proc_name = truncate_text(proc_name, self.desc_len)
proc_name = f"{proc_name:>{self.desc_len}}:"
return ProcPBar(self.manager, proc_size, proc_name)
def update_proc_running(self):DOCS
"""Update the progress bar when a process is running"""
self.running_counter.update()
def update_proc_done(self):DOCS
"""Update the progress bar when a process is done"""
self.success_counter.update_from(self.running_counter)
def update_proc_error(self):DOCS
"""Update the progress bar when a process is errored"""
self.failure_counter.update_from(self.running_counter)
def done(self) -> None:DOCS
"""When the pipeline is done"""
try:
self.running_counter.close()
self.manager.stop()
except: # noqa: E722 # pragma: no cover
pass