Advanced Usage

Advanced patterns and customization for power users.

Custom Schedulers

Creating a Custom Scheduler

Implement the Scheduler abstract class:

from xqute import Scheduler

class CustomScheduler(Scheduler):
    """Custom scheduler implementation"""
    name = 'custom'

    async def submit_job(self, job):
        """Submit a job and return its unique ID"""
        # Your submission logic here
        job_id = self._submit_to_custom_system(job)
        return job_id

    async def kill_job(self, job):
        """Kill a job"""
        jid = await job.get_jid()
        # Your kill logic here
        self._kill_in_custom_system(jid)

    async def job_is_running(self, job):
        """Check if a job is running"""
        jid = await job.get_jid()
        # Your status check logic here
        return self._check_status(jid) == 'running'

Using a Custom Scheduler

from xqute import Xqute

xqute = Xqute(
    scheduler=CustomScheduler,
    forks=10,
    scheduler_opts={
        'custom_param': 'value',
    }
)

await xqute.feed(['echo', 'Hello'])
await xqute.run_until_complete()

Advanced Custom Scheduler Example

import asyncio
from typing import Any, Dict

from xqute import Scheduler

class MyClusterScheduler(Scheduler):
    """Scheduler for custom cluster"""

    name = 'mycluster'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.jobs: Dict[str, Any] = {}

    async def submit_job(self, job):
        """Submit job to custom cluster"""
        cmd = self._build_submit_command(job)
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            raise RuntimeError(f"Submission failed: {stderr.decode()}")

        job_id = stdout.decode().strip()
        self.jobs[job_id] = {
            'job': job,
            'status': 'submitted',
        }
        return job_id

    async def kill_job(self, job):
        """Kill job"""
        jid = await job.get_jid()
        cmd = ['mycluster-kill', jid]
        await asyncio.create_subprocess_exec(*cmd)
        if jid in self.jobs:
            del self.jobs[jid]

    async def job_is_running(self, job):
        """Check if job is running"""
        jid = await job.get_jid()
        if jid not in self.jobs:
            return False

        cmd = ['mycluster-status', jid]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        status = stdout.decode().strip()

        self.jobs[jid]['status'] = status
        return status == 'running'

    def _build_submit_command(self, job):
        """Build submission command"""
        return [
            'mycluster-submit',
            '--job-name', f'{self.jobname_prefix}-{job.index}',
            '--output', str(job.stdout_file),
            '--error', str(job.stderr_file),
            # ...plus the script/command to execute, in whatever
            # form your cluster's submit tool expects
        ]
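
The class can then be used like any built-in scheduler. A minimal usage sketch, mirroring the pattern shown above:

from xqute import Xqute

xqute = Xqute(
    scheduler=MyClusterScheduler,
    forks=20,
)

await xqute.feed(['echo', 'Hello'])
await xqute.run_until_complete()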

Job Dependencies

Sequential Dependencies

Run jobs in order by waiting for each stage to complete before feeding the next:

import asyncio
from xqute import Xqute

async def run_with_dependencies():
    xqute = Xqute(forks=1)  # Single fork for sequential execution

    # Job 1
    await xqute.feed(['prepare_data.sh'])
    await xqute.run_until_complete()

    # Job 2 (depends on Job 1)
    await xqute.feed(['process_data.sh'])
    await xqute.run_until_complete()

    # Job 3 (depends on Job 2)
    await xqute.feed(['finalize.sh'])
    await xqute.run_until_complete()
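
If a later stage should be skipped when an earlier one fails, check the status of the previous stage before feeding the next. A minimal sketch, assuming the JobStatus values and get_status() API used elsewhere on this page; the stage scripts are placeholders:

import asyncio
from xqute import Xqute
from xqute.defaults import JobStatus

async def run_stages(stages):
    """Run each stage to completion, stopping at the first failure"""
    for stage_cmd in stages:
        xqute = Xqute(forks=1)
        await xqute.feed(stage_cmd)
        await xqute.run_until_complete()

        # Gate the next stage on the previous one finishing successfully
        if await xqute.jobs[0].get_status() != JobStatus.FINISHED:
            break

asyncio.run(run_stages([
    ['prepare_data.sh'],
    ['process_data.sh'],
    ['finalize.sh'],
]))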

Dependency Tracking with Plugins

from xqute import simplug as pm
from xqute.defaults import JobStatus

dependencies = {}  # job_index -> [parent_indices]

@pm.impl
async def on_job_queued(scheduler, job):
    """Check if dependencies are satisfied"""
    if job.index not in dependencies:
        return

    deps = dependencies[job.index]
    for dep_idx in deps:
        dep_job = scheduler.xqute.jobs[dep_idx]
        status = await dep_job.get_status()
        if status != JobStatus.FINISHED:
            # Dependency not finished; skip for now, will be retried later
            return False
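
A hypothetical way to wire this up: declare each job's parent indices before feeding, then feed the jobs in submission order. The script names below are placeholders:

from xqute import Xqute

# Job 2 depends on jobs 0 and 1
dependencies[2] = [0, 1]

xqute = Xqute(forks=5)
await xqute.feed(['prepare_a.sh'])   # index 0
await xqute.feed(['prepare_b.sh'])   # index 1
await xqute.feed(['combine.sh'])     # index 2
await xqute.run_until_complete()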

Job Arrays

Simple Job Array

import asyncio
from xqute import Xqute

async def main():
    xqute = Xqute(forks=10)

    # Submit job array
    for i in range(100):
        await xqute.feed(['process.sh', str(i)])

    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())

Job Array with Slurm

from xqute import Xqute

xqute = Xqute(
    scheduler='slurm',
    forks=100,
    scheduler_opts={
        'partition': 'compute',
        'array': '1-100',  # Job array
    }
)

# Single submission for array
await xqute.feed(['process.sh', '$SLURM_ARRAY_TASK_ID'])
await xqute.run_until_complete()

Progress Tracking

Simple Progress Bar

import asyncio
from xqute import Xqute
from xqute.defaults import JobStatus
from tqdm import tqdm

async def track_progress():
    xqute = Xqute(forks=5)

    # Submit jobs
    for i in range(100):
        await xqute.feed(['sleep', '1'])

    # Start progress bar
    progress = tqdm(total=100, desc="Processing")

    async def update_progress():
        while True:
            completed = 0
            for job in xqute.jobs:
                status = await job.get_status()
                if status in (JobStatus.FINISHED, JobStatus.FAILED):
                    completed += 1
            progress.n = completed
            progress.refresh()

            if completed == 100:
                progress.close()
                break
            await asyncio.sleep(1)

    # Run jobs with progress tracking
    await asyncio.gather(
        xqute.run_until_complete(),
        update_progress(),
    )
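
As in the earlier examples, drive the coroutine with asyncio.run:

if __name__ == '__main__':
    asyncio.run(track_progress())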

Detailed Progress Tracking

import asyncio
from xqute import Xqute
from xqute.defaults import JobStatus
import time

class ProgressTracker:
    def __init__(self, xqute):
        self.xqute = xqute
        self.start_time = time.time()

    async def track(self):
        while True:
            # Count jobs per status in a single pass
            status_counts = {status: 0 for status in JobStatus}
            for job in self.xqute.jobs:
                status = await job.get_status()
                status_counts[status] = status_counts.get(status, 0) + 1

            elapsed = time.time() - self.start_time
            total = len(self.xqute.jobs)
            completed = status_counts.get(JobStatus.FINISHED, 0)
            failed = status_counts.get(JobStatus.FAILED, 0)

            print(f"[{elapsed:.1f}s] "
                  f"Completed: {completed}/{total} "
                  f"Failed: {failed} "
                  f"Running: {status_counts.get(JobStatus.RUNNING, 0)}")

            if completed + failed == total:
                break

            await asyncio.sleep(2)

async def main():
    xqute = Xqute(forks=5)

    for i in range(50):
        await xqute.feed(['sleep', str(i % 5)])

    tracker = ProgressTracker(xqute)
    await asyncio.gather(
        xqute.run_until_complete(),
        tracker.track(),
    )
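
Run it the same way as the other examples:

if __name__ == '__main__':
    asyncio.run(main())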

Streaming Output

Stream Output to Console

from xqute import simplug as pm
import asyncio

@pm.impl
async def on_job_succeeded(scheduler, job):
    """Stream output after job completes"""
    stdout = await job.stdout_file.a_read_text()
    if stdout:
        print(f"Job {job.index} output:\n{stdout}")

Real-time Output Streaming

from xqute import simplug as pm
import aiofiles

# Remember how far into each job's stdout we have already streamed
_stdout_offsets = {}

@pm.impl
async def on_job_polling(scheduler, job, counter):
    """Stream any new output lines on each poll"""
    try:
        async with aiofiles.open(job.stdout_file, 'r') as f:
            # Resume from where the previous poll left off
            await f.seek(_stdout_offsets.get(job.index, 0))
            while True:
                line = await f.readline()
                if not line:
                    break
                print(f"[Job {job.index}] {line.strip()}")
            _stdout_offsets[job.index] = await f.tell()
    except FileNotFoundError:
        pass

Custom Error Handling

Conditional Retry Logic

from xqute import simplug as pm

@pm.impl
async def on_job_failed(scheduler, job):
    """Custom retry logic based on error type"""
    stderr = await job.stderr_file.a_read_text()

    # Don't retry on certain errors
    if 'SyntaxError' in stderr:
        from xqute.utils import logger
        logger.warning(f"Syntax error in job {job.index}, not retrying")
        # Disable retry for this job
        job._error_retry = False
        job._num_retries = 0

Error Aggregation

from xqute import simplug as pm

error_summary = {
    'syntax_errors': 0,
    'runtime_errors': 0,
    'timeout_errors': 0,
}

@pm.impl
async def on_job_failed(scheduler, job):
    """Aggregate error types"""
    stderr = await job.stderr_file.a_read_text()

    if 'SyntaxError' in stderr:
        error_summary['syntax_errors'] += 1
    elif 'TimeoutError' in stderr:
        error_summary['timeout_errors'] += 1
    else:
        error_summary['runtime_errors'] += 1

@pm.impl
def on_shutdown(scheduler, sig):
    """Print error summary"""
    print("Error Summary:")
    for error_type, count in error_summary.items():
        print(f"  {error_type}: {count}")

Job Templates

Template with Environment Variables

from xqute import Xqute

def create_template_job(index, params):
    """Create job from template"""
    return {
        'cmd': ['python', 'process.py'],
        'envs': {
            'INPUT_FILE': params['input'],
            'OUTPUT_DIR': params['output'],
            'PARAM_A': str(params.get('param_a', 1)),
        }
    }

async def main():
    xqute = Xqute(forks=5)

    # Create jobs from templates
    jobs = [
        {
            'input': f'/data/input_{i}.txt',
            'output': f'/data/output_{i}/',
            'param_a': i,
        }
        for i in range(10)
    ]

    for i, params in enumerate(jobs):
        job = create_template_job(i, params)
        await xqute.feed(job['cmd'], envs=job['envs'])

    await xqute.run_until_complete()

Template with Command Generation

def generate_job_commands(config):
    """Generate job commands from configuration"""
    commands = []

    for dataset in config['datasets']:
        for model in config['models']:
            cmd = [
                'python', 'train.py',
                '--dataset', dataset,
                '--model', model,
                '--epochs', str(config.get('epochs', 10)),
                '--batch-size', str(config.get('batch_size', 32)),
            ]
            commands.append(cmd)

    return commands

# Usage
config = {
    'datasets': ['imagenet', 'cifar10'],
    'models': ['resnet50', 'vgg16'],
    'epochs': 50,
    'batch_size': 64,
}

xqute = Xqute(forks=4)

for cmd in generate_job_commands(config):
    await xqute.feed(cmd)

await xqute.run_until_complete()

Batch Processing

Process Files in Batches

import asyncio
from xqute import Xqute
from pathlib import Path

async def process_files(file_list, batch_size=10):
    """Process files in batches"""
    for i in range(0, len(file_list), batch_size):
        batch = file_list[i:i+batch_size]

        # Fresh Xqute instance for each batch
        xqute = Xqute(forks=5)
        for file_path in batch:
            await xqute.feed(['python', 'process.py', str(file_path)])

        # Wait for this batch to complete before starting the next one
        await xqute.run_until_complete()

# Usage
files = list(Path('/data').glob('*.txt'))
await process_files(files, batch_size=10)

Batch Processing with Results

import asyncio
from xqute import Xqute

async def process_with_results(items, process_func):
    """Process items and collect results"""
    xqute = Xqute(forks=5)

    # Submit all jobs
    for item in items:
        await xqute.feed(['python', process_func, item])

    # Wait for completion
    await xqute.run_until_complete()

    # Collect results
    results = []
    for job in xqute.jobs:
        stdout = await job.stdout_file.a_read_text()
        results.append((job.index, stdout))

    return results

# Usage
items = ['item1', 'item2', 'item3']
results = await process_with_results(items, 'processor.py')

Integration with Other Tools

Integration with Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
import asyncio
from xqute import Xqute
from datetime import datetime, timedelta

def run_xqute_jobs(**context):
    """Run Xqute jobs in Airflow task"""

    async def process():
        xqute = Xqute(
            scheduler='slurm',
            forks=10,
            scheduler_opts={
                'partition': 'airflow',
            }
        )

        # Get files from Airflow context
        files = context['params']['files']

        for file_path in files:
            await xqute.feed(['python', 'process.py', file_path])

        await xqute.run_until_complete()

    asyncio.run(process())

with DAG(
    'xqute_pipeline',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
    },
    schedule_interval=timedelta(days=1),
) as dag:

    run_jobs = PythonOperator(
        task_id='run_xqute_jobs',
        python_callable=run_xqute_jobs,
        params={
            'files': ['/data/file1.txt', '/data/file2.txt'],
        },
    )

Integration with Dask

import dask
from xqute import Xqute
import asyncio

@dask.delayed
def run_xqute_task(task_id, command):
    """Run Xqute job as Dask task"""

    async def process():
        xqute = Xqute(forks=1)
        await xqute.feed(command)
        await xqute.run_until_complete()

    asyncio.run(process())

# Create Dask workflow
tasks = [
    run_xqute_task(i, ['python', 'process.py', str(i)])
    for i in range(100)
]

# Execute with Dask
results = dask.compute(*tasks)

Performance Optimization

Optimize Concurrency

import os
import psutil
from xqute import Xqute

def calculate_optimal_forks():
    """Calculate optimal concurrency based on resources"""
    cpu_count = os.cpu_count()
    mem_gb = psutil.virtual_memory().total / (1024**3)

    # Adjust based on job type
    if mem_gb > 64:
        return cpu_count * 2
    elif mem_gb > 32:
        return cpu_count
    else:
        return max(1, cpu_count // 2)

xqute = Xqute(forks=calculate_optimal_forks())

Batch Submissions

# Reduce scheduler overhead by batching submissions
xqute = Xqute(
    scheduler='slurm',
    forks=1000,
    submission_batch=50,  # Submit 50 jobs at once
)

Reduce Polling Frequency

# Poll less frequently for long-running jobs
xqute = Xqute(
    scheduler='slurm',
    recheck_interval=50,  # Check every 50 polls instead of 10
)

Optimize Working Directory

# Use local directory instead of cloud for speed
xqute = Xqute(
    workdir='./.xqute',  # Local, not 'gs://bucket/jobs'
)

# Or use mounted path
xqute = Xqute(
    workdir='gs://bucket/jobs',
    scheduler_opts={
        'mounted_workdir': '/mnt/gs',  # Faster access
    },
)

Memory Management

from xqute import simplug as pm

@pm.impl
async def on_job_succeeded(scheduler, job):
    """Clean up large output files to save space"""
    # Remove stdout if it's too large
    if await job.stdout_file.a_size() > 10_000_000:  # 10MB
        await job.stdout_file.a_unlink()

    # Remove stderr
    await job.stderr_file.a_unlink()

Next Steps