# Source code: pipen.pluginmgr

"""Define hooks specifications and provide plugin manager"""
from __future__ import annotations

import shutil
from os import PathLike
from pathlib import Path
from typing import Any, Dict, TYPE_CHECKING

from simplug import Simplug, SimplugResult, makecall
from xqute import JobStatus, Scheduler
from xqute.utils import a_read_text, a_write_text, asyncify

from .defaults import ProcOutputType
from .exceptions import ProcInputValueError, ProcOutputValueError
from .utils import get_mtime as _get_mtime

if TYPE_CHECKING:  # pragma: no cover
    import signal
    from simplug import SimplugImplCall
    from xqute import Xqute
    from .job import Job
    from .proc import Proc
    from .pipen import Pipen

# The Simplug plugin manager named "pipen"; the hooks in this module are
# invoked through `plugin.hooks.<hook_name>(...)`.
plugin = Simplug("pipen")

def on_setup(config: Dict[str, Any]) -> None:
    """Setup for plugins, primarily used for the plugins to
    setup some default configurations.

    This is only called once for all pipelines.

    Args:
        config: The configuration dictionary
            plugin options should be defined under "plugin_opts"
            One should define a configuration item either with a prefix as
            the identity for the plugin or a namespace inside the plugin config
    """

async def on_init(pipen: Pipen) -> None:
    """When the pipeline is initialized, and default configs are loaded

    Args:
        pipen: The Pipen object
    """

async def on_start(pipen: Pipen) -> None:
    """Right before the pipeline starts running.

    Process relationships are inferred.

    Args:
        pipen: The Pipen object
    """

async def on_complete(pipen: Pipen, succeeded: bool):
    """When the pipeline is completed.

    Args:
        pipen: The Pipen object
        succeeded: Whether the pipeline has successfully completed.
    """

def on_proc_create(proc: Proc):
    """Called in the Proc constructor when a process is created.

    Enables plugins to modify the default attributes of processes

    Args:
        proc: The Proc object
    """

async def on_proc_init(proc: Proc):
    """Called when a process is initialized.

    Allows plugins to modify the process attributes after initialization, but
    before the jobs are initialized.

    Args:
        proc: The Proc object
    """

def on_proc_input_computed(proc: Proc):
    """Called after process input data is computed.

    Args:
        proc: The Proc object
    """

def on_proc_script_computed(proc: Proc):
    """Called after process script is computed.

    The script is computed as a string that is about to be compiled.

    Args:
        proc: The Proc object
    """

async def on_proc_start(proc: Proc):
    """When a process is starting

    Args:
        proc: The process
    """

def on_proc_shutdown(proc: Proc, sig: signal.Signals) -> None:
    """When pipeline is shutting down, by Ctrl-c for example.

    Return False to stop shutting down, but you have to shut it down
    by yourself, for example, `proc.xqute.task.cancel()`

    Only the first return value will be used.

    Args:
        proc: The process
        sig: The signal. `None` means a natural shutdown
    """

async def on_proc_done(proc: Proc, succeeded: bool | str) -> None:
    """When a process is done

    Args:
        proc: The process
        succeeded: Whether the process succeeded or not. 'cached' if all jobs
            are cached.
    """

async def on_job_init(job: Job):
    """When a job is initialized

    Args:
        job: The job
    """

async def on_job_queued(job: Job):
    """When a job is queued in xqute. Note it might not be queued yet in
    the scheduler system.

    Args:
        job: The job
    """

async def on_job_submitting(job: Job) -> bool:
    """When a job is submitting.

    The first plugin (based on priority) whose implementation of this hook
    returns False will cancel the submission

    Args:
        job: The job

    Returns:
        False to cancel submission
    """

async def on_job_submitted(job: Job):
    """When a job is submitted in the scheduler system.

    Args:
        job: The job
    """

async def on_job_started(job: Job):
    """When a job starts to run in the scheduler system.

    Note that the job might not be running yet in the scheduler system.

    Args:
        job: The job
    """

async def on_job_polling(job: Job):
    """When status of a job is being polled.

    Args:
        job: The job
    """

async def on_job_killing(job: Job) -> bool:
    """When a job is being killed.

    The first plugin (based on priority) whose implementation of this hook
    returns False will cancel the killing

    Args:
        job: The job

    Returns:
        False to cancel killing
    """

async def on_job_killed(job: Job):
    """When a job is killed

    Args:
        job: The job
    """

async def on_job_succeeded(job: Job):
    """When a job completes successfully.

    Args:
        job: The job
    """

async def on_job_cached(job: Job):
    """When a job is cached.

    Args:
        job: The job
    """

