Skip to content

SOURCE CODE xqute.schedulers.sge_scheduler DOCS

"""The scheduler to run jobs on SGE"""
import asyncio
from typing import Type

from ..job import Job
from ..scheduler import Scheduler
from ..utils import a_read_text


class SgeJob(Job):DOCS
    """SGE job"""

    def shebang(self, scheduler: Scheduler) -> str:DOCS
        """Make the shebang with options

        Args:
            scheduler: The scheduler

        Returns:
            The shebang with options
        """
        options = {
            key[4:]: val
            for key, val in scheduler.config.items()
            if key.startswith("sge_")
        }
        qsub_options = {
            key[5:]: val
            for key, val in scheduler.config.items()
            if key.startswith("qsub_")
        }
        options.update(qsub_options)
        jobname_prefix = scheduler.config.get(
            "scheduler_jobprefix", scheduler.name
        )
        options["N"] = f"{jobname_prefix}.{self.index}"
        options["cwd"] = True
        options["o"] = self.stdout_file
        options["e"] = self.stderr_file

        options_list = []
        for key, val in options.items():
            if val is True:
                options_list.append(f"#$ -{key}")
            elif isinstance(val, (tuple, list)):
                for optval in val:
                    options_list.append(f"#$ -{key} {optval}")
            else:
                options_list.append(f"#$ -{key} {val}")
        options_str = "\n".join(options_list)

        return f"{super().shebang(scheduler)}\n{options_str}\n"


class SgeScheduler(Scheduler):DOCS
    """The sge scheduler

    Attributes:
        name: The name of the scheduler
        job_class: The job class

    Args:
        qsub: path to qsub command
        qstat: path to qstat command
        qdel: path to qdel command
        sge_*: SGE options for qsub. List or tuple options will be expanded.
            For example: `sge_l=['hvmem=2G', 'gpu=1']` will be expaned into
            `-l h_vmem=2G -l gpu=1`
        ... other Scheduler args
    """

    name: str = "sge"
    job_class: Type[Job] = SgeJob

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.qsub = self.config.get("qsub", "qsub")
        self.qdel = self.config.get("qdel", "qdel")
        self.qstat = self.config.get("qstat", "qstat")

    async def submit_job(self, job: Job) -> str:DOCS
        """Submit a job to SGE

        Args:
            job: The job

        Returns:
            The job id
        """
        proc = await asyncio.create_subprocess_exec(
            self.qsub,
            str(await job.wrapped_script(self)),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        # Your job 613815 (...) has been submitted
        return stdout.decode().split()[2]

    async def kill_job(self, job: Job):DOCS
        """Kill a job on SGE

        Args:
            job: The job
        """
        proc = await asyncio.create_subprocess_exec(
            self.qdel,
            str(job.jid),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.wait()

    async def job_is_running(self, job: Job) -> bool:DOCS
        """Tell if a job is really running, not only the job.jid_file

        In case where the jid file is not cleaned when job is done.

        Args:
            job: The job

        Returns:
            True if it is, otherwise False
        """
        try:
            jid = await a_read_text(job.jid_file)
        except FileNotFoundError:
            return False

        if not jid:
            return False

        proc = await asyncio.create_subprocess_exec(
            self.qstat,
            "-j",
            jid,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await proc.wait()
        return proc.returncode == 0