xqute.xqute

module

xqute.xqute

The xqute module

Classes
  • Xqute The main class of the package
class

xqute.xqute.Xqute(scheduler='local', plugins=None, workdir='./.xqute', submission_batch=None, error_strategy='ignore', num_retries=3, forks=1, scheduler_opts=None, jobname_prefix=None)

The main class of the package

Attributes
  • EMPTY_BUFFER_SLEEP_TIME The time to sleep when the buffer is empty, while waiting for jobs to be pushed
  • _cancelling A flag indicating whether a shutdown event has been triggered (True for natural cancelling, or the signal itself when cancelling is caused by a signal, SIGINT for example)
  • buffer_queue A buffer queue to save the pushed jobs
  • jobs The jobs registry
  • name The name, used in logger
  • plugins The plugins to be enabled or disabled. To disable a plugin, use -plugin_name. Either all plugin names should be prefixed with '+'/'-' or none of them should
  • queue The job queue
  • scheduler The scheduler
  • task The task of producer and consumers
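The all-or-none prefix rule for plugins can be illustrated with a small sketch. The helper split_plugin_names is hypothetical and not part of xqute; it also assumes that unprefixed names mean "enable", which the text above does not state explicitly.

```python
from typing import Iterable, List, Tuple


def split_plugin_names(names: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: split plugin names into (enabled, disabled).

    Not xqute's code; it only illustrates the all-or-none prefix rule.
    """
    names = list(names)
    prefixed = [name[:1] in ("+", "-") for name in names]
    # Mixing prefixed and unprefixed names is rejected.
    if any(prefixed) and not all(prefixed):
        raise ValueError(
            "Either prefix every plugin name with '+'/'-' or none of them"
        )
    enabled = [
        name[1:] if name.startswith("+") else name
        for name in names
        if not name.startswith("-")
    ]
    disabled = [name[1:] for name in names if name.startswith("-")]
    return enabled, disabled
```

Under these assumptions, ["+plugin_a", "-plugin_b"] and ["plugin_a", "plugin_b"] are both accepted, while ["+plugin_a", "plugin_b"] raises ValueError.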
Parameters
  • scheduler (str | Type[Scheduler], optional) The scheduler class or name
  • plugins (Optional, optional) The plugins to be enabled or disabled. To disable a plugin, use -plugin_name. Either all plugin names should be prefixed with '+'/'-' or none of them should
  • workdir (str | PathType, optional) The job meta directory
  • submission_batch (int | None, optional) The number of consumers to submit jobs. This allows multiple jobs to be submitted in parallel, which is useful when there are many jobs to be submitted and the scheduler has a high latency for each submission. Set this to a smaller number if the scheduler cannot handle too many simultaneous submissions.
  • error_strategy (str, optional) The strategy to apply when an error occurs
  • num_retries (int, optional) Max number of retries when error_strategy is retry
  • forks (int, optional) Max number of job forks for scheduler
  • scheduler_opts (Optional, optional) Additional keyword arguments for scheduler
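To make the role of submission_batch concrete, here is a minimal asyncio sketch of several consumers submitting from one queue in parallel. It is a stand-alone model under stated assumptions, not xqute's implementation: submit, consumer and run_batch are hypothetical names, and the sleep stands in for scheduler latency.

```python
import asyncio
from typing import List


async def submit(job: str, submitted: List[str]) -> None:
    """Hypothetical stand-in for a slow scheduler submission."""
    await asyncio.sleep(0.01)
    submitted.append(job)


async def consumer(queue: asyncio.Queue, submitted: List[str]) -> None:
    """Pull jobs off the queue and submit them until cancelled."""
    while True:
        job = await queue.get()
        try:
            await submit(job, submitted)
        finally:
            queue.task_done()


async def run_batch(jobs: List[str], submission_batch: int) -> List[str]:
    """Submit all jobs using `submission_batch` parallel consumers."""
    queue: asyncio.Queue = asyncio.Queue()
    submitted: List[str] = []
    for job in jobs:
        queue.put_nowait(job)
    workers = [
        asyncio.create_task(consumer(queue, submitted))
        for _ in range(submission_batch)
    ]
    await queue.join()  # wait until every queued job has been submitted
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return submitted
```

With more consumers, more of the simulated submission latency overlaps; a smaller value limits how many submissions are in flight at once.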
Methods
  • __del__() Destructor to warn if stop_feeding was not called
  • cancel(sig) Cancel the producer-consumer task
  • feed(cmd, envs) Put a command into the buffer
  • is_feeding() (bool) Check if the system is in keep_feeding mode.
  • run_until_complete(keep_feeding) Wait until all jobs complete
  • stop_feeding() Stop feeding mode and wait for all jobs to complete.
method

__del__()

Destructor to warn if stop_feeding was not called

method

cancel(sig=None)

Cancel the producer-consumer task

self._cancelling will be set to the signal if sig is provided; otherwise it will be set to True.

Parameters
  • sig (signal.Signals | None, optional) The signal that caused this cancelling, if any
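The way the cancelling flag distinguishes a natural cancel from a signal-driven one can be sketched as follows. CancelState is a hypothetical stand-in class, not xqute's code, and the initial value False is an assumption.

```python
import signal
from typing import Optional, Union


class CancelState:
    """Hypothetical stand-in for the object that holds `_cancelling`."""

    def __init__(self) -> None:
        # Assumed initial value: nothing is being cancelled yet.
        self._cancelling: Union[bool, signal.Signals] = False

    def cancel(self, sig: Optional[signal.Signals] = None) -> None:
        # Record the signal when there is one, otherwise just True.
        self._cancelling = sig if sig is not None else True
```

A caller can then check whether the flag is exactly True (natural cancelling) or a signal.Signals member such as signal.SIGINT.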
method

feed(cmd, envs=None)

Put a command into the buffer

Parameters
  • cmd (CommandType | Job) The command
  • envs (dict, optional) The environment variables for the job
method

is_feeding()

Check if the system is in keep_feeding mode.

Returns (bool)

True if in keep_feeding mode and waiting for stop_feeding() to be called.

method

stop_feeding()

Stop feeding mode and wait for all jobs to complete.

After calling this method, the producer will exit once the buffer queue is empty, and this method will wait for all jobs to complete. This should be called after all jobs have been submitted when using run_until_complete(keep_feeding=True).

Raises
  • RuntimeError If called without first calling run_until_complete(keep_feeding=True)
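The behaviour described above (the producer exits once feeding has stopped and the buffer is empty, and stopping without having started raises RuntimeError) can be modelled with plain asyncio. This is a toy sketch, not xqute's implementation: FeedingModel, start and demo are hypothetical names, and SLEEP only mimics the idea of EMPTY_BUFFER_SLEEP_TIME.

```python
import asyncio
from typing import List, Optional

SLEEP = 0.01  # assumed value, mimicking EMPTY_BUFFER_SLEEP_TIME


class FeedingModel:
    """Toy model of keep-feeding mode; create it inside a running loop."""

    def __init__(self) -> None:
        self.buffer: asyncio.Queue = asyncio.Queue()
        self.done: List[str] = []
        self._feeding = False
        self._task: Optional[asyncio.Task] = None

    async def feed(self, cmd: str) -> None:
        await self.buffer.put(cmd)

    async def _producer(self) -> None:
        # Keep going while feeding; exit once stopped and the buffer is empty.
        while self._feeding or not self.buffer.empty():
            if self.buffer.empty():
                await asyncio.sleep(SLEEP)
                continue
            self.done.append(self.buffer.get_nowait())  # stand-in for a job

    def start(self) -> None:
        # Plays the role of run_until_complete(keep_feeding=True).
        self._feeding = True
        self._task = asyncio.create_task(self._producer())

    def is_feeding(self) -> bool:
        return self._feeding

    async def stop_feeding(self) -> None:
        if not self._feeding or self._task is None:
            raise RuntimeError("start() must be called before stop_feeding()")
        self._feeding = False
        await self._task  # wait for the buffer to drain


async def demo() -> List[str]:
    model = FeedingModel()
    await model.feed("echo 1")
    model.start()
    await model.feed("echo 2")  # jobs can still be added
    await model.stop_feeding()
    return model.done
```

Running asyncio.run(demo()) processes both commands in the order they were fed. Polling with a short sleep keeps the sketch simple; a real implementation may coordinate differently.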
method

run_until_complete(keep_feeding=False)

Wait until all jobs complete

Parameters
  • keep_feeding (bool, optional) If True, starts running in the background and returns immediately, allowing jobs to be added after calling this method. You must call stop_feeding() when done adding jobs; that call waits for all jobs to complete. If False (default), waits right away for all current jobs to complete.
Examples

Traditional usage:

xqute = Xqute()
await xqute.feed(['echo', '1'])
await xqute.feed(['echo', '2'])
await xqute.run_until_complete()

Keep feeding mode:

xqute = Xqute()
await xqute.feed(['echo', '1'])
await xqute.run_until_complete(keep_feeding=True)  # Returns immediately
await xqute.feed(['echo', '2'])  # Can add more jobs
await xqute.stop_feeding()  # Waits for completion