async def on_job_failed(job: Job):
    """When a job is done but failed.

    Args:
        job: The job
    """

def _collect_norm_inpath(calls: list[SimplugImplCall]) -> str:
    """Collect the result of the `norm_inpath` hook implementations.

    The implementation calls are made in order; the first result that is
    not None is returned as a string and the remaining calls are not made.

    Args:
        calls: The pending implementation calls of the hook

    Returns:
        The first non-None result, converted to `str`

    Raises:
        ProcInputValueError: When every implementation returns None
    """
    for call in calls:
        out = makecall(call)
        if out is not None:
            return str(out)

    # Imported here rather than at module level, where `Job` is only
    # imported under TYPE_CHECKING.
    from .job import Job

    # The first argument could be self in implementation
    idx = 0 if isinstance(calls[0].args[0], Job) else 1
    job = calls[0].kwargs.pop("job", calls[0].args[idx])
    inpath = calls[0].kwargs.pop("inpath", calls[0].args[idx + 1])
    # NOTE(review): the interpolated fields of this message were missing from
    # the reviewed listing; `job.proc.name` and `inpath` are reconstructed
    # from the otherwise-unused locals above -- confirm against upstream.
    raise ProcInputValueError(
        f"[{job.proc.name}] Unsupported protocol for input path: "
        f"{inpath}"
    )

def norm_inpath(job: Job, inpath: str | PathLike, is_dir: bool) -> str:
    """Normalize the input path

    Args:
        job: The job
        inpath: The input path
        is_dir: Whether the path is a directory

    Returns:
        The normalized path
    """

def _collect_norm_outpath(calls: list[SimplugImplCall]) -> str:
    """Collect the result of the `norm_outpath` hook implementations.

    The implementation calls are made in order; the first result that is
    not None is returned as a string and the remaining calls are not made.

    Args:
        calls: The pending implementation calls of the hook

    Returns:
        The first non-None result, converted to `str`

    Raises:
        ProcOutputValueError: When every implementation returns None
    """
    for call in calls:
        out = makecall(call)
        if out is not None:
            return str(out)

    # Imported here rather than at module level, where `Job` is only
    # imported under TYPE_CHECKING.
    from .job import Job

    # The first argument could be self in implementation
    idx = 0 if isinstance(calls[0].args[0], Job) else 1
    job = calls[0].kwargs.pop("job", calls[0].args[idx])
    outpath = calls[0].kwargs.pop("outpath", calls[0].args[idx + 1])
    # NOTE(review): the interpolated fields of this message were missing from
    # the reviewed listing; `job.proc.name` and `outpath` are reconstructed
    # from the otherwise-unused locals above -- confirm against upstream.
    raise ProcOutputValueError(
        f"[{job.proc.name}] Unsupported protocol for output path: "
        f"{outpath}"
    )

def norm_outpath(job: Job, outpath: str, is_dir: bool) -> str:
    """Normalize the output path

    Args:
        job: The job
        outpath: The output path
        is_dir: Whether the path is a directory

    Returns:
        The normalized path
    """

def _collect_get_mtime(calls: list[SimplugImplCall]) -> float:
    """Collect the result of the `get_mtime` hook implementations.

    The implementation calls are made in order; the first result that is
    not None is returned as a float and the remaining calls are not made.

    Args:
        calls: The pending implementation calls of the hook

    Returns:
        The first non-None result, converted to `float`

    Raises:
        NotImplementedError: When every implementation returns None
    """
    for call in calls:
        out = makecall(call)
        if out is not None:
            return float(out)

    # Imported here rather than at module level, where `Job` is only
    # imported under TYPE_CHECKING.
    from .job import Job

    # The first argument could be self in implementation
    idx = 0 if isinstance(calls[0].args[0], Job) else 1
    job = calls[0].kwargs.pop("job", calls[0].args[idx])
    path = calls[0].kwargs.pop("path", calls[0].args[idx + 1])
    # NOTE(review): the interpolated fields of this message were missing from
    # the reviewed listing; `job.proc.name` and `path` are reconstructed
    # from the otherwise-unused locals above -- confirm against upstream.
    raise NotImplementedError(
        f"[{job.proc.name}] Unsupported protocol in path to get mtime: "
        f"{path}"
    )

def get_mtime(job: Job, path: str | PathLike, dirsig: int) -> float:
    """Get the mtime of a path, either a file or a directory

    Args:
        job: The job
        path: The path to get mtime
        dirsig: The depth of the directory to check the last modification time

    Returns:
        The last modification time
    """

