module

pipen.scheduler

Provide built-in schedulers

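Classes
  • SchedulerPostInit Provides a post-init function for all schedulers
  • LocalScheduler Local scheduler
  • SgeScheduler SGE scheduler
  • SlurmScheduler Slurm scheduler
  • SshScheduler SSH scheduler
  • GbatchScheduler Google Cloud Batch scheduler
  • ContainerScheduler Scheduler to run jobs via containers (Docker/Podman/Apptainer)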
Functions
  • get_scheduler(scheduler) (Type) Get the scheduler class, given either its name or the scheduler class itself
class

pipen.scheduler.SchedulerPostInit()

Provides a post-init function for all schedulers

class

pipen.scheduler.LocalScheduler(workdir, forks=1, error_strategy='ignore', num_retries=3, prescript='', postscript='', jobname_prefix=None, submission_batch=None, recheck_interval=600, cwd=None, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.local_scheduler.LocalScheduler xqute.scheduler.Scheduler

Local scheduler

Parameters
  • workdir (str | Path | CloudPath) The working directory
  • forks (int, optional) Max number of job forks
  • error_strategy (str, optional) The strategy to apply when an error occurs (e.g. 'ignore' or 'retry')
  • num_retries (int, optional) Max number of retries when error_strategy is retry
  • prescript (str, optional) The prescript to run before the job command. It is a piece of script inserted into the wrapper script that runs on the scheduler system.
  • postscript (str, optional) The postscript to run after the job finishes. It is a piece of script inserted into the wrapper script that runs on the scheduler system.
  • jobname_prefix (str | none, optional) The prefix for the job name
  • submission_batch (int | none, optional) The number of consumers that submit jobs. This allows multiple jobs to be submitted in parallel, which is useful when there are many jobs to submit and the scheduler has high latency for each submission. Set this to a smaller number if the scheduler cannot handle too many simultaneous submissions.
  • recheck_interval (int, optional) The number of polling iterations between rechecks of whether a job is still running on the scheduler. This helps detect jobs that fail before the wrapped script updates their status (e.g., resource allocation failures). Each iteration takes ~0.1s, so the default of 600 means rechecking roughly every 60 seconds.
  • cwd (str | Path, optional) The working directory for the job command wrapper
  • **kwargs Other arguments for the scheduler
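The sketch below offers a rough mental model for `submission_batch` and `recheck_interval`; it is not pipen's or xqute's code. It assumes that submission "consumers" can be pictured as asyncio worker tasks draining a shared queue, and it takes the ~0.1 s polling period from the description above. `recheck_period_seconds`, `submit_all` and `fake_submit` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List

# Approximate duration of one polling iteration, as stated in the docs above.
POLL_SECONDS = 0.1


def recheck_period_seconds(recheck_interval: int) -> float:
    """Approximate wall-clock time between two 'is it still running?' rechecks."""
    return recheck_interval * POLL_SECONDS


async def fake_submit(job_id: int) -> str:
    """Hypothetical stand-in for a slow submission call (e.g. qsub/sbatch)."""
    await asyncio.sleep(0.01)  # simulate per-submission scheduler latency
    return f"jid-{job_id}"


async def submit_all(
    job_ids: Iterable[int],
    submission_batch: int,
    submit: Callable[[int], Awaitable[str]],
) -> List[str]:
    """Submit jobs using `submission_batch` concurrent consumers."""
    queue: "asyncio.Queue[int]" = asyncio.Queue()
    for job_id in job_ids:
        queue.put_nowait(job_id)

    results: List[str] = []

    async def consumer() -> None:
        # Each consumer keeps pulling jobs until the queue is drained.
        while True:
            try:
                job_id = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results.append(await submit(job_id))

    await asyncio.gather(*(consumer() for _ in range(submission_batch)))
    return results


if __name__ == "__main__":
    print(recheck_period_seconds(600))
    print(sorted(asyncio.run(submit_all(range(4), 2, fake_submit))))
```

With `submission_batch=1` the submissions happen one after another; larger values let the latency of individual submission calls overlap.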
Attributes
  • job_class The job class
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
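The six steps above can be sketched as follows. This is an illustration under stated assumptions, not xqute's implementation: `Job`, `Status`, `submit_and_update` and the `before_submit` callback are hypothetical, a `before_submit` return value of False stands in for a cancelled hook, and the "already submitted or running" check uses the in-memory status instead of the jid file.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, List, Optional


class Status(Enum):
    """Hypothetical subset of job states."""

    INIT = auto()
    SUBMITTED = auto()
    RUNNING = auto()
    FAILED = auto()


@dataclass
class Job:
    index: int
    status: Status = Status.INIT
    stderr: List[str] = field(default_factory=list)
    cleaned: bool = False


def submit_and_update(
    job: Job,
    submit: Callable[[Job], str],
    before_submit: Callable[[Job], bool] = lambda job: True,
) -> Optional[str]:
    """Sketch of the six steps listed above; returns the job id or None."""
    # 1. Skip jobs that are already submitted or running.
    if job.status in (Status.SUBMITTED, Status.RUNNING):
        return None
    # 2. Run the hook; False stands in for "cancelled".
    if not before_submit(job):
        return None
    # 3. Clean leftovers from a previous attempt.
    job.cleaned = True
    try:
        # 4. Submit; `submit` raises if the submission fails.
        jid = submit(job)
    except Exception as exc:
        # 6. On failure, update the status and record the error.
        job.status = Status.FAILED
        job.stderr.append(str(exc))
        return None
    # 5. On success, update the status.
    job.status = Status.SUBMITTED
    return jid
```

Catching the exception around step 4 is what lets step 6 record the failure on the job instead of letting it crash the submission loop.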
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)
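A condensed sketch of how a centralized transition handler could cover the responsibilities listed above. Everything here is assumed for illustration: the integer status codes, the in-memory `Job` fields that stand in for the rc, stderr and jid files, the `halt_on_error` flag, and the hook names other than `on_job_started` and `on_job_killed`.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Hypothetical status codes; xqute's real JobStatus values may differ.
SUBMITTED, RUNNING, FINISHED, FAILED = 1, 2, 3, 4
TERMINAL = {FINISHED, FAILED}


@dataclass
class Job:
    """In-memory stand-in for a job and its rc/stderr/jid files."""

    status: int = SUBMITTED
    rc: Optional[str] = None
    stderr: str = ""
    has_jid_file: bool = True
    started_hook_called: bool = False
    hooks: List[str] = field(default_factory=list)
    log: List[str] = field(default_factory=list)


def transition(
    job: Job,
    new_status: int,
    rc: Optional[str] = None,
    error_msg: Optional[str] = None,
    is_killed: bool = False,
    halt_on_error: bool = False,
) -> bool:
    """Apply one status change; return True if the pipeline should halt."""
    # Status change logging
    job.log.append(f"{job.status} -> {new_status}")
    # Ensure on_job_started fires once, even if RUNNING was never observed
    if new_status in (RUNNING, *TERMINAL) and not job.started_hook_called:
        job.hooks.append("on_job_started")
        job.started_hook_called = True
    # Hook chosen by the new status
    if is_killed:
        job.hooks.append("on_job_killed")
    elif new_status == FINISHED:
        job.hooks.append("on_job_succeeded")
    elif new_status == FAILED:
        job.hooks.append("on_job_failed")
    # RC update and error message appended to stderr
    if rc is not None:
        job.rc = rc
    if error_msg:
        job.stderr += f"\n{error_msg}"
    # JID cleanup for terminal states
    if new_status in TERMINAL:
        job.has_jid_file = False
    job.status = new_status
    # Pipeline halt on errors if configured
    return halt_on_error and new_status == FAILED and not is_killed
```

Keeping all of this in one function means a job that jumps straight from SUBMITTED to a terminal state still gets its started hook exactly once.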

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states
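The contrast between this cheap count and the full `check_all_done` poll can be illustrated with a small sketch. The status names, the choice of which states count as "active", and the link to the `forks` limit are assumptions made for illustration, not a description of xqute internals.

```python
from enum import Enum
from typing import Iterable


class JobStatus(Enum):
    """Hypothetical subset of job states; xqute's real enum may differ."""

    QUEUED = "queued"
    SUBMITTED = "submitted"
    RUNNING = "running"
    FINISHED = "finished"
    FAILED = "failed"


# Assumption: only jobs already handed to the backend count as "active".
ACTIVE = frozenset({JobStatus.SUBMITTED, JobStatus.RUNNING})


def count_running(statuses: Iterable[JobStatus]) -> int:
    """Count active jobs from cached statuses: no refresh, no hooks."""
    return sum(1 for status in statuses if status in ACTIVE)


def can_submit_more(statuses: Iterable[JobStatus], forks: int) -> bool:
    """Producer-side check: is there room for another job under `forks`?"""
    return count_running(statuses) < forks
```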

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.
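One way such a check could be combined with `recheck_interval` is sketched below, under assumptions: statuses are plain strings keyed by job id, `recheck_submitted` is a hypothetical helper, and `still_known` stands in for a backend query (for example, asking the scheduler whether the job id is still queued or running).

```python
from typing import Callable, Dict


def recheck_submitted(
    statuses: Dict[str, str],
    polling_counter: int,
    recheck_interval: int,
    still_known: Callable[[str], bool],
) -> Dict[str, str]:
    """Mark SUBMITTED jobs as FAILED once the backend no longer knows them.

    The backend is only queried every `recheck_interval` polling iterations,
    because such queries can be expensive on a busy scheduler.
    """
    if polling_counter % recheck_interval != 0:
        return statuses
    updated = dict(statuses)
    for jid, status in statuses.items():
        # Stuck in SUBMITTED but gone from the backend: the wrapped script
        # never ran (e.g. resource allocation failed), so nothing updated it.
        if status == "SUBMITTED" and not still_known(jid):
            updated[jid] = "FAILED"
    return updated
```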

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

jobcmd_init(job) → str

The job command init

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

submit_job(job, _mounted=False)

Submit a job locally

Parameters
  • job (Job) The job
  • _mounted (bool, optional) Whether to use the mounted path of the wrapped job script. Used internally for the container scheduler
Returns (int)

The process id

method

kill_job(job)

Kill a job asynchronously

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell whether a job is really running, rather than relying only on the existence of job.jid_file

This guards against the case where the jid file is not cleaned up after the job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False
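For the local scheduler, whose `submit_job` returns a process id, "really running" can plausibly be checked by probing that PID instead of trusting the jid file. The sketch assumes the jid file holds just the PID and uses the standard POSIX signal-0 probe via `os.kill`; `pid_is_alive` and `job_is_running_from_jid_file` are hypothetical helpers, not pipen API.

```python
import os


def pid_is_alive(pid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only)."""
    if os.name != "posix":
        # On Windows, os.kill(pid, 0) would terminate the process instead.
        raise NotImplementedError("signal-0 probing is POSIX-only")
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0 sends nothing; it only checks existence
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def job_is_running_from_jid_file(jid_file: str) -> bool:
    """A leftover jid file is not proof of life: verify the recorded PID."""
    try:
        with open(jid_file) as fh:
            pid = int(fh.read().strip())
    except (FileNotFoundError, ValueError):
        return False
    return pid_is_alive(pid)
```

Note that an exited child that has not yet been reaped (a zombie) still passes this probe, and PIDs can be reused, so the check is a heuristic.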

class

pipen.scheduler.SgeScheduler(*args, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.sge_scheduler.SgeScheduler xqute.scheduler.Scheduler

SGE scheduler

Parameters
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.

method

jobcmd_init(job) → str

The job command init

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

submit_job(job)

Submit a job to SGE

Parameters
  • job (Job) The job
Returns (str)

The job id
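`qsub` reports the new job id in its standard output, so submitting typically involves parsing that text. The sketch below handles the default message (`Your job <id> ("<name>") has been submitted`, or `Your job-array ...` for array jobs) and the bare id printed by `qsub -terse`. It is an assumption about the general approach, and xqute's actual parsing may differ; `parse_qsub_output` is a hypothetical name.

```python
import re

# Default qsub message, e.g.: Your job 4242 ("myjob") has been submitted
_QSUB_RE = re.compile(r"Your job(?:-array)? (\d+)")


def parse_qsub_output(stdout: str) -> str:
    """Extract the SGE job id from qsub's standard output."""
    text = stdout.strip()
    if text.isdigit():  # `qsub -terse` prints only the job id
        return text
    match = _QSUB_RE.search(text)
    if match is None:
        raise ValueError(f"No job id found in qsub output: {stdout!r}")
    return match.group(1)
```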

method

kill_job(job)

Kill a job on SGE

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell whether a job is really running, rather than relying only on the existence of job.jid_file

This guards against the case where the jid file is not cleaned up after the job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.SlurmScheduler(*args, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.slurm_scheduler.SlurmScheduler xqute.scheduler.Scheduler

Slurm scheduler

Parameters
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

method

jobcmd_init(job) → str

The job command init

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

submit_job(job)

Submit a job to Slurm

Parameters
  • job (Job) The job
Returns (str)

The job id
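Similarly, `sbatch` prints `Submitted batch job <id>` by default, or just `<id>` (optionally `<id>;<cluster>`) with `--parsable`. The sketch below extracts the id from either form; it illustrates the general approach and is not necessarily how xqute parses the output. `parse_sbatch_output` is a hypothetical name.

```python
import re


def parse_sbatch_output(stdout: str) -> str:
    """Extract the Slurm job id from sbatch's standard output."""
    text = stdout.strip()
    # Default message: "Submitted batch job <id>"
    match = re.search(r"Submitted batch job (\d+)", text)
    if match:
        return match.group(1)
    # `sbatch --parsable` prints "<id>" or "<id>;<cluster>"
    head = text.split(";", 1)[0]
    if head.isdigit():
        return head
    raise ValueError(f"No job id found in sbatch output: {stdout!r}")
```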

method

kill_job(job)

Kill a job on Slurm

Parameters
  • job (Job) The job

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.

method

job_is_running(job)

Tell whether a job is really running, rather than relying only on the existence of job.jid_file

This guards against the case where the jid file is not cleaned up after the job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.SshScheduler(*args, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.ssh_scheduler.scheduler.SshScheduler xqute.scheduler.Scheduler

SSH scheduler

Parameters
  • **kwargs Other arguments for the scheduler
Attributes
  • job_class The job class
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

jobcmd_init(job) → str

The job command init

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

submit_job(job)

Submit a job via SSH

Parameters
  • job (Job) The job
Returns (str)

The job id

method

kill_job(job)

Kill a job via SSH

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell whether a job is really running, rather than relying only on the existence of job.jid_file

This guards against the case where the jid file is not cleaned up after the job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

class

pipen.scheduler.GbatchScheduler(*args, project, location, mount=None, service_account=None, network=None, subnetwork=None, no_external_ip_address=None, machine_type=None, provisioning_model=None, image_uri=None, entrypoint=None, commands=None, runnables=None, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.gbatch_scheduler.GbatchScheduler xqute.scheduler.Scheduler

Google Cloud Batch scheduler

Parameters
  • **kwargs Keyword arguments for the configuration of a job (e.g. taskGroups). See more details at https://cloud.google.com/batch/docs/get-started.
  • *args Positional arguments for the base class
  • project (str) Google Cloud project ID
  • location (str) Google Cloud region or zone
  • mount (str | Sequence[str] | None, optional) GCS path to mount (e.g. gs://my-bucket:/mnt/my-bucket). You can pass a list of mounts.
  • service_account (str | none, optional) GCP service account email (e.g. test-account@example.com)
  • network (str | none, optional) GCP network (e.g. default-network)
  • subnetwork (str | none, optional) GCP subnetwork (e.g. regions/us-central1/subnetworks/default)
  • no_external_ip_address (bool | none, optional) Whether to disable external IP address
  • machine_type (str | none, optional) GCP machine type (e.g. e2-standard-4)
  • provisioning_model (str | none, optional) GCP provisioning model (e.g. SPOT)
  • image_uri (str | none, optional) Container image URI (e.g. ubuntu-2004-lts)
  • entrypoint (str, optional) Container entrypoint (e.g. /bin/bash)
  • commands (str | Sequence[str] | None, optional) The command list to run in the container. There are three ways to specify the commands:
    1. If no entrypoint is specified, the final command will be [commands, wrapped_script], where the entrypoint is the wrapper script interpreter determined by JOBCMD_WRAPPER_LANG (e.g. /bin/bash), commands is the list you provided, and wrapped_script is the path to the wrapped job script.
    2. If you specify something like "-c", the final command will be ["-c", "wrapper_script_interpreter, wrapper_script"].
    3. You can use the placeholders {lang} and {script} in the commands list, where {lang} is replaced with the interpreter (e.g. /bin/bash) and {script} with the path to the wrapped job script. For example, ["{lang} {script}"] becomes ["wrapper_interpreter wrapper_script"].
  • runnables (Sequence[dict] | None, optional) Additional runnables to run before or after the main job. Each runnable should be a dictionary that follows the GCP Batch API specification. You can also specify an "order" key in the dictionary to control the execution order of the runnables. Runnables with a negative order are executed before the main job, and those with a non-negative order are executed after it. The main job runnable will always be executed in the order it is defined in the list.
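Two of the rules above lend themselves to a short sketch: placeholder substitution in `commands` (the third way) and the placement of extra `runnables` around the main job. This is illustrative only. The helper names are hypothetical; `str.replace` is used instead of `str.format` so other braces such as `${VAR}` survive; and treating a missing `order` as 0 and dropping the `order` key before building the request are assumptions. The `script.text` and `container.imageUri` fields follow the GCP Batch `Runnable` resource.

```python
from typing import Any, Dict, List, Sequence

Runnable = Dict[str, Any]


def expand_placeholders(commands: Sequence[str], lang: str, script: str) -> List[str]:
    """Replace {lang} and {script} in each command item (the third way)."""
    return [cmd.replace("{lang}", lang).replace("{script}", script) for cmd in commands]


def _order(runnable: Runnable) -> int:
    # Assumption: a runnable without an "order" key is treated as order 0.
    return runnable.get("order", 0)


def _without_order(runnable: Runnable) -> Runnable:
    # Assumption: "order" is a local hint and is not sent to the Batch API.
    return {k: v for k, v in runnable.items() if k != "order"}


def arrange_runnables(main: Runnable, extra: Sequence[Runnable]) -> List[Runnable]:
    """Negative order runs before the main job, non-negative after it.

    sorted() is stable, so runnables sharing an order keep their list order.
    """
    before = sorted((r for r in extra if _order(r) < 0), key=_order)
    after = sorted((r for r in extra if _order(r) >= 0), key=_order)
    return [_without_order(r) for r in before] + [main] + [_without_order(r) for r in after]
```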
Attributes
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

submit_job(job)

Submit a job

Parameters
  • job (Job) The job
Returns (str)

The unique id in the scheduler system

method

kill_job(job)

Kill a job

Parameters
  • job (Job) The job

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.

method

job_is_running(job)

Check if a job is really running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

method

jobcmd_init(job) → str

The job command init

class

pipen.scheduler.ContainerScheduler(image, entrypoint='/bin/bash', bin='docker', volumes=None, remove=True, user=None, bin_args=None, **kwargs)

Bases
pipen.scheduler.SchedulerPostInit xqute.schedulers.container_scheduler.ContainerScheduler xqute.schedulers.local_scheduler.LocalScheduler xqute.scheduler.Scheduler

Scheduler to run jobs via containers (Docker/Podman/Apptainer)

Parameters
  • **kwargs Additional arguments passed to the parent Scheduler
  • image (str) Container image to use for running jobs
  • entrypoint (str | List[str], optional) Entrypoint command for the container
  • bin (str, optional) Path to container runtime binary (e.g. /path/to/docker)
  • volumes (str | Sequence[str] | None, optional) A host:container volume mapping string or strings, or a named volume mapping like MOUNTED=/path/on/host, which will be mounted to /mnt/disks/MOUNTED in the container. You can use the environment variable MOUNTED in your job scripts to refer to the mounted path.
  • remove (bool, optional) Whether to remove the container after execution. Only applies to Docker/Podman.
  • user (str | none, optional) User to run the container as (only for Docker/Podman). By default, it runs as the current user (os.getuid() and os.getgid())
  • bin_args (List[str] | None, optional) Additional arguments to pass to the container runtime
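The two volume forms described above can be sketched as a small parser. The helper name, the choice to emit `host:container` strings and the returned environment mapping are assumptions; only the `/mnt/disks/<NAME>` target and the `NAME` environment variable come from the description above.

```python
from typing import Dict, List, Sequence, Tuple, Union


def parse_volumes(
    volumes: Union[str, Sequence[str], None],
) -> Tuple[List[str], Dict[str, str]]:
    """Return (host:container specs, env vars exposing named mounts)."""
    if volumes is None:
        return [], {}
    if isinstance(volumes, str):
        volumes = [volumes]
    specs: List[str] = []
    envs: Dict[str, str] = {}
    for vol in volumes:
        name, sep, host_path = vol.partition("=")
        if sep and name.isidentifier():
            # Named form: NAME=/path/on/host -> /mnt/disks/NAME in the container
            target = f"/mnt/disks/{name}"
            specs.append(f"{host_path}:{target}")
            envs[name] = target  # lets job scripts refer to $NAME
        else:
            # Plain form: already a host:container mapping
            specs.append(vol)
    return specs, envs
```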
Attributes
  • job_class The job class
  • jobcmd_wrapper_init (str) The init script for the job command wrapper
  • name The name of the scheduler
Methods
method

create_job(index, cmd, envs=None)

Create a job

Parameters
  • index (int) The index of the job
  • cmd (Union) The command of the job
Returns (Job)

The job

Submit and update the status

  1. Check if the job is already submitted or running
  2. If not, run the hook
  3. If the hook is not cancelled, clean the job
  4. Submit the job, raising an exception if it fails
  5. If the job is submitted successfully, update the status
  6. If the job fails to submit, update the status and write stderr to the job file
Parameters
  • job (Job) The job
method

retry_job(job)

Retry a job

Parameters
  • job (Job) The job
method

transition_job_status(job, new_status, rc=None, error_msg=None, is_killed=False)

Centralized status transition handler

Handles all aspects of job status transitions:

  • Status change logging
  • Hook lifecycle management (ensuring on_job_started is called)
  • Appropriate hook calls based on new status
  • RC file updates
  • Error message appending to stderr
  • JID file cleanup for terminal states
  • Pipeline halt on errors if configured

Parameters
  • job (Job) The job to transition
  • new_status (int) The new status to transition to
  • rc (str | none, optional) Optional return code to write to rc_file
  • error_msg (str | none, optional) Optional error message to append to stderr_file
  • is_killed (bool, optional) Whether this is a killed job (uses on_job_killed hook)

Kill a job and update its status

Parameters
  • job (Job) The job
method

count_running_jobs(jobs)

Count currently running/active jobs (lightweight check)

This is optimized for the producer to check if new jobs can be submitted. It only counts jobs without refreshing status or calling hooks.

Parameters
  • jobs (List) The list of jobs
Returns (int)

Number of jobs currently in active states

method

check_all_done(jobs, polling_counter)

Check if all jobs are done (full polling with hooks)

This performs a complete status refresh and calls all lifecycle hooks. It is used by the main polling loop to track job completion.

Parameters
  • jobs (List) The list of jobs
  • polling_counter (int) The polling counter for hook calls
Returns (bool)

True if all jobs are done, False otherwise

method

kill_running_jobs(jobs)

Try to kill all running jobs

Parameters
  • jobs (List) The list of jobs

Check if a job is already submitted or running

Parameters
  • job (Job) The job
Returns (bool)

True if yes, otherwise False.

Check if a job fails before running.

For some schedulers, a job may fail after submission but before it starts running, for example when it fails to allocate resources. In that case the wrapped script may never execute, so the job status is never updated and stays SUBMITTED. Such jobs need to be checked and marked as FAILED.

For instant schedulers such as the local scheduler, a failure is reported immediately when the job is submitted, so such jobs do not need to be checked.

Parameters
  • job (Job) The job to check
Returns (bool)

True if the job fails before running, otherwise False.

method

jobcmd_prep(job) → str

The job command preparation

method

jobcmd_end(job) → str

The job command end

method

wrap_job_script(job)

Wrap the job script

Parameters
  • job (Job) The job
Returns (str)

The wrapped script

method

kill_job(job)

Kill a job asynchronously

Parameters
  • job (Job) The job
method

job_is_running(job)

Tell whether a job is really running, rather than relying only on the existence of job.jid_file

This guards against the case where the jid file is not cleaned up after the job is done.

Parameters
  • job (Job) The job
Returns (bool)

True if it is, otherwise False

method

wrapped_job_script(job)

Get the wrapped job script

Parameters
  • job (Job) The job
Returns (SpecPath)

The path of the wrapped job script

method

jobcmd_shebang(job) → str

The shebang of the wrapper script

method

submit_job(job)

Submit a job locally

Parameters
  • job (Job) The job
Returns (int)

The process id

method

jobcmd_init(job) → str

The job command init

function

pipen.scheduler.get_scheduler(scheduler)

Get the scheduler class, given either its name or the scheduler class itself

Parameters
  • scheduler (Union) The scheduler class or name
Returns (Type)

The scheduler class
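The name-or-class lookup described above can be pictured with the sketch below. The stand-in classes and the registry keyed by lower-case names are hypothetical; pipen's real function may resolve names differently (for example, through plugins).

```python
from typing import Dict, Type, Union


class Scheduler:
    """Stand-in base class; not pipen's or xqute's Scheduler."""

    name = "base"


class LocalScheduler(Scheduler):
    name = "local"


class SlurmScheduler(Scheduler):
    name = "slurm"


# Hypothetical registry mapping scheduler names to classes.
_REGISTRY: Dict[str, Type[Scheduler]] = {
    cls.name: cls for cls in (LocalScheduler, SlurmScheduler)
}


def get_scheduler_sketch(scheduler: Union[str, Type[Scheduler]]) -> Type[Scheduler]:
    """Return the class unchanged, or look it up by (case-insensitive) name."""
    if isinstance(scheduler, type) and issubclass(scheduler, Scheduler):
        return scheduler
    if isinstance(scheduler, str):
        try:
            return _REGISTRY[scheduler.lower()]
        except KeyError:
            raise ValueError(f"Unknown scheduler: {scheduler!r}") from None
    raise TypeError(f"Expected a name or a Scheduler subclass, got {scheduler!r}")
```

Accepting a class as well as a name lets callers plug in a custom scheduler subclass without registering it anywhere.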