Skip to content

SOURCE CODE xqute.schedulers.sge_scheduler DOCS

"""The scheduler to run jobs on SGE"""

import asyncio
import hashlib

from ..job import Job
from ..scheduler import Scheduler
from ..utils import localize


class SgeScheduler(Scheduler):DOCS
    """The sge scheduler

    Attributes:
        name: The name of the scheduler
        job_class: The job class

    Args:
        qsub: path to qsub command
        qstat: path to qstat command
        qdel: path to qdel command
        ...: other Scheduler args. List or tuple options will be expanded.
            For example: `sge_l=['hvmem=2G', 'gpu=1']` will be expaned into
            `-l h_vmem=2G -l gpu=1`
    """

    name: str = "sge"

    __slots__ = Scheduler.__slots__ + ("qsub", "qdel", "qstat")

    def __init__(self, *args, **kwargs):
        self.qsub = kwargs.pop("qsub", "qsub")
        self.qdel = kwargs.pop("qdel", "qdel")
        self.qstat = kwargs.pop("qstat", "qstat")
        super().__init__(*args, **kwargs)

    def jobcmd_shebang(self, job) -> str:DOCS
        options = self.config.copy()
        sha = hashlib.sha256(str(self.workdir).encode()).hexdigest()[:8]
        options["N"] = f"{self.jobname_prefix}-{sha}-{job.index}"
        options["cwd"] = True
        # options["o"] = self.stdout_file
        # options["e"] = self.stderr_file

        options_list = []
        for key, val in options.items():
            if val is True:
                options_list.append(f"#$ -{key}")
            elif isinstance(val, (tuple, list)):
                for optval in val:
                    options_list.append(f"#$ -{key} {optval}")
            else:
                options_list.append(f"#$ -{key} {val}")

        return super().jobcmd_shebang(job) + "\n" + "\n".join(options_list)

    async def submit_job(self, job: Job) -> str:DOCS
        """Submit a job to SGE

        Args:
            job: The job

        Returns:
            The job id
        """
        proc = await asyncio.create_subprocess_exec(
            self.qsub,
            localize(self.wrapped_job_script(job)),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:  # pragma: no cover
            raise RuntimeError(f"Can't submit job to SGE: {stderr.decode()}")

        # Your job 613815 (...) has been submitted
        try:
            job_id = stdout.decode().split()[2]
        except Exception:  # pragma: no cover
            raise RuntimeError("Can't get job id from qsub output.", stdout, stderr)

        return job_id

    async def kill_job(self, job: Job):DOCS
        """Kill a job on SGE

        Args:
            job: The job
        """
        proc = await asyncio.create_subprocess_exec(
            self.qdel,
            str(job.jid),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.wait()

    async def job_is_running(self, job: Job) -> bool:DOCS
        """Tell if a job is really running, not only the job.jid_file

        In case where the jid file is not cleaned when job is done.

        Args:
            job: The job

        Returns:
            True if it is, otherwise False
        """
        try:
            jid = job.jid_file.read_text().strip()
        except FileNotFoundError:  # pragma: no cover
            return False

        if not jid:
            return False

        proc = await asyncio.create_subprocess_exec(
            self.qstat,
            "-j",
            jid,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return await proc.wait() == 0