"""The scheduler to schedule jobs"""
from __future__ import annotations
import os
import shlex
import signal
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, List, Type
from yunpath import CloudPath
from diot import Diot # type: ignore
from .defaults import (
JobStatus,
JobErrorStrategy,
JOBCMD_WRAPPER_LANG,
JOBCMD_WRAPPER_TEMPLATE,
DEFAULT_ERROR_STRATEGY,
DEFAULT_NUM_RETRIES,
get_jobcmd_wrapper_init,
)
from .utils import logger, CommandType
from .path import SpecPath
from .job import Job
from .plugin import plugin
class Scheduler(ABC):
"""The abstract class for scheduler
Attributes:
name: The name of the scheduler
jobcmd_wrapper_init: The init script for the job command wrapper
Args:
workdir: The working directory
forks: Max number of job forks
error_strategy: The strategy to apply when an error happens
num_retries: Max number of retries when error_strategy is retry
prescript: The prescript to run before the job command
It is a piece of script that is inserted into the wrapper script,
running on the scheduler system.
postscript: The postscript to run when job finished
It is a piece of script that is inserted into the wrapper script,
running on the scheduler system.
jobname_prefix: The prefix for the job name
recheck_interval: The interval to recheck the job status.
Default is every 600 polls (each takes about 0.1 seconds).
cwd: The working directory for the job command wrapper
**kwargs: Other arguments for the scheduler
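Examples:
    A minimal construction sketch (assuming a concrete subclass
    `MyScheduler` that implements `submit_job`, `kill_job` and
    `job_is_running`; the subclass name is illustrative only):

    >>> scheduler = MyScheduler(
    ...     workdir="/tmp/pipeline",
    ...     forks=4,
    ...     error_strategy=JobErrorStrategy.RETRY,
    ...     num_retries=3,
    ...     jobname_prefix="demo",
    ... )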
"""
__slots__ = (
"config",
"forks",
"workdir",
"error_strategy",
"num_retries",
"prescript",
"postscript",
"jobname_prefix",
"recheck_interval",
"cwd",
)
# The name of the scheduler
name: str
# Should we remove the jid file after the job is done?
# This is useful for the local scheduler and the like. The biggest reason
# is that the pid can be reused by other processes, which might cause the
# scheduler to think the job is still running.
# But for schedulers like Google Cloud Batch, the jid file is used to
# delete the job if we want to retry it.
remove_jid_after_done: bool = True
job_class: Type[Job] = Job
def __init__(
self,
workdir: str | Path | CloudPath,
forks: int = 1,
error_strategy: str = DEFAULT_ERROR_STRATEGY,
num_retries: int = DEFAULT_NUM_RETRIES,
prescript: str = "",
postscript: str = "",
jobname_prefix: str | None = None,
recheck_interval: int = 600,
cwd: str | Path | None = None,
**kwargs,
):
self.forks = forks
mounted_workdir = kwargs.pop("mounted_workdir", None)
self.workdir = SpecPath(workdir, mounted=mounted_workdir)
self.error_strategy = error_strategy
self.num_retries = num_retries
self.prescript = prescript
self.postscript = postscript
self.jobname_prefix = jobname_prefix or self.name
self.recheck_interval = recheck_interval
self.cwd = None if cwd is None else str(cwd)
self.config = Diot(**kwargs)
def create_job(
self,
index: int,
cmd: CommandType,
envs: dict[str, Any] | None = None,
) -> Job:
"""Create a job
Args:
index: The index of the job
cmd: The command of the job
envs: The environment variables to export for the job
Returns:
The job
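Examples:
    A usage sketch (assuming `scheduler` is a concrete instance):

    >>> job = scheduler.create_job(0, ["echo", "hello"])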
"""
return self.job_class(
index=index,
cmd=cmd,
workdir=self.workdir,
error_retry=self.error_strategy == JobErrorStrategy.RETRY,
num_retries=self.num_retries,
envs=envs,
)
async def submit_job_and_update_status(self, job: Job):
"""Submit and update the status
1. Check if the job is already submitted or running
2. If not, run the hook
3. If the hook is not cancelled, clean the job
4. Submit the job, raising an exception if it fails
5. If the job is submitted successfully, update the status
6. If the job fails to submit, update the status and write stderr to
the job file
Args:
job: The job
"""
if await self.job_is_submitted_or_running(job):
logger.warning(
"/Scheduler-%s Skip submitting, "
"job %r is already submitted or running.",
self.name,
job,
)
return
exception: Exception | None = None
try:
if await plugin.hooks.on_job_submitting(self, job) is False:
logger.info(
"/Scheduler-%s Job %s submission cancelled by hook.",
self.name,
job.index,
)
return
logger.debug(
"/Scheduler-%s Cleaning job %s before submission",
self.name,
job.index,
)
job.clean()
try:
# raise the exception immediately
# it somehow cannot be caught immediately
logger.debug(
"/Scheduler-%s Submitting job %s ...",
self.name,
job.index,
)
job.jid = await self.submit_job(job)
except Exception as exc:
exception = RuntimeError(f"Failed to submit job: {exc}")
exception.__traceback__ = exc.__traceback__
else:
logger.info(
"/Scheduler-%s Job %s submitted (jid: %s, wrapped: %s)",
self.name,
job.index,
job.jid,
self.wrapped_job_script(job),
)
job.status = JobStatus.SUBMITTED
await plugin.hooks.on_job_submitted(self, job)
except Exception as exc: # pragma: no cover
exception = exc
if exception is not None:
from traceback import format_exception
job.stderr_file.write_text(
"".join(
format_exception(
type(exception),
exception,
exception.__traceback__,
)
),
)
job.rc_file.write_text("-2")
job.status = JobStatus.FAILED
await plugin.hooks.on_job_failed(self, job)
async def retry_job(self, job: Job):
"""Retry a job
Args:
job: The job
"""
job.jid = ""
job.clean(retry=True)
job.trial_count += 1
logger.warning(
"/Scheduler-%s Retrying (#%s) job: %r",
self.name,
job.trial_count,
job,
)
await self.submit_job_and_update_status(job)
async def kill_job_and_update_status(self, job: Job):
"""Kill a job and update its status
Args:
job: The job
"""
job.status = JobStatus.KILLING
ret = await plugin.hooks.on_job_killing(self, job)
if ret is False: # pragma: no cover
logger.info(
"/Scheduler-%s Job %s killing cancelled by hook.",
self.name,
job.index,
)
return
logger.warning("/Scheduler-%s Killing job %s ...", self.name, job.index)
await self.kill_job(job)
try:
# in case the jid file is removed by the wrapped script
logger.debug(
"/Scheduler-%s Removing jid file %s ...",
self.name,
job.jid_file,
)
job.jid_file.unlink(missing_ok=True)
except Exception: # pragma: no cover
# missing_ok does not work for some cloud paths
# FileNotFoundError, google.api_core.exceptions.NotFound
pass
job.status = JobStatus.FINISHED
logger.info(
"/Scheduler-%s Job %s killed, calling hook ...",
self.name,
job.index,
)
await plugin.hooks.on_job_killed(self, job)
async def polling_jobs(
self,
jobs: List[Job],
on: str,
polling_counter: int,
) -> bool:
"""Check if all jobs are done or new jobs can submit
Args:
jobs: The list of jobs
on: The query mode: `submittable` or `all_done`
polling_counter: The polling counter, used to limit the number of polls or
skip some polls if the scheduler is busy.
Returns:
True if yes otherwise False.
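Examples:
    A polling-loop sketch (illustrative only; `scheduler` and `jobs`
    are assumed to exist, and `asyncio` must be imported):

        async def wait_for_all(scheduler, jobs):
            counter = 0
            while not await scheduler.polling_jobs(jobs, "all_done", counter):
                await asyncio.sleep(0.1)  # each poll takes ~0.1 seconds
                counter += 1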
"""
n_running = 0
ret = True
logger.debug(
"/Scheduler-%s Polling jobs (%s) (%s) ...",
self.name,
on,
polling_counter,
)
for job in jobs:
status = job.status
if on == "submittable" and status in (
JobStatus.QUEUED,
JobStatus.SUBMITTED,
JobStatus.RUNNING,
JobStatus.KILLING,
):
logger.debug(
"/Scheduler-%s Job %s is %s, incrementing n_running to (%s)...",
self.name,
job.index,
JobStatus.get_name(status),
n_running + 1,
)
n_running += 1
if job.prev_status != status:
if status in (JobStatus.FAILED, JobStatus.RETRYING):
logger.debug(
"/Scheduler-%s Job %s changed status: %s -> %s",
self.name,
job.index,
JobStatus.get_name(job.prev_status),
JobStatus.get_name(status),
)
if job.prev_status != JobStatus.RUNNING:
logger.debug(
"/Scheduler-%s Job %s was not running before failure, "
"running on_job_started hook to ensure lifecycle ...",
self.name,
job.index,
)
await plugin.hooks.on_job_started(self, job)
logger.debug(
"/Scheduler-%s Job %s calling on_job_failed hook ...",
self.name,
job.index,
)
await plugin.hooks.on_job_failed(self, job)
if self.remove_jid_after_done:
# We are also doing this in the wrapped script
# But for the cloud jid file, it is not 100% certain
# that the file is removed, because it is still held by
# the GSPath object, and in some cases when it is
# recycled, the file is recreated on the cloud.
try:
job.jid_file.unlink(missing_ok=True)
except Exception: # pragma: no cover
# missing_ok does not work for some cloud paths
# FileNotFoundError, google.api_core.exceptions.NotFound
pass
elif status == JobStatus.FINISHED:
logger.debug(
"/Scheduler-%s Job %s changed status: %s -> %s",
self.name,
job.index,
JobStatus.get_name(job.prev_status),
JobStatus.get_name(status),
)
if job.prev_status != JobStatus.RUNNING:
logger.debug(
"/Scheduler-%s Job %s was not running before finishing, "
"running on_job_started hook to ensure lifecycle ...",
self.name,
job.index,
)
await plugin.hooks.on_job_started(self, job)
logger.debug(
"/Scheduler-%s Job %s calling on_job_succeeded hook ...",
self.name,
job.index,
)
await plugin.hooks.on_job_succeeded(self, job)
if self.remove_jid_after_done:
try:
job.jid_file.unlink(missing_ok=True)
except Exception: # pragma: no cover
# FileNotFoundError, google.api_core.exceptions.NotFound
pass
elif status == JobStatus.RUNNING:
logger.debug(
"/Scheduler-%s Job %s changed status: %s -> %s",
self.name,
job.index,
JobStatus.get_name(job.prev_status),
JobStatus.get_name(status),
)
logger.debug(
"/Scheduler-%s Job %s calling on_job_started hook ...",
self.name,
job.index,
)
await plugin.hooks.on_job_started(self, job)
elif status == JobStatus.SUBMITTED: # pragma: no cover
# Check if the job fails before running
if await self.job_fails_before_running(job):
logger.warning(
"/Scheduler-%s Job %s seems to fail before running, "
"check your scheduler logs if necessary.",
self.name,
job.index,
)
job.status = JobStatus.FAILED
job.rc_file.write_text("-3")
with job.stderr_file.open("a") as f:
f.write(
"\nError: job seems to fail before running.\n"
"Check your scheduler logs if necessary.\n",
)
await plugin.hooks.on_job_failed(self, job)
if self.error_strategy == JobErrorStrategy.HALT:
logger.error(
"/Scheduler-%s Pipeline will halt since job failed: %r",
self.name,
job,
)
os.kill(os.getpid(), signal.SIGTERM)
# job.status = JobStatus.FINISHED
break
elif status == JobStatus.RUNNING:
logger.debug(
"/Scheduler-%s Job %s is running, calling polling hook ...",
self.name,
job.index,
)
# Call the polling hook
await plugin.hooks.on_job_polling(self, job, polling_counter)
# Let's make sure the job is really running
# For example, a node can be preempted by the cloud and
# the job status and rc will not be updated
# If we have an rc file, that means the job is done
# and we can skip the polling
if (
not job.rc_file.is_file()
and (polling_counter + 1) % self.recheck_interval == 0
and not await self.job_is_running(job)
): # pragma: no cover
logger.warning(
"/Scheduler-%s Job %s is not running in the scheduler, "
"but its status is still RUNNING, setting it to FAILED",
self.name,
job.index,
)
job.status = JobStatus.FAILED
job.rc_file.write_text("-3")
with job.stderr_file.open("a") as f:
f.write(
"\nError: job is not running in the scheduler, "
"but its status is still RUNNING.\n",
"It is likely that the resource is preempted.\n",
)
await plugin.hooks.on_job_failed(self, job)
if self.error_strategy == JobErrorStrategy.HALT:
logger.error(
"/Scheduler-%s Pipeline will halt since job failed: %r",
self.name,
job,
)
os.kill(os.getpid(), signal.SIGTERM)
# job.status = JobStatus.FINISHED
break
if (
self.error_strategy == JobErrorStrategy.HALT
and status == JobStatus.FAILED
):
logger.error(
"/Scheduler-%s Pipeline will halt since job failed: %r",
self.name,
job,
)
os.kill(os.getpid(), signal.SIGTERM)
# job.status = JobStatus.FINISHED
break
if status not in (JobStatus.FINISHED, JobStatus.FAILED):
logger.debug(
"/Scheduler-%s Not all jobs are done yet, job %s is %s",
self.name,
job.index,
JobStatus.get_name(status),
)
# Try to resubmit the job for retrying
if status == JobStatus.RETRYING:
logger.debug(
"/Scheduler-%s Job %s is retrying ...",
self.name,
job.index,
)
await self.retry_job(job)
ret = False
# Not returning here: we might still need to wait for
# callbacks or to halt on other jobs
return n_running < self.forks if on == "submittable" else ret
async def kill_running_jobs(self, jobs: List[Job]):
"""Try to kill all running jobs
Args:
jobs: The list of jobs
"""
logger.warning("/Scheduler-%s Killing running jobs ...", self.name)
for job in jobs:
status = job.status
if status in (JobStatus.SUBMITTED, JobStatus.RUNNING):
await self.kill_job_and_update_status(job)
async def job_is_submitted_or_running(self, job: Job) -> bool:
"""Check if a job is already submitted or running
Args:
job: The job
Returns:
True if yes otherwise False.
"""
if job.jid_file.is_file():
if await self.job_is_running(job):
job.status = JobStatus.SUBMITTED
return True
return False
async def job_fails_before_running(self, job: Job) -> bool:
"""Check if a job fails before running.
For some schedulers, the job might fail before running (after submission).
For example, the job might fail to allocate resources. In such a case,
the wrapped script might not be executed, and the job status will not be
updated (stays in SUBMITTED). We need to check such jobs and mark them as
FAILED.
For instant schedulers, such as the local scheduler, the failure is
reported immediately at submission, so we don't need to check such
jobs.
Args:
job: The job to check
Returns:
True if the job fails before running, otherwise False.
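Examples:
    A sketch of an override for a hypothetical queue-based scheduler
    (`query_queue_state` is illustrative, not a real API):

        async def job_fails_before_running(self, job):
            state = await self.query_queue_state(job.jid)
            return state == "FAILED_TO_START"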
"""
return False
@property
def jobcmd_wrapper_init(self) -> str:
"""The init script for the job command wrapper"""
wrapper_init = get_jobcmd_wrapper_init(
not isinstance(self.workdir.mounted, CloudPath),
self.remove_jid_after_done,
)
if self.cwd:
# Some schedulers (e.g. Google Cloud Batch) don't support changing the
# working directory via configuration, so we need to change it in the
# wrapper script.
# See: https://issuetracker.google.com/issues/336164416
wrapper_init = f"cd {shlex.quote(self.cwd)}\n\n{wrapper_init}"
return wrapper_init
def jobcmd_shebang(self, job: Job) -> str:
"""The shebang of the wrapper script"""
wrapper_lang = (
JOBCMD_WRAPPER_LANG
if isinstance(JOBCMD_WRAPPER_LANG, (tuple, list))
else [JOBCMD_WRAPPER_LANG]
)
return shlex.join(wrapper_lang)
def jobcmd_init(self, job: Job) -> str:
"""The job command init"""
init_code = []
if job.envs:
init_code.append("# Environment variables")
init_code.extend(
[
f"export {key}={shlex.quote(str(value))}"
for key, value in job.envs.items()
]
)
codes = plugin.hooks.on_jobcmd_init(self, job)
init_code.extend([code for code in codes if code])
return "\n".join(init_code)
def jobcmd_prep(self, job: Job) -> str:
"""The job command preparation"""
codes = plugin.hooks.on_jobcmd_prep(self, job)
codes = [code for code in codes if code]
return "\n".join(codes)
def jobcmd_end(self, job: Job) -> str:
"""The job command end"""
codes = plugin.hooks.on_jobcmd_end(self, job)
codes = [code for code in codes if code]
return "\n".join(codes)
def wrap_job_script(self, job: Job) -> str:
"""Wrap the job script
Args:
job: The job
Returns:
The wrapped script
"""
return JOBCMD_WRAPPER_TEMPLATE.format(
scheduler=self,
shebang=self.jobcmd_shebang(job),
status=JobStatus,
job=job,
cmd=shlex.join(job.cmd),
jobcmd_init=self.jobcmd_init(job),
jobcmd_prep=self.jobcmd_prep(job),
jobcmd_end=self.jobcmd_end(job),
)
def wrapped_job_script(self, job: Job) -> SpecPath:
"""Get the wrapped job script
Args:
job: The job
Returns:
The path of the wrapped job script
"""
base = f"job.wrapped.{self.name}"
wrapped_script = job.metadir / base
wrapped_script.write_text(self.wrap_job_script(job))
return wrapped_script
@abstractmethod
async def submit_job(self, job: Job) -> int | str:
"""Submit a job
Args:
job: The job
Returns:
The unique id in the scheduler system
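Examples:
    A local-process sketch (illustrative only; a real scheduler would
    call its own submission API and typically detach the process):

        import asyncio

        async def submit_job(self, job):
            proc = await asyncio.create_subprocess_exec(
                # assuming the wrapper script runs under bash
                "bash", str(self.wrapped_job_script(job)),
            )
            return proc.pid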
"""
@abstractmethod
async def kill_job(self, job: Job):
"""Kill a job
Args:
job: The job
"""
@abstractmethod
async def job_is_running(self, job: Job) -> bool:
"""Check if a job is really running
Args:
job: The job
Returns:
True if yes otherwise False.
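Examples:
    A pid-based sketch for a local scheduler (illustrative only):

        async def job_is_running(self, job):
            try:
                pid = int(job.jid_file.read_text().strip())
            except (ValueError, FileNotFoundError):
                return False
            try:
                os.kill(pid, 0)  # signal 0 only checks existence
            except ProcessLookupError:
                return False
            except PermissionError:
                return True  # exists but owned by another user
            return True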
"""