Skip to content

pipen.scheduler

module

pipen.scheduler

Provide builting schedulers

Classes
Functions
  • get_scheduler(scheduler) (Type) Get the scheduler by name of the scheduler class itself</>
class

pipen.scheduler.LocalJob(*args, **kwargs)

Bases
xqute.schedulers.local_scheduler.LocalJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for local scheduler

Attributes
  • CMD_WRAPPER_SHELL The shell to run the wrapped script
  • CMD_WRAPPER_TEMPLATE The template for job wrapping
  • _error_retry Whether we should retry if error happened
  • _num_retries Total number of retries
  • _rc The return code of the job
  • _status The status of the job
  • _wrapped_cmd The wrapped cmd, used for job submission
  • cached Check if a job is cached</>
  • cmd The command
  • hook_done Mark whether hooks have already been. Since we don't havea trigger for job finished/failed, so we do a polling on it. This is to avoid calling the hooks repeatedly
  • index The index of the job
  • jid The jid of the job in scheduler system
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • metadir The metadir of the job
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
  • trial_count The count for re-tries
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) The shebang of the wrapped script</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

cache()

write signature to signature file

class

abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method

__repr__() → str

repr of the job

method

shebang(scheduler) → str

The shebang of the wrapped script

method

clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method

wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method

log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method

prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
class

pipen.scheduler.LocalScheduler(forks, prescript='', postscript='', **kwargs)

Bases
xqute.schedulers.local_scheduler.LocalScheduler xqute.scheduler.Scheduler

Local scheduler

Parameters
  • forks (int) Max number of job forks
  • prescript (str, optional) The script to run before the command
  • postscript (str, optional) The script to run after the command
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • name The name of the scheduler
Classes
Methods
method

submit_job_and_update_status(job)

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

kill_job_and_update_status(job)

Kill a job and update its status

Parameters
  • job (Job) The job
method

polling_jobs(jobs, on, halt_on_error)

Check if all jobs are done or new jobs can submit

Parameters
  • jobs (List) The list of jobs
  • on (str) query on status: can_submit or all_done
  • halt_on_error (bool) Whether we should halt the whole pipeline on error
Returns (bool)

True if yes otherwise False.

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs
method

job_is_submitted_or_running(job)

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes otherwise False.

method

submit_job(job)

Submit a job locally

Parameters
  • job (Job) The job
Returns (int)

The process id

method

kill_job(job)

Kill a job asynchronously

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell if a job is really running, not only the job.jid_file

In case where the jid file is not cleaned when job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.LocalJob(*args, **kwargs)

Bases
xqute.schedulers.local_scheduler.LocalJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for local scheduler

Attributes
  • cached Check if a job is cached</>
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) The shebang of the wrapped script</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

write signature to signature file

class
abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method
__repr__() → str

repr of the job

method
shebang(scheduler) → str

The shebang of the wrapped script

method
clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method
wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method
log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method
prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
class

pipen.scheduler.SgeJob(*args, **kwargs)

Bases
xqute.schedulers.sge_scheduler.SgeJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for SGE scheduler

Attributes
  • CMD_WRAPPER_SHELL The shell to run the wrapped script
  • CMD_WRAPPER_TEMPLATE The template for job wrapping
  • _error_retry Whether we should retry if error happened
  • _num_retries Total number of retries
  • _rc The return code of the job
  • _status The status of the job
  • _wrapped_cmd The wrapped cmd, used for job submission
  • cached Check if a job is cached</>
  • cmd The command
  • hook_done Mark whether hooks have already been. Since we don't havea trigger for job finished/failed, so we do a polling on it. This is to avoid calling the hooks repeatedly
  • index The index of the job
  • jid The jid of the job in scheduler system
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • metadir The metadir of the job
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
  • trial_count The count for re-tries
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) Make the shebang with options</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

cache()

write signature to signature file

class

abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method

__repr__() → str

repr of the job

method

clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method

wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method

log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method

prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
method

shebang(scheduler)

Make the shebang with options

Parameters
  • scheduler (Scheduler) The scheduler
Returns (str)

The shebang with options

class

pipen.scheduler.SgeScheduler(*args, **kwargs)

Bases
xqute.schedulers.sge_scheduler.SgeScheduler xqute.scheduler.Scheduler

SGE scheduler

Parameters
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • name The name of the scheduler
Classes
Methods
method

submit_job_and_update_status(job)

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

kill_job_and_update_status(job)

Kill a job and update its status

Parameters
  • job (Job) The job
method

polling_jobs(jobs, on, halt_on_error)

Check if all jobs are done or new jobs can submit

Parameters
  • jobs (List) The list of jobs
  • on (str) query on status: can_submit or all_done
  • halt_on_error (bool) Whether we should halt the whole pipeline on error
Returns (bool)

True if yes otherwise False.

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs
method