async def _collect_clear_path(calls: list[SimplugImplCall]) -> bool:
    """Collect the result of the async `clear_path` hook implementations.

    The implementation calls are awaited in order; the first result that is
    not None is returned as is and the remaining calls are not made.

    Args:
        calls: The pending implementation calls of the hook

    Returns:
        The first non-None result

    Raises:
        NotImplementedError: When every implementation returns None
    """
    for call in calls:
        out = await makecall(call)
        if out is not None:
            return out

    # Imported here rather than at module level, where `Job` is only
    # imported under TYPE_CHECKING.
    from .job import Job

    # The first argument could be self in implementation
    idx = 0 if isinstance(calls[0].args[0], Job) else 1
    job = calls[0].kwargs.pop("job", calls[0].args[idx])
    path = calls[0].kwargs.pop("path", calls[0].args[idx + 1])
    # NOTE(review): the interpolated fields of this message were missing from
    # the reviewed listing; `job.proc.name` and `path` are reconstructed
    # from the otherwise-unused locals above -- confirm against upstream.
    raise NotImplementedError(
        f"[{job.proc.name}] Unsupported protocol in path to clear: "
        f"{path}"
    )

async def clear_path(job: Job, path: str | PathLike, is_dir: bool) -> bool:
    """Clear the path, either a file or a directory

    Args:
        job: The job
        path: The path to clear
        is_dir: Whether the path is a directory

    Returns:
        Whether the path is cleared successfully
    """

async def _collect_output_exists(calls: list[SimplugImplCall]) -> bool:
    """Collect the result of the async `output_exists` hook implementations.

    The implementation calls are awaited in order; the first result that is
    not None is returned as is and the remaining calls are not made.

    Args:
        calls: The pending implementation calls of the hook

    Returns:
        The first non-None result

    Raises:
        NotImplementedError: When every implementation returns None
    """
    for call in calls:
        out = await makecall(call)
        if out is not None:
            return out

    # Imported here rather than at module level, where `Job` is only
    # imported under TYPE_CHECKING.
    from .job import Job

    # The first argument could be self in implementation
    idx = 0 if isinstance(calls[0].args[0], Job) else 1
    job = calls[0].kwargs.pop("job", calls[0].args[idx])
    path = calls[0].kwargs.pop("path", calls[0].args[idx + 1])
    # NOTE(review): the interpolated fields of this message were missing from
    # the reviewed listing; `job.proc.name` and `path` are reconstructed
    # from the otherwise-unused locals above -- confirm against upstream.
    raise NotImplementedError(
        f"[{job.proc.name}] Unsupported protocol in path to test existence: "
        f"{path}"
    )

async def output_exists(job: Job, path: str, is_dir: bool) -> bool:
    """Check if the output exists

    Args:
        job: The job
        path: The path to check
        is_dir: Whether the path is a directory

    Returns:
        Whether the output exists
    """

def on_jobcmd_init(job: Job) -> str:
    """When the job command wrapper script is initialized before the prescript is run

    This should return a piece of bash code to be inserted in the wrapped job
    script (template), which is a python template string, with the following
    variables available: `status` and `job`. `status` is the class `JobStatus` from
    `xqute` and `job` is the `Job` instance.

    For multiple plugins, the code will be inserted in the order of the plugin priority.

    The code will replace the `#![jobcmd_init]` placeholder in the wrapped job script.

    Args:
        job: The job object

    Returns:
        The bash code to be inserted
    """

def on_jobcmd_prep(job: Job) -> str:
    """When the job command is right about to be run

    This should return a piece of bash code to be inserted in the wrapped job
    script (template), which is a python template string, with the following
    variables available: `status` and `job`. `status` is the class `JobStatus` from
    `xqute` and `job` is the `Job` instance.

    The bash variable `$cmd` is accessible in the context. It is also possible to
    modify the `cmd` variable. Just remember to assign the modified value to `cmd`.

    For multiple plugins, the code will be inserted in the order of the plugin priority.
    Keep in mind that the `$cmd` may be modified by other plugins.

    The code will replace the `#![jobcmd_prep]` placeholder in the wrapped job script.

    Args:
        job: The job object

    Returns:
        The bash code to be inserted
    """

def on_jobcmd_end(job: Job) -> str:
    """When the job command finishes and after the postscript is run

    This should return a piece of bash code to be inserted in the wrapped job
    script (template), which is a python template string, with the following
    variables available: `status` and `job`. `status` is the class `JobStatus` from
    `xqute` and `job` is the `Job` instance.

    The bash variable `$rc` is accessible in the context, which is the return code
    of the job command.

    For multiple plugins, the code will be inserted in the order of the plugin priority.

    The code will replace the `#![jobcmd_end]` placeholder in the wrapped job script.

    Args:
        job: The job object

    Returns:
        The bash code to be inserted
    """

