Plugins
Plugins
Extend Xqute's functionality using the plugin system based on simplug.
Overview
Xqute uses a hook-based plugin system that allows you to:
- Customize job lifecycle behavior
- Add custom logging and monitoring
- Integrate with external services
- Modify job commands dynamically
- Implement custom error handling
Available Hooks
Scheduler Hooks
| Hook | Type | Description |
|---|---|---|
on_init |
sync | Called when scheduler is initialized |
on_shutdown |
sync | Called when scheduler shuts down |
Job Lifecycle Hooks
| Hook | Type | Description |
|---|---|---|
on_job_init |
async | Called when a job is created |
on_job_queued |
async | Called when job is queued |
on_job_submitted |
async | Called after job is submitted |
on_job_started |
async | Called when job starts running |
on_job_polling |
async | Called during status polling |
on_job_killing |
async | Called when job is being killed |
on_job_killed |
async | Called after job is killed |
on_job_failed |
async | Called when job fails |
on_job_succeeded |
async | Called when job succeeds |
Job Command Hooks
| Hook | Type | Description |
|---|---|---|
on_jobcmd_init |
sync | Returns code to prepend to job script |
on_jobcmd_prep |
sync | Returns code before job command |
on_jobcmd_end |
sync | Returns code after job command |
Creating a Plugin
Basic Plugin Structure
from xqute import simplug as pm
@pm.impl
def on_init(scheduler):
"""Called when scheduler is initialized"""
print(f"Scheduler {scheduler.name} initialized")
@pm.impl
async def on_job_submitted(scheduler, job):
"""Called when job is submitted"""
print(f"Job {job.index} submitted with JID: {job._jid}")
Complete Example
Create a file myplugin.py:
"""Custom plugin for Xqute"""
from xqute import simplug as pm
from xqute.defaults import JobStatus
import logging
# Setup logging
logger = logging.getLogger('myplugin')
@pm.impl
def on_init(scheduler):
"""Initialize plugin with scheduler"""
logger.info(f"MyPlugin loaded for {scheduler.name}")
@pm.impl
async def on_job_init(scheduler, job):
"""Called when job is initialized"""
logger.info(f"Job {job.index} initialized: {job.cmd}")
@pm.impl
async def on_job_queued(scheduler, job):
"""Called when job is queued"""
logger.info(f"Job {job.index} queued")
@pm.impl
async def on_job_submitted(scheduler, job):
"""Called after job is submitted"""
jid = await job.get_jid()
logger.info(f"Job {job.index} submitted (JID: {jid})")
@pm.impl
async def on_job_started(scheduler, job):
"""Called when job starts running"""
logger.info(f"Job {job.index} started")
@pm.impl
async def on_job_polling(scheduler, job, counter):
"""Called during status polling"""
if counter % 10 == 0:
status = await job.get_status()
status_name = JobStatus.get_name(status)
logger.debug(f"Job {job.index} polling: {status_name}")
@pm.impl
async def on_job_failed(scheduler, job):
"""Called when job fails"""
logger.error(f"Job {job.index} failed")
@pm.impl
async def on_job_succeeded(scheduler, job):
"""Called when job succeeds"""
logger.info(f"Job {job.index} succeeded")
Using Plugins
Enable Plugin
Import the plugin module before creating Xqute:
import asyncio
from xqute import Xqute
# Import your plugin
import myplugin
async def main():
xqute = Xqute(forks=3)
await xqute.feed(['echo', 'Hello'])
await xqute.run_until_complete()
if __name__ == '__main__':
asyncio.run(main())
Enable Multiple Plugins
# Import multiple plugins
import logging_plugin
import monitoring_plugin
# Enable specific plugins
xqute = Xqute(plugins=['logging_plugin', 'monitoring_plugin'])
Disable Plugin
# Import all plugins
import myplugin
import another_plugin
# Disable specific plugin
xqute = Xqute(plugins=['-myplugin'])
Plugin Examples
1. Email Notifications
"""Send email notifications for job completion"""
import smtplib
from email.mime.text import MIMEText
from xqute import simplug as pm
SMTP_SERVER = 'smtp.example.com'
SMTP_PORT = 587
SMTP_USER = 'user@example.com'
SMTP_PASSWORD = 'password'
def send_email(to, subject, body):
"""Send email notification"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = SMTP_USER
msg['To'] = to
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.send_message(msg)
@pm.impl
async def on_job_failed(scheduler, job):
"""Send email on job failure"""
stderr = await job.stderr_file.a_read_text()
subject = f"Job {job.index} Failed"
body = f"Job command: {job.cmd}\n\nStderr:\n{stderr}"
send_email('admin@example.com', subject, body)
2. Database Logging
"""Log job information to database"""
import sqlite3
from xqute import simplug as pm
DB_PATH = 'jobs.db'
def get_db():
"""Get database connection"""
conn = sqlite3.connect(DB_PATH)
conn.execute('''
CREATE TABLE IF NOT EXISTS jobs (
index INTEGER PRIMARY KEY,
command TEXT,
status INTEGER,
submitted_at TIMESTAMP,
completed_at TIMESTAMP
)
''')
return conn
@pm.impl
async def on_job_submitted(scheduler, job):
"""Log job submission"""
import datetime
conn = get_db()
conn.execute(
'INSERT INTO jobs (index, command, status, submitted_at) VALUES (?, ?, ?, ?)',
(job.index, str(job.cmd), 2, datetime.datetime.now())
)
conn.commit()
conn.close()
@pm.impl
async def on_job_succeeded(scheduler, job):
"""Update job completion"""
import datetime
conn = get_db()
conn.execute(
'UPDATE jobs SET status = 4, completed_at = ? WHERE index = ?',
(datetime.datetime.now(), job.index)
)
conn.commit()
conn.close()
3. Slack Notifications
"""Send Slack notifications"""
import requests
from xqute import simplug as pm
SLACK_WEBHOOK = 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
def send_slack(message):
"""Send message to Slack"""
requests.post(SLACK_WEBHOOK, json={'text': message})
@pm.impl
async def on_job_failed(scheduler, job):
"""Send Slack notification on failure"""
send_slack(f":x: Job {job.index} failed: {job.cmd}")
@pm.impl
async def on_job_succeeded(scheduler, job):
"""Send Slack notification on success"""
send_slack(f":white_check_mark: Job {job.index} completed")
4. Custom Command Wrapping
"""Wrap job commands with additional setup"""
from xqute import simplug as pm
@pm.impl
def on_jobcmd_init(scheduler, job):
"""Add initialization code to job script"""
return f"""
# Custom initialization
set -e
export CUSTOM_VAR="value"
"""
@pm.impl
def on_jobcmd_prep(scheduler, job):
"""Add preparation code before job command"""
return f"""
# Preparation
cd /tmp/workdir
echo "Preparing job {job.index}"
"""
@pm.impl
def on_jobcmd_end(scheduler, job):
"""Add cleanup code after job command"""
return f"""
# Cleanup
rm -rf /tmp/workdir
echo "Job {job.index} completed"
"""
5. Resource Monitoring
"""Monitor job resource usage"""
import psutil
import time
from xqute import simplug as pm
MONITOR_FILE = 'job_resources.txt'
@pm.impl
async def on_job_started(scheduler, job):
"""Start resource monitoring"""
# In real implementation, start background monitoring
pass
@pm.impl
async def on_job_polling(scheduler, job, counter):
"""Log resource usage"""
# Log CPU, memory usage
cpu_percent = psutil.cpu_percent()
memory = psutil.virtual_memory()
with open(MONITOR_FILE, 'a') as f:
f.write(f"{job.index},{counter},{cpu_percent},{memory.percent}\n")
6. Conditional Job Submission
"""Conditionally submit jobs based on criteria"""
from xqute import simplug as pm
@pm.impl
async def on_job_submitting(scheduler, job):
"""Decide whether to submit job"""
# Skip jobs with index 5 and 10
if job.index in [5, 10]:
print(f"Skipping job {job.index}")
return False # Cancel submission
# Proceed with submission
return None
Plugin Development Tips
1. Error Handling
Always handle errors gracefully:
@pm.impl
async def on_job_failed(scheduler, job):
try:
# Your code here
pass
except Exception as e:
# Log error but don't crash
from xqute.utils import logger
logger.error(f"Plugin error: {e}")
2. Async vs Sync Hooks
Use async hooks for I/O operations:
# Async: Use for file I/O, network calls
@pm.impl
async def on_job_submitted(scheduler, job):
await job.stdout_file.a_read_text()
# Sync: Use for simple operations
@pm.impl
def on_init(scheduler):
print("Initialized")
3. Hook Order
Multiple plugins can implement the same hook. They execute in import order:
# plugin1.py
@pm.impl
def on_init(scheduler):
print("First")
# plugin2.py
@pm.impl
def on_init(scheduler):
print("Second")
# Output when imported:
# First
# Second
4. Canceling Hooks
Return False from specific hooks to cancel actions:
@pm.impl
async def on_job_submitting(scheduler, job):
# Cancel submission
return False
@pm.impl
async def on_job_killing(scheduler, job):
# Prevent job from being killed
return False
5. Hook Return Values
Some hooks expect return values:
# on_jobcmd_* hooks return code strings
@pm.impl
def on_jobcmd_init(scheduler, job):
return "# My initialization code"
Plugin Distribution
Package as Python Package
Create directory structure:
xqute-myplugin/
├── setup.py
├── pyproject.toml
├── README.md
└── xqute_myplugin/
└── __init__.py
xqute_myplugin/__init__.py:
"""Xqute plugin for custom functionality"""
from xqute import simplug as pm
@pm.impl
def on_init(scheduler):
print("MyPlugin loaded")
setup.py:
from setuptools import setup, find_packages
setup(
name='xqute-myplugin',
version='0.1.0',
packages=find_packages(),
install_requires=['xqute'],
entry_points={
'xqute.plugins': [
'myplugin = xqute_myplugin',
],
},
)
Install Plugin
pip install xqute-myplugin
# Use plugin
from xqute import Xqute
xqute = Xqute(plugins=['myplugin'])
Testing Plugins
Unit Testing
import pytest
from xqute import Xqute, simplug as pm
# Define test plugin
@pm.impl
def on_init(scheduler):
scheduler.test_flag = True
async def test_plugin():
xqute = Xqute()
assert hasattr(xqute.scheduler, 'test_flag')
assert xqute.scheduler.test_flag is True
Integration Testing
import pytest
import asyncio
async def test_plugin_integration():
# Import plugin
import myplugin
xqute = Xqute(forks=1)
await xqute.feed(['echo', 'test'])
await xqute.run_until_complete()
# Verify plugin behavior
# ... assertions
Best Practices
- Error Handling: Always catch exceptions in hooks
- Logging: Use
xqute.utils.loggerfor consistent logging - Performance: Avoid blocking operations in async hooks
- Idempotency: Hooks should be safe to call multiple times
- Documentation: Document your hooks and behavior
- Testing: Write tests for your plugin functionality
- Compatibility: Test with multiple Xqute versions
Next Steps
- Advanced Usage - Advanced patterns and customization
- User Guide - Comprehensive usage guide
- API Reference - Complete API documentation