module

biopipen.ns.misc

Misc processes

Classes
class

biopipen.ns.misc.File2Proc(*args, **kwds)Proc

Bases
biopipen.core.proc.Proc pipen.proc.Proc

Accept a file and pass it down with a symbolic link

Attributes
  • cache Should we detect whether the jobs are cached?
  • desc The description of the process. Will use the summary fromthe docstring by default.
  • dirsig When checking the signature for caching, whether should we walkthrough the content of the directory? This is sometimes time-consuming if the directory is big.
  • envs The arguments that are job-independent, useful for common optionsacross jobs.
  • envs_depth How deep to update the envs when subclassed.
  • error_strategy How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • export When True, the results will be exported to <pipeline.outdir>Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • forks How many jobs to run simultaneously?
  • input The keys for the input channel
  • input_data The input data (will be computed for dependent processes)
  • lang The language for the script to run. Should be the path to theinterpreter if lang is not in $PATH.
  • name The name of the process. Will use the class name by default.
  • nexts Computed from requires to build the process relationships
  • num_retries How many times to retry to jobs once error occurs
  • order The execution order for this process. The bigger the numberis, the later the process will be executed. Default: 0. Note that the dependent processes will always be executed first. This doesn't work for start processes either, whose orders are determined by Pipen.set_starts()
  • output The output keys for the output channel(the data will be computed)
  • output_data The output data (to pass to the next processes)
  • plugin_opts Options for process-level plugins
  • requires The dependency processes
  • scheduler The scheduler to run the jobs
  • scheduler_opts The options for the scheduler
  • script The script template for the process
  • submission_batch How many jobs to be submited simultaneously
  • template Define the template engine to use.This could be either a template engine or a dict with key engine indicating the template engine and the rest the arguments passed to the constructor of the pipen.template.Template object. The template engine could be either the name of the engine, currently jinja2 and liquidpy are supported, or a subclass of pipen.template.Template. You can subclass pipen.template.Template to use your own template engine.
Input
  • infile The input file
Output
  • outfile The output symbolic link to the input file
Classes
Methods
  • __init_subclass__() Do the requirements inferring since we need them to build up theprocess relationship </>
  • from_proc(proc, name, desc, envs, envs_depth, cache, export, error_strategy, num_retries, forks, input_data, order, plugin_opts, requires, scheduler, scheduler_opts, submission_batch) (Type) Create a subclass of Proc using another Proc subclass or Proc itself</>
  • gc() GC process for the process to save memory after it's done</>
  • init() Init all other properties and jobs</>
  • log(level, msg, *args, logger) Log message for the process</>
  • run() Run the process</>
class

pipen.proc.ProcMeta(name, bases, namespace, **kwargs)

Bases
abc.ABCMeta

Meta class for Proc

Methods
  • __call__(cls, *args, **kwds) (Proc) Make sure Proc subclasses are singletons</>
  • __instancecheck__(cls, instance) Override for isinstance(instance, cls).</>
  • __repr__(cls) (str) Representation for the Proc subclasses</>
  • __subclasscheck__(cls, subclass) Override for issubclass(subclass, cls).</>
  • register(cls, subclass) Register a virtual subclass of an ABC.</>
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

staticmethod
__repr__(cls) → str

Representation for the Proc subclasses

staticmethod
__call__(cls, *args, **kwds)

Make sure Proc subclasses are singletons

Parameters
  • *args (Any) and
  • **kwds (Any) Arguments for the constructor
Returns (Proc)

The Proc instance

classmethod

from_proc(proc, name=None, desc=None, envs=None, envs_depth=None, cache=None, export=None, error_strategy=None, num_retries=None, forks=None, input_data=None, order=None, plugin_opts=None, requires=None, scheduler=None, scheduler_opts=None, submission_batch=None)

Create a subclass of Proc using another Proc subclass or Proc itself

Parameters
  • proc (Type) The Proc subclass
  • name (str, optional) The new name of the process
  • desc (str, optional) The new description of the process
  • envs (Mapping, optional) The arguments of the process, will overwrite parent oneThe items that are specified will be inherited
  • envs_depth (int, optional) How deep to update the envs when subclassed.
  • cache (bool, optional) Whether we should check the cache for the jobs
  • export (bool, optional) When True, the results will be exported to<pipeline.outdir> Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • error_strategy (str, optional) How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • num_retries (int, optional) How many times to retry to jobs once error occurs
  • forks (int, optional) New forks for the new process
  • input_data (Any, optional) The input data for the process. Only when this processis a start process
  • order (int, optional) The order to execute the new process
  • plugin_opts (Mapping, optional) The new plugin options, unspecified items will beinherited.
  • requires (Sequence, optional) The required processes for the new process
  • scheduler (str, optional) The new shedular to run the new process
  • scheduler_opts (Mapping, optional) The new scheduler options, unspecified items willbe inherited.
  • submission_batch (int, optional) How many jobs to be submited simultaneously
