Skip to content

SOURCE CODE pipen.progressbar DOCS

"""Provide the PipelinePBar and ProcPBar classes"""
from __future__ import annotations

from typing import TYPE_CHECKING

from .utils import truncate_text

if TYPE_CHECKING:  # pragma: no cover
    import enlighten

# [12/02/20 12:44:06] I core
#                 pipeline: 100%|
# |        desc_len      |
PBAR_DESC_LEN = 24


class ProcPBar:DOCS
    """The progress bar for processes"""

    def __init__(
        self, manager: enlighten.Manager, proc_size: int, proc_name: str
    ) -> None:
        self.submitted_counter = manager.counter(
            total=proc_size,
            color="cyan",
            desc=proc_name,
            unit="jobs",
            leave=False,
        )
        self.running_counter = self.submitted_counter.add_subcounter("yellow")
        self.success_counter = self.submitted_counter.add_subcounter("green")
        self.failure_counter = self.submitted_counter.add_subcounter("red")

    def update_job_submitted(self):DOCS
        """Update the progress bar when a job is submitted"""
        self.submitted_counter.update()

    def update_job_retrying(self):DOCS
        """Update the progress bar when a job is retrying"""
        # self.running_counter.count -= 1
        self.failure_counter.update(-1)

    def update_job_running(self):DOCS
        """Update the progress bar when a job is running"""
        try:
            self.running_counter.update_from(self.submitted_counter)
        except ValueError:  # pragma: no cover
            pass

    def update_job_succeeded(self):DOCS
        """Update the progress bar when a job is succeeded"""
        try:
            self.success_counter.update_from(self.running_counter)
        except ValueError:  # pragma: no cover
            try:
                self.success_counter.update_from(self.submitted_counter)
            except ValueError:  # pragma: no cover
                pass
        except:  # noqa: E722  # pragma: no cover
            pass

    def update_job_failed(self):DOCS
        """Update the progress bar when a job is failed"""
        try:
            self.failure_counter.update_from(self.running_counter)
        except ValueError:  # pragma: no cover
            try:
                self.failure_counter.update_from(self.submitted_counter)
            except ValueError:  # pragma: no cover
                pass
        except:  # noqa: E722  # pragma: no cover
            pass

    def done(self):DOCS
        """The process is done"""
        self.submitted_counter.close()


class PipelinePBar:DOCS
    """Progress bar for the pipeline"""

    def __init__(self, n_procs: int, ppln_name: str) -> None:
        """Initialize progress bar for pipeline"""
        import enlighten

        desc_len = PBAR_DESC_LEN
        ppln_name = truncate_text(ppln_name, desc_len)
        self.manager = enlighten.get_manager()
        self.running_counter = self.manager.counter(
            total=n_procs,
            color="yellow",
            desc=f"{ppln_name:>{desc_len}}:",
            unit="procs",
        )
        self.success_counter = self.running_counter.add_subcounter("green")
        self.failure_counter = self.running_counter.add_subcounter("red")
        self.desc_len = desc_len

    def proc_bar(self, proc_size: int, proc_name: str) -> ProcPBar:DOCS
        """Get the progress bar for a process

        Args:
            proc_size: The size of the process
            proc_name: The name of the process

        Returns:
            The progress bar for the given process
        """
        proc_name = truncate_text(proc_name, self.desc_len)
        proc_name = f"{proc_name:>{self.desc_len}}:"
        return ProcPBar(self.manager, proc_size, proc_name)

    def update_proc_running(self):DOCS
        """Update the progress bar when a process is running"""
        self.running_counter.update()

    def update_proc_done(self):DOCS
        """Update the progress bar when a process is done"""
        self.success_counter.update_from(self.running_counter)

    def update_proc_error(self):DOCS
        """Update the progress bar when a process is errored"""
        self.failure_counter.update_from(self.running_counter)

    def done(self) -> None:DOCS
        """When the pipeline is done"""
        self.running_counter.close()
        self.manager.stop()