class PipenMainPlugin:
    """The builtin core plugin, used to update the progress bar and
    cache the job

    NOTE(review): the hook-implementation decorators and the progress-bar
    calls that the method docstrings refer to are not visible in the
    reviewed listing (presumably lost in extraction) -- confirm against the
    upstream source before relying on this class as is.
    """

    name = "core"
    # The priority is set to -1000 to make sure it is the first plugin
    # to be called
    priority = -1000

    def on_proc_shutdown(self, proc: Proc, sig: signal.Signals):
        """When a process is shutting down

        Args:
            proc: The process
            sig: The signal; nothing is logged when it is falsy
        """
        if sig:  # pragma: no cover
            # NOTE(review): only the message string was visible; the
            # `proc.log` call, its level and `sig.name` are reconstructed
            # by analogy with the `job.log(...)` call in `on_job_failed`.
            proc.log(
                "warning",
                "Got signal %r, trying a graceful shutdown ...",
                sig.name,
            )

    async def on_job_submitted(self, job: Job):
        """Update the progress bar when a job is submitted"""
        # NOTE(review): no body was visible for this method.

    async def on_job_started(self, job: Job):
        """Update the progress bar when a job starts to run"""
        # NOTE(review): no body was visible for this method.

    async def on_job_cached(self, job: Job):
        """Update the progress bar when a job is cached"""
        job.status = JobStatus.FINISHED

    async def on_job_succeeded(self, job: Job):
        """Cache the job and update the progress bar when a job is succeeded

        The job is marked as failed, and a note is appended to its stderr
        file, when a non-variable output has not been generated.
        """
        # now the returncode is 0, however, we need to check if output files
        # have been created or not, this makes sure job.cache not fail
        for outkey, outtype in job._output_types.items():
            if outtype == ProcOutputType.VAR:
                # Variable outputs have no path to check
                continue

            # NOTE(review): the first two arguments were not visible; the
            # hook takes (job, path, is_dir), and `job.output[outkey]` is
            # presumably the path of this output -- confirm.
            output_exists = await plugin.hooks.output_exists(
                job,
                job.output[outkey],
                outtype == ProcOutputType.DIR,
            )
            if not output_exists:
                job.status = JobStatus.FAILED
                stderr = await a_read_text(job.stderr_file)
                stderr = (
                    f"{stderr}\n\nOutput {outtype} {outkey!r} "
                    "is not generated."
                )
                await a_write_text(job.stderr_file, stderr)
                break
        else:
            # Only cache the job when every expected output exists
            await job.cache()

    async def on_job_failed(self, job: Job):
        """Update the progress bar when a job is failed"""
        if job.status == JobStatus.RETRYING:
            job.log("debug", "Retrying #%s", job.trial_count + 1)

    async def on_job_killed(self, job: Job):
        """Update the status of a killed job"""
        # instead of FINISHED to force the whole pipeline to quit
        job.status = JobStatus.FAILED  # pragma: no cover

    def norm_inpath(
        self,
        job: Job,
        inpath: str | PathLike,
        is_dir: bool,
    ) -> str:
        """Normalize the input path

        Returns:
            The resolved absolute path with `~` expanded, or None when the
            path contains a protocol (`://`)
        """
        if "://" in str(inpath):
            # Let the plugins handle the protocol
            return None

        return str(Path(inpath).expanduser().resolve())

    def norm_outpath(
        self,
        job: Job,
        outpath: str,
        is_dir: bool,
    ) -> str:
        """Normalize the output path

        The directory is created when `is_dir` is true.

        Returns:
            The path under the resolved `job.outdir`, or None when the path
            contains a protocol (`://`)

        Raises:
            ProcOutputValueError: When `outpath` is an absolute path
        """
        if "://" in outpath:
            # Let the plugins handle the protocol
            return None

        if Path(outpath).is_absolute():
            # NOTE(review): the first interpolated field was missing from
            # the reviewed listing; `job.proc.name` is a reconstruction.
            raise ProcOutputValueError(
                f"[{job.proc.name}] Process output should be a relative path: {outpath}"
            )

        out = job.outdir.resolve() / outpath
        if is_dir:
            out.mkdir(parents=True, exist_ok=True)

        return str(out)

    def get_mtime(
        self,
        job: Job,
        path: str | PathLike,
        dirsig: int,
    ) -> float:
        """Get the mtime of a path

        Returns:
            The result of the package's `get_mtime` utility, or None when
            the path contains a protocol (`://`)
        """
        if "://" in str(path):
            # Let the plugins handle the protocol
            return None

        return _get_mtime(path, dirsig)

    async def clear_path(self, job: Job, path: str | PathLike, is_dir: bool):
        """Clear the path

        Returns:
            None when the path contains a protocol (`://`), False when the
            removal raised an exception, True otherwise
        """
        if "://" in str(path):
            # Let the plugins handle the protocol
            return None

        path = Path(path)
        try:
            # dead link
            if not path.exists():
                if path.is_symlink():
                    await asyncify(Path.unlink)(path)

            elif not is_dir:
                await asyncify(Path.unlink)(path)

            else:
                await asyncify(shutil.rmtree)(path)
        except Exception:  # pragma: no cover
            return False
        return True

    async def output_exists(self, job: Job, path: str, is_dir: bool):
        """Check if the output exists

        Returns:
            None when the path contains a protocol (`://`); otherwise
            whether the path exists. An empty directory counts as not
            existing when `is_dir` is true.
        """
        if "://" in path:
            # Let the plugins handle the protocol
            return None

        path = Path(path)
        if not path.exists():
            return False
        if is_dir:
            # Stops at the first entry instead of listing the directory
            return any(path.iterdir())  # pragma: no cover
        return True


