# User Guide
Comprehensive guide for using Xqute effectively.
## Table of Contents

- Core Concepts
- Initialization
- Job Management
- Execution Modes
- Error Handling
- Working Directories
- Environment Variables
- Job Output
- Monitoring
- Best Practices
- Troubleshooting
## Core Concepts

### Xqute

The main class that manages jobs and coordinates their execution. It follows a producer-consumer pattern (see the minimal example below):

- **Producer**: feeds jobs into the buffer queue
- **Consumer**: submits jobs to the scheduler
- **Monitor**: polls job status and handles completion
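A minimal end-to-end run ties the three roles together; this sketch uses only calls documented later in this guide:

```python
import asyncio
from xqute import Xqute

async def main():
    xqute = Xqute(forks=2)               # up to 2 jobs run concurrently
    await xqute.feed(['echo', 'hello'])  # producer: queue a job
    await xqute.run_until_complete()     # consumer + monitor: run and wait

asyncio.run(main())
```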
### Job

Represents a single unit of work with:

- Command to execute
- Metadata directory for logs and status
- Environment variables
- Status tracking
### Scheduler

Abstracts job execution for different backends. Each scheduler implementation handles the following (a conceptual sketch follows the list):

- Job submission
- Status monitoring
- Job cancellation
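The sketch below is a standalone model of that contract, not Xqute's actual base class (see the Schedulers page for the real API); every name in it is illustrative:

```python
from abc import ABC, abstractmethod

class SchedulerContract(ABC):
    """Illustrative model of the responsibilities a backend must cover."""

    @abstractmethod
    async def submit(self, cmd: list[str]) -> str:
        """Submit a command; return a backend-specific job ID."""

    @abstractmethod
    async def is_running(self, jid: str) -> bool:
        """Poll the backend for the job's liveness."""

    @abstractmethod
    async def kill(self, jid: str) -> None:
        """Cancel a submitted or running job."""
```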
## Initialization

### Basic Initialization

```python
from xqute import Xqute

# Default: local scheduler, 1 fork
xqute = Xqute()

# Specify concurrency
xqute = Xqute(forks=5)
```
### Full Initialization Options

```python
from xqute import Xqute

xqute = Xqute(
    scheduler='local',        # Scheduler type or class
    plugins=None,             # Plugin list (e.g., ['myplugin'])
    workdir='./.xqute',       # Job metadata directory
    submission_batch=5,       # Parallel job submissions
    error_strategy='retry',   # Error handling strategy
    num_retries=3,            # Max retry attempts
    forks=10,                 # Max concurrent jobs
    scheduler_opts={},        # Scheduler-specific options
    jobname_prefix='myjob',   # Job name prefix
    recheck_interval=10,      # Recheck running jobs every N polls
)
```
### Parameter Details

| Parameter | Type | Default | Description |
|---|---|---|---|
| `scheduler` | str/Scheduler | `'local'` | Scheduler backend |
| `plugins` | list | `None` | Plugins to enable/disable |
| `workdir` | str/path | `'./.xqute'` | Job metadata directory |
| `submission_batch` | int | `None` | Parallel job submissions |
| `error_strategy` | str | `'retry'` | Error strategy (`'retry'` or `'halt'`) |
| `num_retries` | int | `3` | Max retry attempts |
| `forks` | int | `1` | Max concurrent jobs |
| `scheduler_opts` | dict | `{}` | Scheduler-specific options |
| `jobname_prefix` | str | scheduler name | Job name prefix |
| `recheck_interval` | int | `10` | Recheck interval (in polls) |
## Job Management

### Feeding Jobs

#### Simple Commands

```python
# String command
await xqute.feed('echo "Hello World"')

# List command
await xqute.feed(['echo', 'Hello World'])
```
#### With Environment Variables

```python
await xqute.feed(
    ['bash', '-c', 'echo $MY_VAR'],
    envs={'MY_VAR': 'value'},
)
```
#### Multiple Jobs

```python
for i in range(100):
    await xqute.feed(['python', 'process.py', str(i)])
```
### Job Objects

Create jobs directly:

```python
from xqute import Job

job = Job(
    index=0,
    cmd=['echo', 'test'],
    workdir='./.xqute',
    error_retry=True,
    num_retries=3,
)
await xqute.feed(job)
```
### Job Metadata

Each job has a metadata directory containing:

```
.xqute/
├── 0/                     # Job index
│   ├── job.stdout         # Standard output
│   ├── job.stderr         # Standard error
│   ├── job.status         # Job status
│   ├── job.rc             # Return code
│   ├── job.jid            # Scheduler job ID
│   ├── job.retry/         # Retry history (if retried)
│   └── job.wrapped.local  # Wrapped script
└── 1/
    └── ...
```
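You can inspect these files programmatically once a run has finished; a small sketch using `job.metadir` (the job's metadata directory), assuming a local workdir:

```python
# After a run, list the metadata files that job 0 produced
job = xqute.jobs[0]
for path in sorted(job.metadir.glob('job.*')):
    print(path.name)  # job.rc, job.status, job.stdout, ...
```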
## Execution Modes

### Traditional Mode (Default)

Add all jobs first, then wait for completion:

```python
async def main():
    xqute = Xqute(forks=5)

    # Add all jobs
    for i in range(100):
        await xqute.feed(['echo', f'Job {i}'])

    # Wait for all to complete
    await xqute.run_until_complete()
```
### Daemon Mode (Keep Feeding)

Add jobs while running:

```python
async def main():
    xqute = Xqute(forks=5)

    # Start in background
    await xqute.run_until_complete(keep_feeding=True)

    # Add jobs dynamically
    for i in range(100):
        await xqute.feed(['echo', f'Job {i}'])

    # Signal completion and wait
    await xqute.stop_feeding()
```
### Checking Feeding Status

```python
if xqute.is_feeding():
    await xqute.stop_feeding()
```
## Error Handling

### Error Strategies

#### Retry Strategy (Default)

Automatically retry failed jobs:

```python
xqute = Xqute(
    error_strategy='retry',
    num_retries=3,
)
```

When a job fails:

1. Job metadata is backed up to `.xqute/<index>/job.retry/<trial>`
2. Job status is reset to `QUEUED`
3. Job is resubmitted
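To see how many retries a job went through, count the backup directories (a sketch based on the layout above, assuming a local workdir):

```python
from pathlib import Path

# Each retry backs up the previous attempt under job.retry/<trial>
retry_dir = Path('./.xqute/0/job.retry')
trials = sorted(retry_dir.iterdir()) if retry_dir.is_dir() else []
print(f"Job 0 was retried {len(trials)} time(s)")
```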
#### Halt Strategy

Stop execution on first failure:

```python
xqute = Xqute(
    error_strategy='halt',
)
```

When a job fails:

1. All running jobs are killed
2. No more jobs are submitted
3. Execution terminates with SIGTERM
### Per-Job Error Handling

Override the error strategy per job:

```python
from xqute import Job

job = Job(
    index=0,
    cmd=['risky-command'],
    workdir='./.xqute',
    error_retry=False,  # Disable retry for this job
)
```
## Working Directories

### Local Working Directory

```python
xqute = Xqute(workdir='./myjobs')
```

### Cloud Working Directory

```python
xqute = Xqute(
    workdir='gs://my-bucket/jobs',
    scheduler_opts={
        'mounted_workdir': '/mnt/jobs',  # Mount point
    },
)
```
### Mounted Paths

For cloud storage, specify the mount point through which jobs access the working directory:

```python
xqute = Xqute(
    workdir='gs://bucket/jobs',
    scheduler_opts={
        'mounted_workdir': '/mnt/gs',
    },
)
```
## Environment Variables

### Built-in Variables

Xqute automatically sets these environment variables for each job:

| Variable | Description |
|---|---|
| `XQUTE_JOB_INDEX` | Job index number |
| `XQUTE_METADIR` | Main metadata directory |
| `XQUTE_JOB_METADIR` | Job-specific metadata directory |
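For example, a shell job can use them to write results into its own metadata directory:

```python
# The variables are set in the job's environment,
# so the shell expands them at run time
await xqute.feed([
    'bash', '-c',
    'echo "running as job $XQUTE_JOB_INDEX" > "$XQUTE_JOB_METADIR/note.txt"',
])
```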
### Custom Variables

```python
await xqute.feed(
    ['bash', '-c', 'echo $MY_CUSTOM_VAR'],
    envs={
        'MY_CUSTOM_VAR': 'value',
        'ANOTHER_VAR': 'another value',
    },
)
```
### Accessing in Jobs

```python
await xqute.feed(
    ['python', '-c', '''
import os
print(f"Job index: {os.getenv('XQUTE_JOB_INDEX')}")
print(f"Custom: {os.getenv('MY_VAR')}")
'''],
    envs={'MY_VAR': 'custom'},
)
```
## Job Output

### Reading Output After Completion

```python
async def main():
    xqute = Xqute(forks=3)
    await xqute.feed(['echo', 'Hello'])
    await xqute.run_until_complete()

    # Read output
    job = xqute.jobs[0]
    stdout = await job.stdout_file.a_read_text()
    stderr = await job.stderr_file.a_read_text()
    rc = await job.get_rc()

    print(f"Output: {stdout}")
    print(f"Errors: {stderr}")
    print(f"Return code: {rc}")
```
### Stream Output During Execution

Use plugins to stream output (see Plugins).
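If you only need a quick look without writing a plugin, one option is to poll the documented `job.stdout_file` yourself while the jobs run; a naive sketch (the proper streaming hooks are covered on the Plugins page):

```python
import asyncio
from xqute.defaults import JobStatus

async def tail_stdout(job, interval: float = 1.0):
    """Naively print new stdout content as it appears."""
    seen = 0
    while True:
        try:
            text = await job.stdout_file.a_read_text()
            print(text[seen:], end='')
            seen = len(text)
        except FileNotFoundError:
            pass  # stdout file not created yet
        if await job.get_status(refresh=True) in (
            JobStatus.FINISHED, JobStatus.FAILED
        ):
            break
        await asyncio.sleep(interval)

# Run it alongside the main loop:
# await asyncio.gather(xqute.run_until_complete(), tail_stdout(xqute.jobs[0]))
```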
### Output File Locations

```python
job = xqute.jobs[0]
print(f"stdout: {job.stdout_file}")
print(f"stderr: {job.stderr_file}")
print(f"status: {job.status_file}")
print(f"rc: {job.rc_file}")
```
## Monitoring

### Job Status

```python
from xqute.defaults import JobStatus

status = await job.get_status()
status_name = JobStatus.get_name(status)
print(f"Job status: {status_name}")
```
### Status Values

| Status | Value | Description |
|---|---|---|
| `INIT` | 0 | Job initialized |
| `QUEUED` | 1 | Job queued for submission |
| `SUBMITTED` | 2 | Job submitted to scheduler |
| `RUNNING` | 3 | Job is running |
| `FINISHED` | 4 | Job completed successfully |
| `FAILED` | 5 | Job failed |
| `KILLING` | 6 | Job is being killed |
### Checking Multiple Jobs

```python
import asyncio

async def check_all_jobs():
    statuses = await asyncio.gather(
        *[job.get_status(refresh=True) for job in xqute.jobs]
    )
    for job, status in zip(xqute.jobs, statuses):
        status_name = JobStatus.get_name(status)
        print(f"Job {job.index}: {status_name}")
```
### Completion Tracking

```python
# Wait for a specific job to complete
from xqute.defaults import JobStatus

while True:
    status = await xqute.jobs[0].get_status(refresh=True)
    if status in (JobStatus.FINISHED, JobStatus.FAILED):
        break
    await asyncio.sleep(1)
```
## Best Practices

### 1. Choose Appropriate Concurrency

```python
# Local machine: match CPU cores
import os
xqute = Xqute(forks=os.cpu_count())

# Slurm: match allocated resources
xqute = Xqute(forks=100)
```
### 2. Use Submission Batching

```python
# For high-latency schedulers
xqute = Xqute(
    scheduler='slurm',
    submission_batch=10,  # Submit 10 jobs at once
)
```
### 3. Monitor Resource Usage

```python
# Check running jobs before adding more
# (job._status is the cached status; use get_status() for a live check)
running = [j for j in xqute.jobs if j._status == JobStatus.RUNNING]
if len(running) < 100:
    await xqute.feed(['next-job'])
```
### 4. Handle Large Output

```python
# Write output to files instead of stdout, using the built-in
# XQUTE_JOB_METADIR variable (expanded by the shell at run time)
await xqute.feed([
    'bash', '-c',
    'python process.py --output "$XQUTE_JOB_METADIR/output.txt"',
])
```
### 5. Use Descriptive Job Names

```python
xqute = Xqute(
    jobname_prefix='myproject',
    scheduler_opts={'job_name': 'batch1'},
)
```
### 6. Clean Up Old Jobs

```python
# Clean up old metadata
import os
import shutil

if os.path.exists('./.xqute'):
    shutil.rmtree('./.xqute')
```
### 7. Logging Configuration

```python
from xqute.utils import logger
import logging

logger.setLevel(logging.DEBUG)
```
### 8. Error Recovery

```python
try:
    await xqute.run_until_complete()
except Exception as e:
    logger.error(f"Failed: {e}")

    # Check failed jobs
    failed = [
        job for job in xqute.jobs
        if await job.get_status() == JobStatus.FAILED
    ]
    logger.info(f"Failed jobs: {len(failed)}")
```
### 9. Graceful Shutdown

```python
import asyncio
import signal

# Signal handler
async def shutdown_handler():
    logger.info("Shutting down...")
    await xqute.stop_feeding()

# Handle Ctrl+C
loop = asyncio.get_running_loop()
loop.add_signal_handler(
    signal.SIGINT,
    lambda: asyncio.create_task(shutdown_handler()),
)
```
### 10. Cloud Storage Best Practices

```python
# Use local cache for cloud paths
xqute = Xqute(
    workdir='gs://bucket/jobs',
    scheduler_opts={
        'mounted_workdir': '/mnt/gs',
        'cache_dir': '/tmp/cache',  # Local cache
    },
)
```
## Troubleshooting

### Jobs Stuck in SUBMITTED Status

Check whether the job failed before it started running:

```bash
# Check scheduler logs
qstat -j <job_id>   # SGE
squeue -j <job_id>  # Slurm
```
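The `<job_id>` to query is the scheduler job ID that Xqute records in each job's `job.jid` file (see Job Metadata); for a local workdir you can read it directly:

```python
from pathlib import Path

# Scheduler job ID recorded for job 0
jid = Path('./.xqute/0/job.jid').read_text().strip()
print(f"Query the scheduler with: squeue -j {jid}")
```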
### High Memory Usage

Reduce concurrency:

```python
xqute = Xqute(forks=5)  # Lower forks if memory is tight
```
### Slow Job Submission

Increase the submission batch:

```python
xqute = Xqute(
    scheduler='slurm',
    submission_batch=20,
)
```
## Next Steps
- Schedulers - Learn about specific scheduler backends
- Plugins - Extend Xqute functionality
- Advanced Usage - Advanced patterns and customization
- API Reference - Complete API documentation