xqute.xqute
class
xqute.xqute.Xqute(scheduler='local', plugins=None, workdir='./.xqute', submission_batch=None, error_strategy='ignore', num_retries=3, forks=1, scheduler_opts=None, jobname_prefix=None)
The main class of the package
Attributes
EMPTY_BUFFER_SLEEP_TIME — The time to sleep while the buffer is empty, waiting for jobs to be pushed
_cancelling — Marks whether a shutdown has been triggered (True for a natural cancellation; the signal itself, SIGINT for example, when cancelled by a signal)
buffer_queue — A buffer queue that holds the pushed jobs
jobs — The jobs registry
name — The name, used in the logger
plugins — The plugins to be enabled or disabled. To disable a plugin, use -plugin_name. Either all plugin names should be prefixed with '+'/'-', or none of them should.
queue — The job queue
scheduler — The scheduler
task — The task of the producer and consumers
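The all-or-none prefix rule for plugin names can be illustrated with a small check. This is only a sketch: `split_plugin_names` is a hypothetical helper, not part of xqute, and treating unprefixed names as "enabled" is an assumption made for the example.

```python
from typing import Iterable, List, Tuple


def split_plugin_names(names: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: split plugin names into (enabled, disabled).

    Illustrates the documented prefix rule only; xqute's real handling
    of plugin names may differ.
    """
    names = list(names)
    prefixed = [n.startswith(("+", "-")) for n in names]
    # Mixing prefixed and unprefixed names is rejected.
    if any(prefixed) and not all(prefixed):
        raise ValueError("Prefix either all plugin names with '+'/'-' or none.")
    enabled, disabled = [], []
    for n in names:
        if n.startswith("-"):
            disabled.append(n[1:])
        elif n.startswith("+"):
            enabled.append(n[1:])
        else:
            # Assumption: an unprefixed name means "enable this plugin".
            enabled.append(n)
    return enabled, disabled
```

With this sketch, `["+a", "-b"]` and `["a", "b"]` are both accepted, while `["+a", "b"]` raises `ValueError`.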
Parameters
scheduler (str | Type[Scheduler], optional) — The scheduler class or name
plugins (Optional, optional) — The plugins to be enabled or disabled. To disable a plugin, use -plugin_name. Either all plugin names should be prefixed with '+'/'-', or none of them should.
workdir (str | PathType, optional) — The job meta directory
submission_batch (int | None, optional) — The number of consumers that submit jobs. This allows multiple jobs to be submitted in parallel, which is useful when there are many jobs to submit and the scheduler has a high latency per submission. Set this to a smaller number if the scheduler cannot handle too many simultaneous submissions.
error_strategy (str, optional) — The strategy to apply when an error occurs
num_retries (int, optional) — The maximum number of retries when error_strategy is retry
forks (int, optional) — The maximum number of job forks for the scheduler
scheduler_opts (Optional, optional) — Additional keyword arguments for the scheduler
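The idea behind submission_batch, several consumers submitting jobs concurrently to hide per-submission latency, can be sketched with plain asyncio. This is a toy model, not xqute's implementation: `submit_all` and its `latency` parameter are hypothetical names, and `asyncio.sleep` stands in for a slow scheduler call.

```python
import asyncio


async def submit_all(jobs, submission_batch=2, latency=0.01):
    """Drain `jobs` using `submission_batch` concurrent consumers."""
    queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    submitted = []

    async def consumer():
        while True:
            try:
                job = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left to submit
            await asyncio.sleep(latency)  # stands in for a slow submission
            submitted.append(job)

    # Run all consumers concurrently; each pulls jobs until the queue is empty.
    await asyncio.gather(*(consumer() for _ in range(submission_batch)))
    return submitted
```

A larger submission_batch lets more of the simulated submissions overlap; the order in which jobs finish submitting is not guaranteed.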
Methods
__del__() — Destructor to warn if stop_feeding was not called
cancel(sig) — Cancel the producer-consumer task
feed(cmd, envs) — Put a command into the buffer
is_feeding() (bool) — Check if the system is in keep_feeding mode.
run_until_complete(keep_feeding) — Wait until all jobs complete
stop_feeding() — Stop feeding mode and wait for all jobs to complete.
method
__del__()
Destructor to warn if stop_feeding was not called
method
cancel(sig=None)
Cancel the producer-consumer task
self._cancelling is set to the signal if sig is provided;
otherwise it is set to True.
Parameters
sig (signal.Signals | None, optional) — The signal that triggered this cancellation, or None if it was not caused by a signal
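How the cancellation mark distinguishes the two cases can be shown in a few lines. `CancelMark` is a hypothetical stand-in written for illustration; the initial value of `False` is an assumption, and the real method also cancels the running task, which is omitted here.

```python
import signal
from typing import Optional, Union


class CancelMark:
    """Hypothetical stand-in for the `_cancelling` attribute."""

    def __init__(self) -> None:
        # Assumption: nothing is being cancelled at start.
        self._cancelling: Union[bool, signal.Signals] = False

    def cancel(self, sig: Optional[signal.Signals] = None) -> None:
        # Record the signal when there is one, otherwise plain True.
        self._cancelling = sig if sig is not None else True
```

Calling `cancel()` leaves the mark at `True`, while `cancel(signal.SIGINT)` stores `signal.SIGINT` itself.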
method
feed(cmd, envs=None)
Put a command into the buffer
Parameters
cmd (CommandType | Job) — The command
envs (dict, optional) — The environment variables for the job
method
is_feeding()
Check if the system is in keep_feeding mode.
Returns (bool)
True if in keep_feeding mode and waiting for stop_feeding() to be called.
method
stop_feeding()
Stop feeding mode and wait for all jobs to complete.
After calling this method, the producer will exit once the buffer queue is empty, and this method will wait for all jobs to complete. This should be called after all jobs have been submitted when using run_until_complete(keep_feeding=True).
Raises
RuntimeError — If called without first calling run_until_complete(keep_feeding=True)
method
run_until_complete(keep_feeding=False)
Wait until all jobs complete
Parameters
keep_feeding (bool, optional) — If True, starts running in the background and returns immediately, allowing jobs to be added after calling this method. You must call stop_feeding() when done adding jobs, which will wait for all jobs to complete. If False (default), waits for all current jobs to complete before returning.
Examples
Traditional usage:
xqute = Xqute()
await xqute.feed(['echo', '1'])
await xqute.feed(['echo', '2'])
await xqute.run_until_complete()
Keep feeding mode:
xqute = Xqute()
await xqute.feed(['echo', '1'])
await xqute.run_until_complete(keep_feeding=True) # Returns immediately
await xqute.feed(['echo', '2']) # Can add more jobs
await xqute.stop_feeding() # Waits for completion
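The control flow of keep-feeding mode can be modelled with the standard library alone. The sketch below is a toy model under stated assumptions, not xqute's implementation: `FeedingModel` and `demo` are hypothetical names, "running" a job just records it, and the 0.01 second sleep is an arbitrary stand-in for EMPTY_BUFFER_SLEEP_TIME.

```python
import asyncio


class FeedingModel:
    """Toy model of keep-feeding mode; not xqute's implementation."""

    def __init__(self):
        self.buffer = asyncio.Queue()
        self.done = []
        self._feeding = False
        self._task = None

    async def feed(self, cmd):
        await self.buffer.put(cmd)

    def is_feeding(self):
        return self._feeding

    async def _worker(self):
        # Keep draining while feeding is on or items remain in the buffer.
        while self._feeding or not self.buffer.empty():
            try:
                cmd = self.buffer.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0.01)  # buffer empty: wait for more jobs
                continue
            self.done.append(cmd)  # "running" a job just records it here

    async def run_until_complete(self, keep_feeding=False):
        self._feeding = keep_feeding
        self._task = asyncio.create_task(self._worker())
        if not keep_feeding:
            await self._task  # traditional mode: block until drained

    async def stop_feeding(self):
        if not self._feeding:
            raise RuntimeError("call run_until_complete(keep_feeding=True) first")
        self._feeding = False
        await self._task  # wait for the worker to finish the buffer


async def demo():
    model = FeedingModel()
    await model.feed("job-1")
    await model.run_until_complete(keep_feeding=True)  # returns immediately
    await model.feed("job-2")  # still accepted
    await model.stop_feeding()  # waits for completion
    return model.done
```

Running `asyncio.run(demo())` processes both jobs, and calling `stop_feeding()` on a fresh model raises `RuntimeError`, mirroring the behaviour documented for stop_feeding().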