# Plugins
`pipen` uses [simplug](https://github.com/pwwang/simplug) for plugin support. A rich set of hooks is available for you to write your own plugins to extend `pipen`.
## Runtime plugins
### Plugin hooks
To implement a hook in your plugin, simply:

```python
from pipen import plugin

@plugin.impl
def hook(...):  # use `async def` for async hooks
    ...
```
Note that the arguments are passed by keyword, so their names have to match the hook signature. See [simplug](https://github.com/pwwang/simplug) for more details.
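For example, a minimal sketch of an implementation of the `on_complete` hook (described below), assuming the module or class it lives in is registered as a plugin (see "Writing a plugin" below):

```python
from pipen import plugin

@plugin.impl
async def on_complete(pipen, succeeded):
    # the argument names must match the hook signature: `pipen` and `succeeded`
    print(f"Pipeline {pipen.name} completed; all succeeded: {succeeded}")
```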
#### Pipeline-level hooks
- `on_setup(config)` (sync): Setup for the plugin, mainly used for initialization and for setting the default values of the plugin's configuration items. This is only called once, even when you have multiple pipelines (`Pipen` objects) in a Python session.
- `on_init(pipen)` (async): Called when the pipeline is initialized. Note that only the default configurations are loaded at this point (from `defaults.CONFIG` and configuration files); the configurations from the `Pipen` constructor and from the processes are not loaded yet. This is useful for plugins to change the default configurations.
- `on_start(pipen)` (async): Called right before the pipeline starts to run. The process relationships are inferred at this point. You can access the start processes via `pipen.starts` and all processes via `pipen.procs`, in execution order.
- `on_complete(pipen, succeeded)` (async): Called after all processes finish. `succeeded` indicates whether all processes/jobs finished successfully.
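As a sketch of how these hooks fit together, here is a hypothetical plugin that registers a default configuration item in `on_setup` and reports the outcome in `on_complete`. The option name `banner` is made up for illustration, and this assumes plugin options are kept under the `plugin_opts` configuration item:

```python
from pipen import plugin

class BannerPlugin:
    """Hypothetical plugin: print a banner when the pipeline completes."""

    @plugin.impl
    def on_setup(config):
        # set a default for a made-up option under `plugin_opts`
        config.plugin_opts.banner = "Pipeline finished"

    @plugin.impl
    async def on_complete(pipen, succeeded):
        if succeeded:
            print(pipen.config.plugin_opts.banner)
```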
#### Process-level hooks
- `on_proc_create(proc)` (sync): Called before the process gets instantiated. Enables plugins to modify the default attributes of processes.
- `on_proc_input_computed(proc)` (sync): Called after the process input data is computed.
- `on_proc_script_computed(proc)` (sync): Called after the process script is computed. The script is computed as a string that is about to be compiled into a template. You can modify the script here.
- `on_proc_init(proc)` (async): Called when the process object is initialized. Allows plugins to modify the process attributes after initialization, but before the jobs are initialized.
- `on_proc_start(proc)` (async): Called when process object initialization completes, including the `xqute` and job initialization. The `output_data` is also accessible here. The process is ready to run.
- `on_proc_shutdown(proc, sig)` (sync): Called when the process is shut down (e.g. by `<ctrl-c>`). You can access the signal that shut the process down via `sig`. Only the first plugin (based on priority) that implements this hook gets called.
- `on_proc_done(proc, succeeded)` (async): Called when a process is done.
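A sketch of a process-level plugin, relying on the fact (stated above) that `proc.script` is still a plain string in `on_proc_script_computed`:

```python
from pipen import plugin

class ScriptBannerPlugin:
    """Hypothetical plugin: append a marker to every process script."""

    @plugin.impl
    def on_proc_script_computed(proc):
        # the script has not been compiled into a template yet,
        # so it can be modified freely as a string
        proc.script += "\n# --- script processed by ScriptBannerPlugin ---\n"
```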
#### Job-level hooks
- `on_job_init(job)` (async): Called when a job is initialized.
- `on_job_queued(job)` (async): Called when a job is queued in `xqute`. Note that it might not be queued yet in the scheduler system.
- `on_job_submitting(job)` (async): Called when a job is being submitted. The first plugin (based on priority) whose hook returns `False` will cancel the submission.
- `on_job_submitted(job)` (async): Called when a job is submitted to the scheduler system.
- `on_job_started(job)` (async): Called when a job starts to run in the scheduler system.
- `on_job_polling(job)` (async): Called when the status of a job is being polled.
- `on_job_killing(job)` (async): Called when a job is being killed. The first plugin (based on priority) whose hook returns `False` will cancel the killing.
- `on_job_killed(job)` (async): Called when a job is killed.
- `on_job_succeeded(job)` (async): Called when a job completes successfully.
- `on_job_cached(job)` (async): Called when a job is cached.
- `on_job_failed(job)` (async): Called when a job is done but failed (e.g. `return_code == 1`).
- `on_jobcmd_init(job) -> str` (sync): Called when the job command wrapper script is initialized, before the prescript is run. This should return a piece of bash code to be inserted into the wrapped job script (template), which is a Python template string with the following variables available: `status` and `job`. `status` is the class `JobStatus` from `xqute.defaults` and `job` is the `Job` instance. For multiple plugins, the code is inserted in the order of plugin priority. The code replaces the `#![jobcmd_init]` placeholder in the wrapped job script. See also https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95
- `on_jobcmd_prep(job) -> str` (sync): Called when the job command is right about to run. This should return a piece of bash code to be inserted into the wrapped job script (template), which is a Python template string with the variables `status` and `job` available (as above). The bash variable `$cmd` is accessible in the context, and it is also possible to modify the `cmd` variable; just remember to assign the modified value back to `cmd`. For multiple plugins, the code is inserted in the order of plugin priority; keep in mind that `$cmd` may have been modified by other plugins. The code replaces the `#![jobcmd_prep]` placeholder in the wrapped job script. See also https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95
- `on_jobcmd_end(job) -> str` (sync): Called when the job command finishes, after the postscript is run. This should return a piece of bash code to be inserted into the wrapped job script (template), which is a Python template string with the variables `status` and `job` available (as above). The bash variable `$rc` is accessible in the context, which is the return code of the job command. For multiple plugins, the code is inserted in the order of plugin priority. The code replaces the `#![jobcmd_end]` placeholder in the wrapped job script. See also https://github.com/pwwang/xqute/blob/master/xqute/defaults.py#L95
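A sketch combining a job-level hook with an `on_jobcmd_*` hook. The log message is hypothetical, and the bash snippet is deliberately plain (no `$`-variables) to stay clear of the template substitution described above:

```python
from pipen import plugin
from pipen.utils import get_logger

logger = get_logger("myplugin", "info")

class JobWatcherPlugin:
    """Hypothetical plugin: log failed jobs and mark the wrapper script."""

    @plugin.impl
    async def on_job_failed(job):
        # called when a job is done but failed
        logger.warning("Job %s failed", job.index)

    @plugin.impl
    def on_jobcmd_init(job) -> str:
        # replaces the #![jobcmd_init] placeholder in the wrapper script
        return "echo 'JobWatcherPlugin: wrapper starting' >&2"
```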
### IO hooks
The IO hooks are used to handle input/output files/directories. The idea is to provide more flexibility in fetching the last modified time of files/directories, and in removing files/directories when a job restarts. These APIs are primarily used to generate the cache signature for jobs. There are 5 APIs that need to be implemented:

- `norm_inpath(job: Job, inpath: str | PathLike, is_dir: bool) -> str`: Normalize/transform the input path. This is helpful when you want to download the file from a remote server or cloud storage, or when you want to use a different path to represent the same file, and then provide the local path to the job, so that the job can access the file locally and we don't need to handle it in the job script. `is_dir` indicates whether the path is a directory.
- `norm_outpath(job: Job, outpath: str, is_dir: bool) -> str`: Normalize/transform the output path. Note that this is different from `norm_inpath` because when this is called, the output files/directories have not been created yet, so we can't use it to upload files to a remote server or cloud storage. To do that, use the `on_job_succeeded` hook instead.
- `get_mtime(job: Job, path: str | PathLike, dirsig: int) -> float` (sync): Get the last modified time of the file/directory. `dirsig` is the depth to check for files under the directory; if it is `0`, only the directory itself is checked. Note that modifying a file inside a directory may not change the last modified time of the directory itself.
- `clear_path(job: Job, path: str | PathLike, is_dir: bool) -> bool` (async): Clear the file/directory. This is used to remove the files/directories when a job restarts. `is_dir` indicates whether the path is a directory.
- `output_exists(job: Job, path: str, is_dir: bool) -> bool` (async): Check whether the output file/directory exists.

It is required to define a protocol for your plugin: for example, `gs://` for Google Cloud Storage or `s3://` for AWS S3. When implementing the above APIs, check whether the path starts with your protocol and handle it accordingly; otherwise, return `None` so that the next available plugin can handle it. If there is no `://` in the path, a local path is assumed, and the built-in plugin will handle local paths.
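As a sketch, a plugin for a made-up `dummyfs://` protocol might look like the following. Only two of the five APIs are shown (the others follow the same pattern: claim paths that start with your protocol, return `None` for everything else), the helper is a hypothetical placeholder, and this assumes the IO APIs are implemented with `@plugin.impl` like the other hooks:

```python
from pipen import plugin

PROTOCOL = "dummyfs://"  # made-up protocol, for illustration only


def download_to_local_cache(path: str) -> str:
    """Hypothetical helper: fetch a remote file and return its local path."""
    raise NotImplementedError


class DummyFsPlugin:
    @plugin.impl
    def norm_inpath(job, inpath, is_dir):
        if not str(inpath).startswith(PROTOCOL):
            # not our protocol; return None so the next plugin can handle it
            return None
        return download_to_local_cache(str(inpath))

    @plugin.impl
    def get_mtime(job, path, dirsig):
        if not str(path).startswith(PROTOCOL):
            return None
        # hypothetical: a real plugin would query the remote storage here
        return 0.0
```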
## Loading plugins
You can specify the plugins to be loaded by listing their names, or the plugin objects themselves, in the `plugins` configuration. With names, the plugins are loaded from entry points.

You can also disable plugins that were enabled in lower-priority configurations. For example, to disable `pipen_verbose` (enabled in a configuration file) for a pipeline:

```python
Pipen(..., plugins=["-pipen_verbose"])
```
> **Note**
> You can use `+` as a prefix to enable a disabled plugin, or `-` as a prefix to disable an enabled plugin. If no prefix is used, only the specified plugins are enabled and all other plugins are disabled. You should use `+`/`-` prefixes for either all of the plugins or none of them. If a plugin is given as something other than a string (i.e. the plugin object itself), it is treated as `+plugin`.
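To make the selection rules concrete (the plugin names come from the gallery below; the pipeline names are arbitrary):

```python
from pipen import Pipen

# enable an extra plugin on top of whatever is already configured
Pipen(name="pipe1", plugins=["+pipen_report"])

# disable one plugin, keep everything else as configured
Pipen(name="pipe2", plugins=["-pipen_verbose"])

# no prefixes: load ONLY these plugins, disabling all others
Pipen(name="pipe3", plugins=["pipen_report", "pipen_filters"])
```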
## Writing a plugin
You can write your own plugin by implementing some of the above hooks. You can import the plugin directly and add it to `Pipen(..., plugins=[...])`. For example:

```python
from pipen import plugin, Pipen

class PipenPlugin:
    @plugin.impl
    def hook(...):  # use `async def` for async hooks
        ...

Pipen(..., plugins=[PipenPlugin])
```
You can also use an entry point to register your plugin, using the group name `pipen`.

For `setup.py`, you will need:

```python
setup(
    # ...
    entry_points={"pipen": ["pipen_verbose = pipen_verbose"]},
    # ...
)
```
For `pyproject.toml`:

```toml
[tool.poetry.plugins.pipen]
pipen_verbose = "pipen_verbose"
```
Then the plugin `pipen_verbose` can be loaded by `plugins=["+pipen_verbose"]` or disabled by `plugins=["-pipen_verbose"]`.
## Logging to the console from a plugin
Of course, you can do arbitrary logging from a plugin. However, to keep consistency with the main logger of `pipen`, the best practice is:

```python
from pipen.utils import get_logger

logger = get_logger("verbose", "info")
# do some logging inside the hooks
```
The above code will produce logging on the console like this:

```
11-04 12:00:19 I main ╭═══════════════════════════ Process ═══════════════════════════╮
11-04 12:00:19 I main ║ Undescribed. ║
11-04 12:00:19 I main ╰═══════════════════════════════════════════════════════════════╯
11-04 12:00:19 I main Process: Workdir: '.pipen/process'
11-04 12:00:19 I verbose Process: size: 10
11-04 12:00:19 I verbose Process: [0/9] in.a: 0
11-04 12:00:19 I verbose Process: [0/9] out.b: pipeline-0-output/Process/0/a.txt
```
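As a hypothetical example of using such a logger inside a hook (the plugin name `myplugin` and the message are made up; prefixing the message with `proc.name` mimics the style of the built-in log lines above):

```python
from pipen import plugin
from pipen.utils import get_logger

logger = get_logger("myplugin", "info")

class MyLoggingPlugin:
    @plugin.impl
    async def on_proc_start(proc):
        # appears on the console with "myplugin" as the logger name
        logger.info("%s: my plugin is watching this process", proc.name)
```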
## CLI plugins
See CLI for more details.
## Plugin gallery
- `pipen-verbose`: Add verbose information in logs for pipen.
- `pipen-report`: Generate reports for pipen.
- `pipen-filters`: Add a set of useful filters for pipen templates.
- `pipen-diagram`: Draw pipeline diagrams for pipen.
- `pipen-args`: Command-line argument parser for pipen.
- `pipen-dry`: Dry runner for pipen pipelines.
- `pipen-annotate`: Use docstrings to annotate pipen processes.
- `pipen-board`: Visualize configuration and running of pipen pipelines on the web.
- `pipen-lock`: Process locking for pipen to prevent multiple runs at the same time.
- `pipen-log2file`: Save running logs to files for pipen.
- `pipen-poplog`: Populate logs from jobs to the running log of the pipeline.
- `pipen-runinfo`: Save running information to files for pipen.
- `pipen-gcs`: A plugin for pipen to handle files in Google Cloud Storage.