job_is_submitted_or_running(job)

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes otherwise False.

method

submit_job(job)

Submit a job to SGE

Parameters
  • job (Job) The job
Returns (str)

The job id

method

kill_job(job)

Kill a job on SGE

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell if a job is really running, not only the job.jid_file

In case where the jid file is not cleaned when job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.SgeJob(*args, **kwargs)

Bases
xqute.schedulers.sge_scheduler.SgeJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for SGE scheduler

Attributes
  • cached Check if a job is cached</>
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) Make the shebang with options</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

write signature to signature file

class
abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method
__repr__() → str

repr of the job

method
clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method
wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method
log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method
prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
method
shebang(scheduler)

Make the shebang with options

Parameters
  • scheduler (Scheduler) The scheduler
Returns (str)

The shebang with options

class

pipen.scheduler.SlurmJob(*args, **kwargs)

Bases
xqute.schedulers.slurm_scheduler.SlurmJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for Slurm scheduler

Attributes
  • CMD_WRAPPER_SHELL The shell to run the wrapped script
  • CMD_WRAPPER_TEMPLATE The template for job wrapping
  • _error_retry Whether we should retry if error happened
  • _num_retries Total number of retries
  • _rc The return code of the job
  • _status The status of the job
  • _wrapped_cmd The wrapped cmd, used for job submission
  • cached Check if a job is cached</>
  • cmd The command
  • hook_done Mark whether hooks have already been. Since we don't havea trigger for job finished/failed, so we do a polling on it. This is to avoid calling the hooks repeatedly
  • index The index of the job
  • jid The jid of the job in scheduler system
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • metadir The metadir of the job
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
  • trial_count The count for re-tries
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) Make the shebang with options</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

cache()

write signature to signature file

class

abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method

__repr__() → str

repr of the job

method

clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method

wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method

log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method

prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
method

shebang(scheduler)

Make the shebang with options

Parameters
  • scheduler (Scheduler) The scheduler
Returns (str)

The shebang with options

class

pipen.scheduler.SlurmScheduler(*args, **kwargs)

Bases
xqute.schedulers.slurm_scheduler.SlurmScheduler xqute.scheduler.Scheduler

Slurm scheduler

Parameters
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • name The name of the scheduler
Classes
Methods
method

submit_job_and_update_status(job)

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

kill_job_and_update_status(job)

Kill a job and update its status

Parameters
  • job (Job) The job
method

polling_jobs(jobs, on, halt_on_error)

Check if all jobs are done or new jobs can submit

Parameters
  • jobs (List) The list of jobs
  • on (str) query on status: can_submit or all_done
  • halt_on_error (bool) Whether we should halt the whole pipeline on error
Returns (bool)

True if yes otherwise False.

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs
method

job_is_submitted_or_running(job)

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes otherwise False.

method

submit_job(job)

Submit a job to Slurm

Parameters
  • job (Job) The job
Returns (str)

The job id

method

kill_job(job)

Kill a job on Slurm

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell if a job is really running, not only the job.jid_file

In case where the jid file is not cleaned when job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.SlurmJob(*args, **kwargs)

Bases
xqute.schedulers.slurm_scheduler.SlurmJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for Slurm scheduler

Attributes
  • cached Check if a job is cached</>
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) Make the shebang with options</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

write signature to signature file

class
abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method
__repr__() → str

repr of the job

method
clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method
wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method
log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method
prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
method
shebang(scheduler)

Make the shebang with options

Parameters
  • scheduler (Scheduler) The scheduler
Returns (str)

The shebang with options

class

pipen.scheduler.SshJob(*args, **kwargs)

Bases
xqute.schedulers.ssh_scheduler.scheduler.SshJob xqute.schedulers.local_scheduler.LocalJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for SSH scheduler

Attributes
  • CMD_WRAPPER_SHELL The shell to run the wrapped script
  • CMD_WRAPPER_TEMPLATE The template for job wrapping
  • _error_retry Whether we should retry if error happened
  • _num_retries Total number of retries
  • _rc The return code of the job
  • _status The status of the job
  • _wrapped_cmd The wrapped cmd, used for job submission
  • cached Check if a job is cached</>
  • cmd The command
  • hook_done Mark whether hooks have already been. Since we don't havea trigger for job finished/failed, so we do a polling on it. This is to avoid calling the hooks repeatedly
  • index The index of the job
  • jid The jid of the job in scheduler system
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • metadir The metadir of the job
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
  • trial_count The count for re-tries
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) The shebang of the wrapped script</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

cache()

write signature to signature file

class

abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method

__repr__() → str

repr of the job

method

shebang(scheduler) → str

The shebang of the wrapped script

method

clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method

wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method

log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method

prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
class

pipen.scheduler.SshScheduler(forks, prescript='', postscript='', **kwargs)

