# Source code of xqute.job

"""Job to execute"""

from __future__ import annotations

import shlex
from typing import Any, Tuple

from .defaults import JobStatus
from .utils import logger, CommandType
from .path import SpecPath


class Job:
    """A single command tracked through its lifecycle via files in a metadir

    Attributes:

        cmd: The command, as a tuple of strings
        index: The index of the job
        metadir: The meta directory of the job (`workdir / str(index)`)
        envs: The environment variables of the job, including the
            `XQUTE_JOB_INDEX`, `XQUTE_METADIR` and `XQUTE_JOB_METADIR`
            entries added at construction
        trial_count: The count for re-tries
        _jid: The jid of the job in scheduler system
        _status: The status of the job
        _rc: The return code of the job
        _error_retry: Whether we should retry if error happened
        _num_retries: Total number of retries

    Args:
        index: The index of the job
        cmd: The command of the job, either a list/tuple of arguments or
            a single string that is split with `shlex.split`
        workdir: The directory under which the job's metadir is placed
        error_retry: Whether we should retry if error happened
        num_retries: Total number of retries
        envs: Extra environment variables for the job
    """

    __slots__ = (
        "cmd",
        "index",
        "metadir",
        "trial_count",
        "_jid",
        "_status",
        "_rc",
        "_error_retry",
        "_num_retries",
        "envs",
    )

    def __init__(
        self,
        index: int,
        cmd: CommandType,
        workdir: SpecPath,
        error_retry: bool | None = None,
        num_retries: int | None = None,
        envs: dict[str, Any] | None = None,
    ):
        """Construct a new Job

        Args:
            index: The index of the job
            cmd: The command of the job, either a list/tuple of arguments
                or a single string that is split with `shlex.split`
            workdir: The directory under which the job's metadir
                (`workdir / str(index)`) is placed
            error_retry: Whether we should retry if error happened
            num_retries: Total number of retries
            envs: Extra environment variables for the job. The mapping is
                copied, so the caller's object is left untouched.
        """
        if isinstance(cmd, (tuple, list)):
            parts = cmd
        else:
            parts = shlex.split(cmd)
        self.cmd: Tuple[str, ...] = tuple(map(str, parts))

        self.index = index
        # Copy instead of aliasing: the XQUTE_* keys below are job-specific,
        # so writing them into a dict shared by several jobs would make all
        # of them see the values of the job constructed last.
        self.envs = dict(envs) if envs else {}
        self.envs["XQUTE_JOB_INDEX"] = str(self.index)
        self.envs["XQUTE_METADIR"] = str(workdir)
        self.metadir = workdir / str(self.index)  # type: ignore
        self.envs["XQUTE_JOB_METADIR"] = str(self.metadir)
        # For cloud paths, this requires cloud client
        # self.metadir.mkdir(exist_ok=True, parents=True)
        # Let Scheduler.create_job handle metadir creation

        self.trial_count = 0

        # The unique id from the scheduler; unknown until set_jid() is
        # called or the jid file is read by get_jid()
        self._jid: int | str | None = None
        self._status = JobStatus.INIT
        # Declared in __slots__; initialize it so reading the attribute
        # never raises AttributeError
        self._rc: int | None = None
        self._error_retry = error_retry
        self._num_retries = num_retries

    def __repr__(self) -> str:
        """repr of the job, including the jid once it is known"""
        prefix = f"{self.__class__.__name__}-{self.index}"
        if not self._jid:
            return f"<{prefix}: ({self.cmd})>"
        return f"<{prefix}({self._jid}): ({self.cmd})>"

    @property
    def stdout_file(self) -> SpecPath:
        """The stdout file of the job"""
        return self.metadir / "job.stdout"

    @property
    def stderr_file(self) -> SpecPath:
        """The stderr file of the job"""
        return self.metadir / "job.stderr"

    @property
    def status_file(self) -> SpecPath:
        """The status file of the job"""
        return self.metadir / "job.status"

    @property
    def rc_file(self) -> SpecPath:
        """The rc file of the job"""
        return self.metadir / "job.rc"

    @property
    def jid_file(self) -> SpecPath:
        """The jid file of the job"""
        return self.metadir / "job.jid"

    @property
    def retry_dir(self) -> SpecPath:
        """The retry directory of the job"""
        return self.metadir / "job.retry"

    async def get_jid(self) -> int | str | None:
        """Get the jid of the job in scheduler system

        Returns:
            The in-memory jid if there is one; otherwise the content of
            the jid file (as a string, remembered for later calls), or
            None when the jid file does not exist.
        """
        if self._jid is not None:
            return self._jid
        if not await self.jid_file.a_is_file():
            return None
        self._jid = await self.jid_file.a_read_text()
        return self._jid

    async def set_jid(self, uniqid: int | str):
        """Set the jid of the job and write it to the jid file

        Args:
            uniqid: The jid of the job in scheduler system
        """
        self._jid = uniqid
        await self.jid_file.a_write_text(str(uniqid))

    async def get_status(self, refresh: bool = False) -> int:
        """Query the status of the job

        If the job is submitted, try to query it from the status file
        Make sure the status is updated by trap in wrapped script

        Without `refresh`, the in-memory status is returned and no file
        is touched.

        Args:
            refresh: Whether to refresh the status from file

        Returns:
            The (possibly refreshed) status of the job
        """
        if not refresh:
            return self._status

        # Only a job in one of these states is re-read from the status file
        if await self.status_file.a_is_file() and self._status in (
            JobStatus.SUBMITTED,
            JobStatus.RUNNING,
            JobStatus.KILLING,
        ):
            try:
                status_text = await self.status_file.a_read_text()
                self._status = int(status_text)
            except (
                FileNotFoundError,
                ValueError,
                TypeError,
            ):  # pragma: no cover
                # Keep the current status if the file vanished or does not
                # hold an integer
                pass

        # FAILED is not turned into RETRYING here, and nothing is logged
        # here - let scheduler handle transition logging
        return self._status

    async def set_status(self, stat: int, flush: bool = True) -> None:
        """Set the status manually

        Nothing is logged or written when `stat` equals the current status.

        Args:
            stat: The status to set
            flush: Whether to flush the status to file
        """
        prev_status = self._status
        if stat == prev_status:
            return

        logger.debug(
            "/Job-%s Status changed: %r -> %r",
            self.index,
            *JobStatus.get_name(prev_status, stat),
        )
        self._status = stat
        if flush:
            await self.status_file.a_write_text(str(stat))

    async def get_rc(self) -> int:
        """The return code of the job

        Returns:
            The integer stored in the rc file, or -9 when the rc file does
            not exist or does not hold an integer.
        """
        if not await self.rc_file.a_is_file():
            return -9
        try:
            return int(await self.rc_file.a_read_text())
        except (FileNotFoundError, ValueError):
            # The file was removed after the check above, or is empty or
            # otherwise not an integer: report it like a missing rc file
            return -9

    async def set_rc(self, rc: int | str) -> None:
        """Set the return code of the job

        Args:
            rc: The return code
        """
        await self.rc_file.a_write_text(str(rc))

    async def clean(self, retry: bool = False) -> None:
        """Clean up the meta files

        The stdout, stderr, status and rc files are removed, or, when
        cleaning for a retry, moved into a fresh
        `<retry_dir>/<trial_count>` directory.

        Args:
            retry: Whether clean it for retrying
        """
        files_to_clean = [
            self.stdout_file,
            self.stderr_file,
            self.status_file,
            self.rc_file,
        ]

        if not retry:
            for file in files_to_clean:
                if await file.a_is_file():
                    await file.a_unlink()
            return

        retry_dir = self.retry_dir / str(self.trial_count)  # type: ignore
        # Start from an empty directory for this trial
        if await retry_dir.a_exists():
            await retry_dir.a_rmtree()
        await retry_dir.a_mkdir(parents=True)

        for file in files_to_clean:
            if await file.a_is_file():
                await file.a_rename(retry_dir / file.name)