Returns (Type)

The new process class

classmethod

__init_subclass__()

Do the requirements inferring since we need them to build up theprocess relationship

method

init()

Init all other properties and jobs

method

gc()

GC process for the process to save memory after it's done

method

log(level, msg, *args, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the process

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • logger (LoggerAdapter, optional) The logging logger
method

run()

Run the process

class

biopipen.ns.misc.Glob2Dir(*args, **kwds)Proc

Bases
biopipen.core.proc.Proc pipen.proc.Proc

Create symbolic links in output directory for the files givenby the glob pattern

Attributes
  • cache Should we detect whether the jobs are cached?
  • desc The description of the process. Will use the summary fromthe docstring by default.
  • dirsig When checking the signature for caching, whether should we walkthrough the content of the directory? This is sometimes time-consuming if the directory is big.
  • envs The arguments that are job-independent, useful for common optionsacross jobs.
  • envs_depth How deep to update the envs when subclassed.
  • error_strategy How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • export When True, the results will be exported to <pipeline.outdir>Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • forks How many jobs to run simultaneously?
  • input The keys for the input channel
  • input_data The input data (will be computed for dependent processes)
  • lang The language for the script to run. Should be the path to theinterpreter if lang is not in $PATH.
  • name The name of the process. Will use the class name by default.
  • nexts Computed from requires to build the process relationships
  • num_retries How many times to retry to jobs once error occurs
  • order The execution order for this process. The bigger the numberis, the later the process will be executed. Default: 0. Note that the dependent processes will always be executed first. This doesn't work for start processes either, whose orders are determined by Pipen.set_starts()
  • output The output keys for the output channel(the data will be computed)
  • output_data The output data (to pass to the next processes)
  • plugin_opts Options for process-level plugins
  • requires The dependency processes
  • scheduler The scheduler to run the jobs
  • scheduler_opts The options for the scheduler
  • script The script template for the process
  • submission_batch How many jobs to be submited simultaneously
  • template Define the template engine to use.This could be either a template engine or a dict with key engine indicating the template engine and the rest the arguments passed to the constructor of the pipen.template.Template object. The template engine could be either the name of the engine, currently jinja2 and liquidpy are supported, or a subclass of pipen.template.Template. You can subclass pipen.template.Template to use your own template engine.
Classes
Methods
  • __init_subclass__() Do the requirements inferring since we need them to build up theprocess relationship </>
  • from_proc(proc, name, desc, envs, envs_depth, cache, export, error_strategy, num_retries, forks, input_data, order, plugin_opts, requires, scheduler, scheduler_opts, submission_batch) (Type) Create a subclass of Proc using another Proc subclass or Proc itself</>
  • gc() GC process for the process to save memory after it's done</>
  • init() Init all other properties and jobs</>
  • log(level, msg, *args, logger) Log message for the process</>
  • run() Run the process</>
class

pipen.proc.ProcMeta(name, bases, namespace, **kwargs)

Bases
abc.ABCMeta

Meta class for Proc

Methods
  • __call__(cls, *args, **kwds) (Proc) Make sure Proc subclasses are singletons</>
  • __instancecheck__(cls, instance) Override for isinstance(instance, cls).</>
  • __repr__(cls) (str) Representation for the Proc subclasses</>
  • __subclasscheck__(cls, subclass) Override for issubclass(subclass, cls).</>
  • register(cls, subclass) Register a virtual subclass of an ABC.</>
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

staticmethod
__repr__(cls) → str

Representation for the Proc subclasses

staticmethod
__call__(cls, *args, **kwds)

Make sure Proc subclasses are singletons

Parameters
  • *args (Any) and
  • **kwds (Any) Arguments for the constructor
Returns (Proc)

The Proc instance

classmethod

from_proc(proc, name=None, desc=None, envs=None, envs_depth=None, cache=None, export=None, error_strategy=None, num_retries=None, forks=None, input_data=None, order=None, plugin_opts=None, requires=None, scheduler=None, scheduler_opts=None, submission_batch=None)

Create a subclass of Proc using another Proc subclass or Proc itself

Parameters
  • proc (Type) The Proc subclass
  • name (str, optional) The new name of the process
  • desc (str, optional) The new description of the process
  • envs (Mapping, optional) The arguments of the process, will overwrite parent oneThe items that are specified will be inherited
  • envs_depth (int, optional) How deep to update the envs when subclassed.
  • cache (bool, optional) Whether we should check the cache for the jobs
  • export (bool, optional) When True, the results will be exported to<pipeline.outdir> Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • error_strategy (str, optional) How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • num_retries (int, optional) How many times to retry to jobs once error occurs
  • forks (int, optional) New forks for the new process
  • input_data (Any, optional) The input data for the process. Only when this processis a start process
  • order (int, optional) The order to execute the new process
  • plugin_opts (Mapping, optional) The new plugin options, unspecified items will beinherited.
  • requires (Sequence, optional) The required processes for the new process
  • scheduler (str, optional) The new shedular to run the new process
  • scheduler_opts (Mapping, optional) The new scheduler options, unspecified items willbe inherited.
  • submission_batch (int, optional) How many jobs to be submited simultaneously
Returns (Type)

The new process class

classmethod

__init_subclass__()

Do the requirements inferring since we need them to build up theprocess relationship

method

init()

Init all other properties and jobs

method

gc()

GC process for the process to save memory after it's done

method

log(level, msg, *args, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the process

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • logger (LoggerAdapter, optional) The logging logger
method

run()

Run the process

class

biopipen.ns.misc.Config2File(*args, **kwds)Proc

Bases
biopipen.core.proc.Proc pipen.proc.Proc

Write a configurationn in string to a configuration file

Requires python package rtoml

Attributes
  • cache Should we detect whether the jobs are cached?
  • desc The description of the process. Will use the summary fromthe docstring by default.
  • dirsig When checking the signature for caching, whether should we walkthrough the content of the directory? This is sometimes time-consuming if the directory is big.
  • envs The arguments that are job-independent, useful for common optionsacross jobs.
  • envs_depth How deep to update the envs when subclassed.
  • error_strategy How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • export When True, the results will be exported to <pipeline.outdir>Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • forks How many jobs to run simultaneously?
  • input The keys for the input channel
  • input_data The input data (will be computed for dependent processes)
  • lang The language for the script to run. Should be the path to theinterpreter if lang is not in $PATH.
  • name The name of the process. Will use the class name by default.
  • nexts Computed from requires to build the process relationships
  • num_retries How many times to retry to jobs once error occurs
  • order The execution order for this process. The bigger the numberis, the later the process will be executed. Default: 0. Note that the dependent processes will always be executed first. This doesn't work for start processes either, whose orders are determined by Pipen.set_starts()
  • output The output keys for the output channel(the data will be computed)
  • output_data The output data (to pass to the next processes)
  • plugin_opts Options for process-level plugins
  • requires The dependency processes
  • scheduler The scheduler to run the jobs
  • scheduler_opts The options for the scheduler
  • script The script template for the process
  • submission_batch How many jobs to be submited simultaneously
  • template Define the template engine to use.This could be either a template engine or a dict with key engine indicating the template engine and the rest the arguments passed to the constructor of the pipen.template.Template object. The template engine could be either the name of the engine, currently jinja2 and liquidpy are supported, or a subclass of pipen.template.Template. You can subclass pipen.template.Template to use your own template engine.
Input
  • config A string representation of configuration
  • name The name for output file.Will be config if not given
Output
  • outfile The output file with the configuration
Envs
  • infmt The input format. json or toml.
  • outfmt The output format. json or toml.
Classes
Methods
  • __init_subclass__() Do the requirements inferring since we need them to build up theprocess relationship </>
  • from_proc(proc, name, desc, envs, envs_depth, cache, export, error_strategy, num_retries, forks, input_data, order, plugin_opts, requires, scheduler, scheduler_opts, submission_batch) (Type) Create a subclass of Proc using another Proc subclass or Proc itself</>
  • gc() GC process for the process to save memory after it's done</>
  • init() Init all other properties and jobs</>
  • log(level, msg, *args, logger) Log message for the process</>
  • run() Run the process</>
class

pipen.proc.ProcMeta(name, bases, namespace, **kwargs)

Bases
abc.ABCMeta

Meta class for Proc

Methods
  • __call__(cls, *args, **kwds) (Proc) Make sure Proc subclasses are singletons</>
  • __instancecheck__(cls, instance) Override for isinstance(instance, cls).</>
  • __repr__(cls) (str) Representation for the Proc subclasses</>
  • __subclasscheck__(cls, subclass) Override for issubclass(subclass, cls).</>
  • register(cls, subclass) Register a virtual subclass of an ABC.</>
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

staticmethod
__repr__(cls) → str

Representation for the Proc subclasses

staticmethod
__call__(cls, *args, **kwds)

Make sure Proc subclasses are singletons

Parameters
  • *args (Any) and
  • **kwds (Any) Arguments for the constructor
Returns (Proc)

The Proc instance

classmethod

from_proc(proc, name=None, desc=None, envs=None, envs_depth=None, cache=None, export=None, error_strategy=None, num_retries=None, forks=None, input_data=None, order=None, plugin_opts=None, requires=None, scheduler=None, scheduler_opts=None, submission_batch=None)

Create a subclass of Proc using another Proc subclass or Proc itself

Parameters
  • proc (Type) The Proc subclass
  • name (str, optional) The new name of the process
  • desc (str, optional) The new description of the process
  • envs (Mapping, optional) The arguments of the process, will overwrite parent oneThe items that are specified will be inherited
  • envs_depth (int, optional) How deep to update the envs when subclassed.
  • cache (bool, optional) Whether we should check the cache for the jobs
  • export (bool, optional) When True, the results will be exported to<pipeline.outdir> Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • error_strategy (str, optional) How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • num_retries (int, optional) How many times to retry to jobs once error occurs
  • forks (int, optional) New forks for the new process
  • input_data (Any, optional) The input data for the process. Only when this processis a start process
  • order (int, optional) The order to execute the new process
  • plugin_opts (Mapping, optional) The new plugin options, unspecified items will beinherited.
  • requires (Sequence, optional) The required processes for the new process
  • scheduler (str, optional) The new shedular to run the new process
  • scheduler_opts (Mapping, optional) The new scheduler options, unspecified items willbe inherited.
  • submission_batch (int, optional) How many jobs to be submited simultaneously
Returns (Type)

The new process class

classmethod

__init_subclass__()

Do the requirements inferring since we need them to build up theprocess relationship

method

init()

Init all other properties and jobs

method

gc()

GC process for the process to save memory after it's done

method

log(level, msg, *args, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the process

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • logger (LoggerAdapter, optional) The logging logger
method

run()

Run the process

class

biopipen.ns.misc.Str2File(*args, **kwds)Proc

Bases
biopipen.core.proc.Proc pipen.proc.Proc

Write the given string to a file

Attributes
  • cache Should we detect whether the jobs are cached?
  • desc The description of the process. Will use the summary fromthe docstring by default.
  • dirsig When checking the signature for caching, whether should we walkthrough the content of the directory? This is sometimes time-consuming if the directory is big.
  • envs The arguments that are job-independent, useful for common optionsacross jobs.
  • envs_depth How deep to update the envs when subclassed.
  • error_strategy How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • export When True, the results will be exported to <pipeline.outdir>Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • forks How many jobs to run simultaneously?
  • input The keys for the input channel
  • input_data The input data (will be computed for dependent processes)
  • lang The language for the script to run. Should be the path to theinterpreter if lang is not in $PATH.
  • name The name of the process. Will use the class name by default.
  • nexts Computed from requires to build the process relationships
  • num_retries How many times to retry to jobs once error occurs
  • order The execution order for this process. The bigger the numberis, the later the process will be executed. Default: 0. Note that the dependent processes will always be executed first. This doesn't work for start processes either, whose orders are determined by Pipen.set_starts()
  • output The output keys for the output channel(the data will be computed)
  • output_data The output data (to pass to the next processes)
  • plugin_opts Options for process-level plugins
  • requires The dependency processes
  • scheduler The scheduler to run the jobs
  • scheduler_opts The options for the scheduler
  • script The script template for the process
  • submission_batch How many jobs to be submited simultaneously
  • template Define the template engine to use.This could be either a template engine or a dict with key engine indicating the template engine and the rest the arguments passed to the constructor of the pipen.template.Template object. The template engine could be either the name of the engine, currently jinja2 and liquidpy are supported, or a subclass of pipen.template.Template. You can subclass pipen.template.Template to use your own template engine.
Input
  • name The name of the fileIf not given, use envs.name
  • str The string to write to file
Output
  • outfile The output file
Envs
  • name The name of the output file
Classes
Methods
  • __init_subclass__() Do the requirements inferring since we need them to build up theprocess relationship </>
  • from_proc(proc, name, desc, envs, envs_depth, cache, export, error_strategy, num_retries, forks, input_data, order, plugin_opts, requires, scheduler, scheduler_opts, submission_batch) (Type) Create a subclass of Proc using another Proc subclass or Proc itself</>
  • gc() GC process for the process to save memory after it's done</>
  • init() Init all other properties and jobs</>
  • log(level, msg, *args, logger) Log message for the process</>
  • run() Run the process</>
class

pipen.proc.ProcMeta(name, bases, namespace, **kwargs)

Bases
abc.ABCMeta

Meta class for Proc

Methods
  • __call__(cls, *args, **kwds) (Proc) Make sure Proc subclasses are singletons</>
  • __instancecheck__(cls, instance) Override for isinstance(instance, cls).</>
  • __repr__(cls) (str) Representation for the Proc subclasses</>
  • __subclasscheck__(cls, subclass) Override for issubclass(subclass, cls).</>
  • register(cls, subclass) Register a virtual subclass of an ABC.</>
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

staticmethod
__repr__(cls) → str

Representation for the Proc subclasses

staticmethod
__call__(cls, *args, **kwds)

Make sure Proc subclasses are singletons

Parameters
  • *args (Any) and
  • **kwds (Any) Arguments for the constructor
Returns (Proc)

The Proc instance

classmethod

from_proc(proc, name=None, desc=None, envs=None, envs_depth=None, cache=None, export=None, error_strategy=None, num_retries=None, forks=None, input_data=None, order=None, plugin_opts=None, requires=None, scheduler=None, scheduler_opts=None, submission_batch=None)

Create a subclass of Proc using another Proc subclass or Proc itself

Parameters
  • proc (Type) The Proc subclass
  • name (str, optional) The new name of the process
  • desc (str, optional) The new description of the process
  • envs (Mapping, optional) The arguments of the process, will overwrite parent oneThe items that are specified will be inherited
  • envs_depth (int, optional) How deep to update the envs when subclassed.
  • cache (bool, optional) Whether we should check the cache for the jobs
  • export (bool, optional) When True, the results will be exported to<pipeline.outdir> Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • error_strategy (str, optional) How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • num_retries (int, optional) How many times to retry to jobs once error occurs
  • forks (int, optional) New forks for the new process
  • input_data (Any, optional) The input data for the process. Only when this processis a start process
  • order (int, optional) The order to execute the new process
  • plugin_opts (Mapping, optional) The new plugin options, unspecified items will beinherited.
  • requires (Sequence, optional) The required processes for the new process
  • scheduler (str, optional) The new shedular to run the new process
  • scheduler_opts (Mapping, optional) The new scheduler options, unspecified items willbe inherited.
  • submission_batch (int, optional) How many jobs to be submited simultaneously
Returns (Type)

The new process class

classmethod

__init_subclass__()

Do the requirements inferring since we need them to build up theprocess relationship

method

init()

Init all other properties and jobs

method

gc()

GC process for the process to save memory after it's done

method

log(level, msg, *args, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the process

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • logger (LoggerAdapter, optional) The logging logger
method

run()

Run the process

class

biopipen.ns.misc.Shell(*args, **kwds)Proc

Bases
biopipen.core.proc.Proc pipen.proc.Proc

Run a shell command

Attributes
  • cache Should we detect whether the jobs are cached?
  • desc The description of the process. Will use the summary fromthe docstring by default.
  • dirsig When checking the signature for caching, whether should we walkthrough the content of the directory? This is sometimes time-consuming if the directory is big.
  • envs The arguments that are job-independent, useful for common optionsacross jobs.
  • envs_depth How deep to update the envs when subclassed.
  • error_strategy How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • export When True, the results will be exported to <pipeline.outdir>Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • forks How many jobs to run simultaneously?
  • input The keys for the input channel
  • input_data The input data (will be computed for dependent processes)
  • lang The language for the script to run. Should be the path to theinterpreter if lang is not in $PATH.
  • name The name of the process. Will use the class name by default.
  • nexts Computed from requires to build the process relationships
  • num_retries How many times to retry to jobs once error occurs
  • order The execution order for this process. The bigger the numberis, the later the process will be executed. Default: 0. Note that the dependent processes will always be executed first. This doesn't work for start processes either, whose orders are determined by Pipen.set_starts()
  • output The output keys for the output channel(the data will be computed)
  • output_data The output data (to pass to the next processes)
  • plugin_opts Options for process-level plugins
  • requires The dependency processes
  • scheduler The scheduler to run the jobs
  • scheduler_opts The options for the scheduler
  • script The script template for the process
  • submission_batch How many jobs to be submited simultaneously
  • template Define the template engine to use.This could be either a template engine or a dict with key engine indicating the template engine and the rest the arguments passed to the constructor of the pipen.template.Template object. The template engine could be either the name of the engine, currently jinja2 and liquidpy are supported, or a subclass of pipen.template.Template. You can subclass pipen.template.Template to use your own template engine.
Input
  • infile The input file
Output
  • outfile The output file
Envs
  • cmd The shell command to runUse $infile and $outfile to refer to input and output files
  • outdir Whether the out.outfile should be a directory.If so a directory will be created before running the command.
Classes
Methods
  • __init_subclass__() Do the requirements inferring since we need them to build up theprocess relationship </>
  • from_proc(proc, name, desc, envs, envs_depth, cache, export, error_strategy, num_retries, forks, input_data, order, plugin_opts, requires, scheduler, scheduler_opts, submission_batch) (Type) Create a subclass of Proc using another Proc subclass or Proc itself</>
  • gc() GC process for the process to save memory after it's done</>
  • init() Init all other properties and jobs</>
  • log(level, msg, *args, logger) Log message for the process</>
  • run() Run the process</>
class

pipen.proc.ProcMeta(name, bases, namespace, **kwargs)

Bases
abc.ABCMeta

Meta class for Proc

Methods
  • __call__(cls, *args, **kwds) (Proc) Make sure Proc subclasses are singletons</>
  • __instancecheck__(cls, instance) Override for isinstance(instance, cls).</>
  • __repr__(cls) (str) Representation for the Proc subclasses</>
  • __subclasscheck__(cls, subclass) Override for issubclass(subclass, cls).</>
  • register(cls, subclass) Register a virtual subclass of an ABC.</>
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

staticmethod
__repr__(cls) → str

Representation for the Proc subclasses

staticmethod
__call__(cls, *args, **kwds)

Make sure Proc subclasses are singletons

Parameters
  • *args (Any) and
  • **kwds (Any) Arguments for the constructor
Returns (Proc)

The Proc instance

classmethod

from_proc(proc, name=None, desc=None, envs=None, envs_depth=None, cache=None, export=None, error_strategy=None, num_retries=None, forks=None, input_data=None, order=None, plugin_opts=None, requires=None, scheduler=None, scheduler_opts=None, submission_batch=None)

Create a subclass of Proc using another Proc subclass or Proc itself

Parameters
  • proc (Type) The Proc subclass
  • name (str, optional) The new name of the process
  • desc (str, optional) The new description of the process
  • envs (Mapping, optional) The arguments of the process, will overwrite parent oneThe items that are specified will be inherited
  • envs_depth (int, optional) How deep to update the envs when subclassed.
  • cache (bool, optional) Whether we should check the cache for the jobs
  • export (bool, optional) When True, the results will be exported to<pipeline.outdir> Defaults to None, meaning only end processes will export. You can set it to True/False to enable or disable exporting for processes
  • error_strategy (str, optional) How to deal with the errors
    • - retry, ignore, halt
    • - halt to halt the whole pipeline, no submitting new jobs
    • - terminate to just terminate the job itself
  • num_retries (int, optional) How many times to retry to jobs once error occurs
  • forks (int, optional) New forks for the new process
  • input_data (Any, optional) The input data for the process. Only when this processis a start process
  • order (int, optional) The order to execute the new process
  • plugin_opts (Mapping, optional) The new plugin options, unspecified items will beinherited.
  • requires (Sequence, optional) The required processes for the new process
  • scheduler (str, optional) The new shedular to run the new process
  • scheduler_opts (Mapping, optional) The new scheduler options, unspecified items willbe inherited.
  • submission_batch (int, optional) How many jobs to be submited simultaneously
Returns (Type)

The new process class

classmethod

__init_subclass__()

Do the requirements inferring since we need them to build up theprocess relationship

method

init()

Init all other properties and jobs

method

gc()

GC process for the process to save memory after it's done

method

log(level, msg, *args, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the process

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • logger (LoggerAdapter, optional) The logging logger
method

run()

Run the process