"""Job to execute"""
from __future__ import annotations
import shlex
from typing import Tuple
from .defaults import JobStatus
from .utils import logger, CommandType
from .path import SpecPath
class Job:
    """The class for job

    Attributes:
        cmd: The command, as a tuple of strings. A string command is split
            with `shlex.split`; each item of a list/tuple command is
            converted with `str`.
        index: The index of the job
        metadir: The meta directory of the job (`<workdir>/<index>`)
        trial_count: The retry counter. A failed job is reported as
            RETRYING while `error_retry` is set and this counter is lower
            than `num_retries`; `clean(retry=True)` also uses it to name the
            backup directory under `retry_dir`.
        prev_status: The status as it was before the last query/update of
            `status`
        _jid: The cached jid of the job in scheduler system
        _status: The status of the job
        _rc: The default return code (-1), returned when no readable rc
            file exists
        _error_retry: Whether we should retry if error happened
        _num_retries: Total number of retries (None is treated as 0)

    Args:
        index: The index of the job
        cmd: The command of the job
        workdir: The directory in which the job's meta directory is created
        error_retry: Whether we should retry if error happened
        num_retries: Total number of retries
    """

    __slots__ = (
        "cmd",
        "index",
        "metadir",
        "trial_count",
        "_jid",
        "_status",
        "_rc",
        "_error_retry",
        "_num_retries",
        "prev_status",
    )

    def __init__(
        self,
        index: int,
        cmd: CommandType,
        workdir: SpecPath,
        error_retry: bool | None = None,
        num_retries: int | None = None,
    ):
        """Construct a new Job

        The meta directory `<workdir>/<index>` is created if it does not
        exist yet.

        Args:
            index: The index of the job
            cmd: The command of the job, either a string (split with
                `shlex.split`) or a list/tuple of arguments
            workdir: The directory in which the meta directory is created
            error_retry: Whether we should retry if error happened
            num_retries: Total number of retries; None means no retries
        """
        self.cmd: Tuple[str, ...] = tuple(
            map(
                str,
                (cmd if isinstance(cmd, (tuple, list)) else shlex.split(cmd)),
            )
        )
        self.index = index
        self.metadir = workdir / str(self.index)
        self.metadir.mkdir(exist_ok=True, parents=True)
        self.trial_count = 0
        self.prev_status = JobStatus.INIT
        # The unique id of the job from the scheduler; assigned later via the
        # `jid` setter or loaded lazily from the jid file.
        self._jid: int | str | None = None
        self._status = JobStatus.INIT
        self._rc = -1
        self._error_retry = error_retry
        self._num_retries = num_retries

    def __repr__(self) -> str:
        """repr of the job, including the jid when it is known"""
        prefix = f"{self.__class__.__name__}-{self.index}"
        if not self.jid:
            return f"<{prefix}: ({self.cmd})>"
        return f"<{prefix}({self.jid}): ({self.cmd})>"

    @property
    def jid(self) -> int | str | None:
        """Get the jid of the job in scheduler system

        The in-memory value is returned when set; otherwise the jid is read
        from the jid file (as a string) and cached.

        Returns:
            The jid, or None if it is neither set nor recorded in the jid file
        """
        if self._jid is None:
            if not self.jid_file.is_file():
                return None
            # strip() so a trailing newline in the file is not part of the id
            self._jid = self.jid_file.read_text().strip()
        return self._jid

    @jid.setter
    def jid(self, uniqid: int | str):
        """Set the jid and persist it (as text) to the jid file"""
        self._jid = uniqid
        self.jid_file.write_text(str(uniqid))

    @property
    def stdout_file(self) -> SpecPath:
        """The stdout file of the job"""
        return self.metadir / "job.stdout"

    @property
    def stderr_file(self) -> SpecPath:
        """The stderr file of the job"""
        return self.metadir / "job.stderr"

    @property
    def status_file(self) -> SpecPath:
        """The status file of the job"""
        return self.metadir / "job.status"

    @property
    def rc_file(self) -> SpecPath:
        """The rc file of the job"""
        return self.metadir / "job.rc"

    @property
    def jid_file(self) -> SpecPath:
        """The jid file of the job"""
        return self.metadir / "job.jid"

    @property
    def retry_dir(self) -> SpecPath:
        """The retry directory of the job"""
        return self.metadir / "job.retry"

    @property
    def status(self) -> int:
        """Query the status of the job

        While the job is SUBMITTED, RUNNING or KILLING, the status is
        refreshed from the status file, which is expected to be updated by
        the trap in the wrapped script. A FAILED job that may still be
        retried is reported as RETRYING. Every call records the status seen
        before the query in `prev_status`.

        Returns:
            The current status code
        """
        self.prev_status = self._status
        # Cheap in-memory check first, so no file I/O happens for jobs that
        # are not in flight.
        if self._status in (
            JobStatus.SUBMITTED,
            JobStatus.RUNNING,
            JobStatus.KILLING,
        ) and self.status_file.is_file():
            try:
                self._status = int(self.status_file.read_text())
            except (
                FileNotFoundError,
                ValueError,
                TypeError,
            ):  # pragma: no cover
                # Presumably the file vanished or is being written right now;
                # keep the last known status and try again on the next query.
                pass

        # An unset num_retries (None) means no retries at all, instead of a
        # TypeError from comparing an int with None.
        if (
            self._status == JobStatus.FAILED
            and self._error_retry
            and self.trial_count < (self._num_retries or 0)
        ):
            self._status = JobStatus.RETRYING

        if self.prev_status != self._status and (
            self._status == JobStatus.RETRYING or self._status >= JobStatus.KILLING
        ):
            logger.info(
                "/Job-%s Status changed: %r -> %r",
                self.index,
                *JobStatus.get_name(self.prev_status, self._status),
            )
        return self._status

    @status.setter
    def status(self, stat: int):
        """Set the status manually

        The previous status is kept in `prev_status`.

        Args:
            stat: The status to set
        """
        logger.debug(
            "/Job-%s Status changed: %r -> %r",
            self.index,
            *JobStatus.get_name(self._status, stat),
        )
        self.prev_status = self._status
        self._status = stat

    @property
    def rc(self) -> int:
        """The return code of the job

        Read from the rc file. If the file does not exist, or cannot be parsed
        as an integer (e.g. it is empty or only partially written), the
        default `self._rc` (-1) is returned instead of raising.
        """
        if not self.rc_file.is_file():
            return self._rc  # pragma: no cover
        try:
            return int(self.rc_file.read_text())
        except (FileNotFoundError, ValueError):
            # The file disappeared after the is_file() check, or holds no
            # valid integer yet.
            return self._rc

    def clean(self, retry: bool = False) -> None:
        """Clean up the meta files (stdout, stderr, status and rc files)

        The jid file is left untouched.

        Args:
            retry: Whether clean it for retrying. If True, the files are moved
                into `retry_dir/<trial_count>` (recreated from scratch)
                instead of being deleted.
        """
        files_to_clean = [
            self.stdout_file,
            self.stderr_file,
            self.status_file,
            self.rc_file,
        ]
        if retry:
            retry_dir = self.retry_dir / str(self.trial_count)
            if retry_dir.exists():
                retry_dir.rmtree()
            retry_dir.mkdir(parents=True)
            for file in files_to_clean:
                if file.is_file():
                    file.rename(retry_dir / file.name)
        else:
            for file in files_to_clean:
                if file.is_file():
                    file.unlink()