"""Job to execute"""
from __future__ import annotations
import shlex
import shutil
from abc import ABC
from os import PathLike, unlink
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar, List, Optional
from aiopath import AsyncPath # type: ignore
from .defaults import (
DEFAULT_JOB_METADIR,
DEFAULT_JOB_CMD_WRAPPER_TEMPLATE,
DEFAULT_JOB_CMD_WRAPPER_SHELL,
JobStatus,
)
from .utils import (
logger,
a_mkdir,
a_read_text,
a_write_text,
asyncify,
replace_with_leading_space,
)
from .plugin import plugin
if TYPE_CHECKING: # pragma: no cover
from .scheduler import Scheduler
# Wrapped variants of blocking filesystem calls, built with `asyncify` from
# .utils so they can be used from coroutines.
# NOTE(review): presumably these are awaitable like the other `a_*` helpers
# imported above (a_mkdir, a_read_text, ...) — confirm against utils.asyncify.
a_shutil_move = asyncify(shutil.move)
a_os_unlink = asyncify(unlink)
class Job(ABC):
    """The abstract class for job

    Attributes:
        CMD_WRAPPER_TEMPLATE: The template for job wrapping
        CMD_WRAPPER_SHELL: The shell to run the wrapped script
        cmd: The command
        index: The index of the job
        metadir: The metadir of the job (``<metadir>/<index>``)
        jid: The jid of the job in scheduler system
        trial_count: The count for re-tries
        prev_status: The status observed before the most recent status
            query/assignment. Since we don't have a trigger for job
            finished/failed, the status is polled; comparing against the
            previous status lets a change be detected only once.
        _jid: The cached jid (also persisted in the jid file)
        _status: The status of the job
        _rc: The return code of the job
        _error_retry: Whether we should retry if error happened
        _num_retries: Total number of retries

    Args:
        index: The index of the job
        cmd: The command of the job
        metadir: The meta directory of the Job
        error_retry: Whether we should retry if error happened
        num_retries: Total number of retries
    """

    __slots__ = (
        "cmd",
        "index",
        "metadir",
        "trial_count",
        "_jid",
        "_status",
        "_rc",
        "_error_retry",
        "_num_retries",
        "prev_status",
    )

    CMD_WRAPPER_SHELL: ClassVar[str] = DEFAULT_JOB_CMD_WRAPPER_SHELL
    CMD_WRAPPER_TEMPLATE: ClassVar[str] = DEFAULT_JOB_CMD_WRAPPER_TEMPLATE

    def __init__(
        self,
        index: int,
        cmd: str | List[str],
        metadir: PathLike = DEFAULT_JOB_METADIR,
        error_retry: Optional[bool] = None,
        num_retries: Optional[int] = None,
    ):
        """Construct

        Creates the per-job meta directory (``<metadir>/<index>``) if it
        does not exist yet.
        """
        self.cmd = cmd
        self.index = index
        self.metadir = Path(metadir) / str(self.index)
        self.metadir.mkdir(exist_ok=True, parents=True)

        self.trial_count = 0
        self.prev_status = JobStatus.INIT

        # The name of the job, should be the unique id from the scheduler
        self._jid: int | str | None = None
        self._status = JobStatus.INIT
        self._rc = -1
        self._error_retry = error_retry
        self._num_retries = num_retries

    def __repr__(self) -> str:
        """repr of the job"""
        if not self.jid:
            return f"<{self.__class__.__name__}-{self.index}: ({self.cmd})>"
        return (
            f"<{self.__class__.__name__}-{self.index}({self.jid}): "
            f"({self.cmd})>"
        )

    @property
    def jid(self) -> int | str | None:
        """Get the jid of the job in scheduler system

        The in-memory value wins; otherwise the jid file is read (and the
        value cached). Returns None if neither is available.
        """
        if self._jid is not None:
            return self._jid
        if not self.jid_file.is_file():
            return None
        self._jid = self.jid_file.read_text()
        return self._jid

    @jid.setter
    def jid(self, uniqid: int | str):
        """Set the jid and persist it to the jid file"""
        self._jid = uniqid
        self.jid_file.write_text(str(uniqid))

    @property
    def stdout_file(self) -> Path:
        """The stdout file of the job"""
        return self.metadir / "job.stdout"

    @property
    def stderr_file(self) -> Path:
        """The stderr file of the job"""
        return self.metadir / "job.stderr"

    @property
    def status_file(self) -> Path:
        """The status file of the job"""
        return self.metadir / "job.status"

    @property
    def rc_file(self) -> Path:
        """The rc file of the job"""
        return self.metadir / "job.rc"

    @property
    def jid_file(self) -> Path:
        """The jid file of the job"""
        return self.metadir / "job.jid"

    @property
    def retry_dir(self) -> Path:
        """The retry directory of the job"""
        return self.metadir / "job.retry"

    @property
    def status(self) -> int:
        """Query the status of the job

        If the job is submitted, try to query it from the status file
        Make sure the status is updated by trap in wrapped script

        Note that reading this property has side effects: it records the
        previous status, may refresh the status from the status file, may
        turn a FAILED status into RETRYING, and logs selected transitions.
        """
        self.prev_status = self._status

        # Only in-flight jobs get their status refreshed from the file
        if self.status_file.is_file() and self._status in (
            JobStatus.SUBMITTED,
            JobStatus.RUNNING,
            JobStatus.KILLING,
        ):
            try:
                self._status = int(self.status_file.read_text())
            except (
                FileNotFoundError,
                ValueError,
                TypeError,
            ):  # pragma: no cover
                # File vanished or holds no parsable integer: keep the
                # current status and let a later poll try again.
                pass

        # NOTE(review): assumes _num_retries is an int whenever
        # _error_retry is truthy (both default to None) — confirm that the
        # scheduler fills them in before polling.
        if (
            self._status == JobStatus.FAILED
            and self._error_retry
            and self.trial_count < self._num_retries  # type: ignore
        ):
            self._status = JobStatus.RETRYING

        if self.prev_status != self._status and (
            self._status == JobStatus.RETRYING
            or self._status >= JobStatus.KILLING
        ):
            logger.info(
                "/Job-%s Status changed: %r -> %r",
                self.index,
                *JobStatus.get_name(self.prev_status, self._status),
            )

        return self._status

    @status.setter
    def status(self, stat: int):
        """Set the status manually

        Args:
            stat: The status to set
        """
        logger.debug(
            "/Job-%s Status changed: %r -> %r",
            self.index,
            *JobStatus.get_name(self._status, stat),
        )
        self.prev_status = self._status
        self._status = stat

    @property
    async def rc(self) -> int:
        """The return code of the job

        This is an awaitable property (``await job.rc``). Falls back to the
        in-memory value when the rc file does not exist.
        """
        if not await AsyncPath(self.rc_file).is_file():
            return self._rc  # pragma: no cover
        return int(await a_read_text(self.rc_file))

    @property
    def strcmd(self) -> str:
        """Get the string representation of the command

        A list (or tuple) command is joined with each item shell-quoted; a
        string command is used as is. An empty command is replaced by a
        placeholder ``echo``.
        """
        if isinstance(self.cmd, (list, tuple)):
            cmd = " ".join(shlex.quote(str(cmditem)) for cmditem in self.cmd)
        else:
            cmd = self.cmd
        return cmd or "echo 'No script provided'"

    def shebang(self, scheduler: Scheduler) -> str:
        """The shebang of the wrapped script

        Args:
            scheduler: The scheduler (unused here; available to overrides)
        """
        return f"#!{self.__class__.CMD_WRAPPER_SHELL}"

    async def clean(self, retry=False):
        """Clean up the meta files

        The stdout, stderr, status and rc files are either archived into
        ``<retry_dir>/<trial_count>`` (when retrying) or removed.
        Blocking filesystem calls go through the asyncify-wrapped helpers
        rather than being called directly inside the coroutine.

        Args:
            retry: Whether clean it for retrying
        """
        meta_files = (
            self.stdout_file,
            self.stderr_file,
            self.status_file,
            self.rc_file,
        )

        if retry:
            retry_dir = self.retry_dir / str(self.trial_count)
            # Start from an empty archive directory for this trial
            if await AsyncPath(retry_dir).exists():
                await asyncify(shutil.rmtree)(retry_dir)
            await a_mkdir(retry_dir, parents=True)

            for meta_file in meta_files:
                if await AsyncPath(meta_file).is_file():
                    await a_shutil_move(str(meta_file), str(retry_dir))
        else:
            for meta_file in meta_files:
                if await AsyncPath(meta_file).is_file():
                    await a_os_unlink(meta_file)

    async def wrapped_script(self, scheduler: Scheduler) -> PathLike:
        """Get the wrapped script

        Fills the placeholders of ``CMD_WRAPPER_TEMPLATE`` (shebang, plugin
        hook snippets, pre/post scripts), formats the result with ``job``
        and ``status`` and writes it to
        ``<metadir>/job.wrapped.<scheduler.name>``.

        Args:
            scheduler: The scheduler

        Returns:
            The path of the wrapped script
        """
        wrapt_script = self.metadir / f"job.wrapped.{scheduler.name}"
        shebang = self.shebang(scheduler)

        jobcmd_init = plugin.hooks.on_jobcmd_init(scheduler, self)
        jobcmd_prep = plugin.hooks.on_jobcmd_prep(scheduler, self)
        jobcmd_end = plugin.hooks.on_jobcmd_end(scheduler, self)

        wrapt_cmd = self.__class__.CMD_WRAPPER_TEMPLATE.replace(
            "#![shebang]", shebang
        )
        # Placeholders are substituted in this fixed order
        substitutions = (
            ("#![jobcmd_init]", "\n\n".join(jobcmd_init)),
            ("#![jobcmd_prep]", "\n\n".join(jobcmd_prep)),
            ("#![jobcmd_end]", "\n\n".join(jobcmd_end)),
            ("#![prescript]", scheduler.config.prescript),
            ("#![postscript]", scheduler.config.postscript),
        )
        for placeholder, content in substitutions:
            wrapt_cmd = replace_with_leading_space(
                wrapt_cmd, placeholder, content
            )

        wrapt_cmd = wrapt_cmd.format(job=self, status=JobStatus)
        await a_write_text(wrapt_script, wrapt_cmd)
        return wrapt_script