# A Simplug instance named "xqute".
# NOTE(review): presumably this resolves to the plugin manager xqute itself
# uses, so that the proxy plugin below can be registered with it -- confirm.
xqute_plugin = Simplug("xqute")

class XqutePipenPlugin:
    """The plugin for xqute working as proxy for pipen plugin hooks

    Each method forwards the call to the pipen hook of the same name (or
    `on_proc_shutdown` for `on_shutdown`), dropping the scheduler argument.

    NOTE(review): no hook-implementation decorators are visible in the
    reviewed listing (presumably lost in extraction) -- confirm against the
    upstream source.
    """

    name = "xqute.pipen"

    @staticmethod
    def _join_codes(codes):
        """Join the code pieces returned by a jobcmd hook

        Args:
            codes: The pieces of code collected from the pipen hook

        Returns:
            The pieces joined by a blank line, or None when there are none
        """
        if not codes:
            return None
        return "\n\n".join(codes)

    def on_shutdown(self, xqute: Xqute, sig: signal.Signals):
        """When a process is shutting down"""
        return plugin.hooks.on_proc_shutdown(xqute.proc, sig)

    async def on_job_init(self, scheduler: Scheduler, job: Job):
        """When a job is initialized"""
        await plugin.hooks.on_job_init(job)

    async def on_job_queued(self, scheduler: Scheduler, job: Job):
        """When a job is queued"""
        await plugin.hooks.on_job_queued(job)

    async def on_job_submitting(self, scheduler: Scheduler, job: Job):
        """When a job is being submitted"""
        return await plugin.hooks.on_job_submitting(job)

    async def on_job_submitted(self, scheduler: Scheduler, job: Job):
        """When a job is submitted"""
        await plugin.hooks.on_job_submitted(job)

    async def on_job_started(self, scheduler: Scheduler, job: Job):
        """When a job starts to run"""
        await plugin.hooks.on_job_started(job)

    async def on_job_polling(self, scheduler: Scheduler, job: Job):
        """When the status of a job is being polled"""
        await plugin.hooks.on_job_polling(job)

    async def on_job_killing(self, scheduler: Scheduler, job: Job):
        """When a job is being killed"""
        return await plugin.hooks.on_job_killing(job)  # pragma: no cover

    async def on_job_killed(self, scheduler: Scheduler, job: Job):
        """When a job is killed"""
        await plugin.hooks.on_job_killed(job)  # pragma: no cover

    async def on_job_succeeded(self, scheduler: Scheduler, job: Job):
        """When a job is succeeded"""
        await plugin.hooks.on_job_succeeded(job)

    async def on_job_failed(self, scheduler: Scheduler, job: Job):
        """When a job is failed"""
        await plugin.hooks.on_job_failed(job)

    def on_jobcmd_init(self, scheduler: Scheduler, job: Job):
        """When the job command wrapper script is initialized"""
        return self._join_codes(plugin.hooks.on_jobcmd_init(job))

    def on_jobcmd_prep(self, scheduler: Scheduler, job: Job):
        """When the job command is about to be run"""
        return self._join_codes(plugin.hooks.on_jobcmd_prep(job))

    def on_jobcmd_end(self, scheduler: Scheduler, job: Job):
        """When the job command finishes"""
        return self._join_codes(plugin.hooks.on_jobcmd_end(job))
