package
pipen
A pipeline framework for Python
module
pipen.scheduler
Provide builtin schedulers
Classes
SchedulerPostInit: Provides post init function for all schedulers
LocalScheduler: Local scheduler
SgeScheduler: SGE scheduler
SlurmScheduler: Slurm scheduler
SshScheduler: SSH scheduler
GbatchScheduler: Google Cloud Batch scheduler
ContainerScheduler: Scheduler to run jobs via containers (Docker/Podman/Apptainer)
Functions
get_scheduler(scheduler) (Type): Get the scheduler by name or the scheduler class itself
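The lookup described for get_scheduler (accept either a name or a scheduler class) can be sketched roughly as follows. This is a minimal illustration and not pipen's implementation: the Scheduler base class, the REGISTRY dict and the function name resolve_scheduler are hypothetical stand-ins. The two error branches loosely correspond to the NoSuchSchedulerError and WrongSchedulerTypeError cases listed under pipen.exceptions; builtin exceptions are used here to keep the sketch self-contained.

```python
from typing import Dict, Type, Union


class Scheduler:
    """Hypothetical base class standing in for a real scheduler base."""


class LocalScheduler(Scheduler):
    name = "local"


class SlurmScheduler(Scheduler):
    name = "slurm"


# Hypothetical name -> class registry (an assumption, not pipen's own table)
REGISTRY: Dict[str, Type[Scheduler]] = {
    cls.name: cls for cls in (LocalScheduler, SlurmScheduler)
}


def resolve_scheduler(
    scheduler: Union[str, Type[Scheduler]],
) -> Type[Scheduler]:
    """Return a scheduler class from a name or from the class itself."""
    if isinstance(scheduler, str):
        try:
            return REGISTRY[scheduler]
        except KeyError:
            # Unknown name: no scheduler registered under it
            raise LookupError(f"No such scheduler: {scheduler!r}") from None
    if isinstance(scheduler, type) and issubclass(scheduler, Scheduler):
        # Already a scheduler class: hand it back unchanged
        return scheduler
    raise TypeError(f"Not a Scheduler subclass: {scheduler!r}")


print(resolve_scheduler("local").__name__)         # LocalScheduler
print(resolve_scheduler(SlurmScheduler).__name__)  # SlurmScheduler
```

Accepting both forms lets callers pass a short name in configuration while still allowing a custom class to be supplied directly.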
module
pipen.utils
Provide some utilities
Classes
RichHandler: Subclass of rich.logging.RichHandler, showing log levels as a single character
RichConsole: A high level console interface.
Functions
brief_list(blist) (str): Briefly show an integer list, combining consecutive numbers.
copy_dict(dic, depth) (Mapping): Deep copy a dict
desc_from_docstring(obj, base) (str): Get the description from docstring
get_base(klass, abc_base, value, value_getter) (Type): Get the base class where the value was first defined
get_logger(name, level) (LoggerAdapter): Get the logger by given plugin name
get_logpanel_width() (int): Get the width of the log content
get_marked(cls, mark_name, default) (Any): Get the marked value from a proc
get_mtime(path, dir_depth) (float): Get the modification time of a path. If path is a directory, try to get the last modification time of the contents in the directory at given dir_depth
get_shebang(script) (str): Get the shebang of the script
ignore_firstline_dedent(text) (str): Like textwrap.dedent(), but ignore first empty lines
is_loading_pipeline(*flags, argv) (bool): Check if we are loading the pipeline. Works only when argv0 is "@pipen" while loading the pipeline.
is_subclass(obj, cls) (bool): Tell if obj is a subclass of cls. The difference from issubclass is that no TypeError is raised if obj is not a class
is_valid_name(name) (bool): Check if a name is valid for a proc or pipen
load_entrypoints(group) (Iterable): Load objects from setuptools entrypoints by given group name
load_pipeline(obj, argv0, argv1p, **kwargs) (Pipen): Load a pipeline from a Pipen, Proc or ProcGroup object
log_rich_renderable(renderable, color, logfunc, *args, **kwargs): Log a rich renderable to logger
make_df_colnames_unique_inplace(thedf): Make the columns of a data frame unique
mark(**kwargs) (Callable): Mark a class (e.g. Proc) with given kwargs as metadata
path_is_symlink(path) (bool): Check if a path is a symlink.
path_is_symlink_sync(path) (bool): Check if a path is a symlink synchronously.
path_symlink_to(src, dst, target_is_directory): Create a symbolic link pointing to src named dst.
pipen_banner() (RenderableType): The banner for pipen
strsplit(string, sep, maxsplit, trim) (List): Split the string, with the ability to trim each part.
truncate_text(text, width, end) (str): Truncate a text not based on words/whitespaces. Otherwise, we could use textwrap.shorten.
update_dict(parent, new, depth, try_list, null_in_list) (Mapping): Merge the new dict into the parent, but make sure parent does not change
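Two of the one-line descriptions above (brief_list and is_subclass) are easier to picture with a small example. The functions below are independent sketches written for illustration, with hypothetical names ending in _sketch; they are not the pipen.utils implementations, and the exact output format of the real brief_list may differ.

```python
from typing import Iterable, List


def brief_list_sketch(numbers: Iterable[int]) -> str:
    """Collapse runs of consecutive integers into 'start-end' ranges."""
    nums = sorted(set(numbers))
    parts: List[str] = []
    i = 0
    while i < len(nums):
        j = i
        # Extend j while the next number continues the current run
        while j + 1 < len(nums) and nums[j + 1] == nums[j] + 1:
            j += 1
        parts.append(str(nums[i]) if i == j else f"{nums[i]}-{nums[j]}")
        i = j + 1
    return ", ".join(parts)


def is_subclass_sketch(obj: object, cls: type) -> bool:
    """Like issubclass(), but return False instead of raising TypeError
    when obj is not a class."""
    try:
        return issubclass(obj, cls)
    except TypeError:
        return False


print(brief_list_sketch([1, 2, 3, 5, 7, 8]))  # 1-3, 5, 7-8
print(is_subclass_sketch(bool, int))          # True
print(is_subclass_sketch(42, int))            # False
```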
module
pipen.exceptions
Provide exception classes
Classes
PipenException: Base exception class for pipen
PipenSetDataError: When trying to set input data to processes with input_data already set using Pipen.set_data().
ProcInputTypeError: When an unsupported input type is provided
ProcInputKeyError: When an unsupported input key is provided
ProcInputValueError: When an unsupported input value is provided
ProcScriptFileNotFound: When script file specified as 'file://' cannot be found
ProcOutputNameError: When no name or malformatted output is provided
ProcOutputTypeError: When an unsupported output type is provided
ProcOutputValueError: When a malformatted output value is provided
ProcDependencyError: When there is something wrong with the process dependencies
NoSuchSchedulerError: When specified scheduler cannot be found
WrongSchedulerTypeError: When specified scheduler is not a subclass of Scheduler
NoSuchTemplateEngineError: When specified template engine cannot be found
WrongTemplateEngineTypeError: When specified template engine is not a subclass of Template
TemplateRenderingError: Failed to render a template
ConfigurationError: When something is wrongly set in the configuration
PipenOrProcNameError: When more than one process shares the same workdir
module
pipen.proc
module
pipen.template
Template adaptor for pipen
Classes
Template(source, **kwargs): Base class wrapper to wrap template for pipen
TemplateLiquid: Liquidpy template wrapper.
TemplateJinja2: Jinja2 template wrapper
Functions
get_template_engine(template) (Type): Get the template engine by name or the template engine itself
module
pipen.version
Provide version of pipen
module
pipen.pipen
Main entry module, provide the Pipen class
module
pipen.procgroup
Process group that contains a set of processes.
It can be easily used to create a pipeline that runs independently or integrated into a larger pipeline.
Runs directly:
>>> proc_group = ProcGroup(<options>)
>>> proc_group.as_pipen(<pipeline options>).set_data(<data>).run()
Integrated into a larger pipeline:
>>> proc_group = ProcGroup(<options>)
>>> # proc could be a process within the larger pipeline
>>> proc.requires = proc_group.<proc>
To add a process to the proc group, use the add_proc method:
>>> class MyProcGroup(ProcGroup):
>>>     ...
>>>
>>> proc_group = MyProcGroup(...)
>>> @proc_group.add_proc
>>> class MyProc(Proc):
>>>     ...
Or add a process at runtime:
>>> class MyProcGroup(ProcGroup):
>>>     ...
>>>
>>>     @ProcGroup.add_proc
>>>     def my_proc(self):
>>>         class MyProc(Proc):
>>>             # You may use self.options here
>>>             ...
>>>         return MyProc
>>> proc_group = MyProcGroup(...)
Classes
ProcGropuMeta: Meta class for ProcGroup
ProcGroup: A group of processes that can be run independently or integrated into a larger pipeline.
module
pipen.defaults
Provide some default values/objects
Classes
ProcInputType: Types for process inputs
ProcOutputType: Types for process outputs
module
pipen.pluginmgr
Define hooks specifications and provide plugin manager
Classes
PipenMainPlugin: The builtin core plugin, used to update the progress bar and cache the job
XqutePipenPlugin: The plugin for xqute working as proxy for pipen plugin hooks
Functions
on_complete(pipen, succeeded): When the pipeline is completed.
on_init(pipen): When the pipeline is initialized, and default configs are loaded
on_job_cached(job): When a job is cached.
on_job_failed(job): When a job is done but failed.
on_job_init(job): When a job is initialized
on_job_killed(job): When a job is killed
on_job_killing(job) (bool): When a job is being killed.
on_job_polling(job, counter): When status of a job is being polled.
on_job_queued(job): When a job is queued in xqute. Note it might not be queued yet in the scheduler system.
on_job_started(job): When a job starts to run in the scheduler system.
on_job_submitted(job): When a job is submitted in the scheduler system.
on_job_submitting(job) (bool): When a job is being submitted.
on_job_succeeded(job): When a job completes successfully.
on_jobcmd_end(job) (str): When the job command finishes and after the postscript is run
on_jobcmd_init(job) (str): When the job command wrapper script is initialized before the prescript is run
on_jobcmd_prep(job) (str): When the job command is right about to be run
on_proc_create(proc): Called in the Proc constructor when a process is created.
on_proc_done(proc, succeeded): When a process is done
on_proc_input_computed(proc): Called after process input data is computed.
on_proc_script_computed(proc): Called after process script is computed.
on_proc_shutdown(proc, sig): When pipeline is shutting down, by Ctrl-c for example.
on_proc_start(proc): When a process is starting
on_setup(pipen): Setup for plugins, primarily used for the plugins to set up some default configurations.
on_start(pipen): Right before the pipeline starts running.
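To make the hook names above more concrete, here is a toy sketch of how a plugin object exposing some of these hooks might be driven. It only illustrates the general hook-dispatch idea: the RecordingPlugin class and the call_hook helper are hypothetical, and the sketch does not reproduce how pipen actually registers or invokes plugins.

```python
from typing import Any, List


class RecordingPlugin:
    """Hypothetical plugin: records which hooks were called, in order."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_start(self, pipen: Any) -> None:
        self.events.append("on_start")

    def on_job_succeeded(self, job: Any) -> None:
        self.events.append(f"on_job_succeeded:{job}")

    def on_complete(self, pipen: Any, succeeded: bool) -> None:
        self.events.append(f"on_complete:{succeeded}")


def call_hook(plugins: List[Any], hook: str, *args: Any) -> List[Any]:
    """Call `hook` on every plugin that defines it; collect the results."""
    results = []
    for plugin in plugins:
        func = getattr(plugin, hook, None)
        if callable(func):
            results.append(func(*args))
    return results


plugin = RecordingPlugin()
call_hook([plugin], "on_start", "my_pipeline")
call_hook([plugin], "on_job_succeeded", 0)
call_hook([plugin], "on_job_failed", 1)  # not defined by the plugin: skipped
call_hook([plugin], "on_complete", "my_pipeline", True)
print(plugin.events)
# ['on_start', 'on_job_succeeded:0', 'on_complete:True']
```

A plugin in this sketch implements only the hooks it cares about; hooks it does not define are simply skipped by the dispatcher.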
module
pipen.channel
Provide some functions for creating and modifying channels (dataframes)
Functions
collapse_files(data, col) (DataFrame): Collapse a Channel according to the files in col; other cols will use the values in row 0.
expand_dir(data, col, pattern, ftype, sortby, reverse) (DataFrame): Expand a Channel according to the files in col; other cols are kept the same.
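The idea behind expand_dir (one row per file found in a directory column, with the remaining columns carried over) can be sketched with plain dictionaries. This is an assumption-laden illustration: the real function works on a DataFrame and takes more parameters (ftype, sortby, reverse), while expand_dir_sketch below is a hypothetical helper that handles a single row, a glob pattern, and sorting by file name only.

```python
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def expand_dir_sketch(
    row: Dict[str, Any], col: str, pattern: str = "*"
) -> List[Dict[str, Any]]:
    """Return one row per file matching `pattern` in the directory at
    row[col]; every other column is copied unchanged."""
    files = sorted(Path(row[col]).glob(pattern), key=lambda p: p.name)
    return [{**row, col: str(f)} for f in files]


with tempfile.TemporaryDirectory() as tmp:
    for name in ("b.txt", "a.txt", "c.log"):
        (Path(tmp) / name).touch()
    rows = expand_dir_sketch({"indir": tmp, "sample": "s1"}, "indir", "*.txt")
    names = [Path(r["indir"]).name for r in rows]
    samples = [r["sample"] for r in rows]

print(names)    # ['a.txt', 'b.txt']
print(samples)  # ['s1', 's1']
```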
module
pipen.progressbar
Provide the PipelinePBar and ProcPBar classes
Classes
ProcPBar: The progress bar for processes
PipelinePBar: Progress bar for the pipeline
package
pipen.cli
Provide CLI for pipen