Bases
xqute.schedulers.ssh_scheduler.scheduler.SshScheduler xqute.scheduler.Scheduler

SSH scheduler

Parameters
  • forks (int) Max number of job forks
  • prescript (str, optional) The script to run before the command
  • postscript (str, optional) The script to run after the command
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • name The name of the scheduler
Classes
Methods
method

submit_job_and_update_status(job)

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

kill_job_and_update_status(job)

Kill a job and update its status

Parameters
  • job (Job) The job
method

polling_jobs(jobs, on, halt_on_error)

Check if all jobs are done or new jobs can submit

Parameters
  • jobs (List) The list of jobs
  • on (str) query on status: can_submit or all_done
  • halt_on_error (bool) Whether we should halt the whole pipeline on error
Returns (bool)

True if yes otherwise False.

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs
method

job_is_submitted_or_running(job)

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes otherwise False.

method

submit_job(job)

Submit a job to SSH

Parameters
  • job (Job) The job
Returns (str)

The job id

method

kill_job(job)

Kill a job on SSH

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell if a job is really running, not only the job.jid_file

In case where the jid file is not cleaned when job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.SshJob(*args, **kwargs)

Bases
xqute.schedulers.ssh_scheduler.scheduler.SshJob xqute.schedulers.local_scheduler.LocalJob pipen.job.Job xqute.job.Job abc.ABC pipen._job_caching.JobCaching

Job class for SSH scheduler

Attributes
  • cached Check if a job is cached</>
  • jid (int | str | none) Get the jid of the job in scheduler system</>
  • jid_file (Path) The jid file of the job</>
  • rc (int) The return code of the job</>
  • rc_file (Path) The rc file of the job</>
  • retry_dir (Path) The retry directory of the job</>
  • script_file Get the path to script file</>
  • signature_file Get the path to the signature file</>
  • status (int) Query the status of the job
    If the job is submitted, try to query it from the status file Make sure the status is updated by trap in wrapped script </>
  • status_file (Path) The status file of the job</>
  • stderr_file (Path) The stderr file of the job</>
  • stdout_file (Path) The stdout file of the job</>
  • strcmd (str) Get the string representation of the command</>
Classes
  • ABCMeta Metaclass for defining Abstract Base Classes (ABCs).</>
Methods
  • __repr__() (str) repr of the job</>
  • cache() write signature to signature file</>
  • clean(retry) Clean up the meta files</>
  • log(level, msg, *args, limit, limit_indicator, logger) Log message for the jobs</>
  • prepare(proc) Prepare the job by given process</>
  • shebang(scheduler) (str) The shebang of the wrapped script</>
  • wrapped_script(scheduler) (PathLike) Get the wrapped script</>
method

write signature to signature file

class
abc.ABCMeta(name, bases, namespace, **kwargs)

Metaclass for defining Abstract Base Classes (ABCs).

Use this metaclass to create an ABC. An ABC can be subclassed directly, and then acts as a mix-in class. You can also register unrelated concrete classes (even built-in classes) and unrelated ABCs as 'virtual subclasses' -- these and their descendants will be considered subclasses of the registering ABC by the built-in issubclass() function, but the registering ABC won't show up in their MRO (Method Resolution Order) nor will method implementations defined by the registering ABC be callable (not even via super()).

Methods
staticmethod
register(cls, subclass)

Register a virtual subclass of an ABC.

Returns the subclass, to allow usage as a class decorator.

staticmethod
__instancecheck__(cls, instance)

Override for isinstance(instance, cls).

staticmethod
__subclasscheck__(cls, subclass)

Override for issubclass(subclass, cls).

method
__repr__() → str

repr of the job

method
shebang(scheduler) → str

The shebang of the wrapped script

method
clean(retry=False)

Clean up the meta files

Parameters
  • retry (optional) Whether clean it for retrying
method
wrapped_script(scheduler)

Get the wrapped script

Parameters
  • scheduler (Scheduler) The scheduler
Returns (PathLike)

The path of the wrapped script

method
log(level, msg, *args, limit=3, limit_indicator=True, logger=<LoggerAdapter pipen.core (WARNING)>)

Log message for the jobs

Parameters
  • level (int | str) The log level of the record
  • msg (str) The message to log
  • *args The arguments to format the message
  • limit (int, optional) limitation of the log (don't log for all jobs)
  • limit_indicator (bool, optional) Whether to show an indicator saying the loghas been limited (the level of the indicator will be DEBUG)
  • logger (LoggerAdapter, optional) The logger used to log
method
prepare(proc)

Prepare the job by given process

Primarily prepare the script, and provide cmd to the job for xqute to wrap and run

Parameters
  • proc (Proc) the process object
function

pipen.scheduler.get_scheduler(scheduler)

Get the scheduler by name of the scheduler class itself

Parameters
  • scheduler (Union) The scheduler class or name
Returns (Type)

The scheduler class