Skip to content

SOURCE CODE xqute.schedulers.local_scheduler DOCS

"""The scheduler to run jobs locally"""

import asyncio
import os
import shlex

from ..job import Job
from ..scheduler import Scheduler


def _pid_exists(pid: int) -> bool:
    """Check if a process with a given pid exists"""
    try:
        os.kill(pid, 0)
    except Exception:  # pragma: no cover
        return False
    return True


class LocalScheduler(Scheduler):DOCS
    """The local scheduler

    Attributes:
        name: The name of the scheduler
        job_class: The job class
    """

    name = "local"

    async def submit_job(self, job: Job, _mounted: bool = False) -> int:DOCS
        """Submit a job locally

        Args:
            job: The job
            _mounted: Whether to use the mounted path of the wrapped job script
                Used internally for container scheduler

        Returns:
            The process id
        """
        wrapt_script_path = (
            self.wrapped_job_script(job).mounted
            if _mounted
            else self.wrapped_job_script(job).fspath
        )
        # In case the process exits very quickly
        if not job.jid_file.exists():
            job.jid_file.write_text("0")

        proc = await asyncio.create_subprocess_exec(
            *shlex.split(self.jobcmd_shebang(job)),
            wrapt_script_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            preexec_fn=os.setsid,
            # Changing the working directory here may cause wrapped_job_script to fail
            # to be found, so we don't set cwd here.
            # The cwd is changed in the wrapper script instead.
            # cwd=self.cwd
        )

        # wait for a while to make sure the process is running
        # this is to avoid the real command is not run when proc is recycled too early
        # this happens for python < 3.12
        await asyncio.sleep(0.1)

        if job.stdout_file.exists():
            # job submitted successfully and already started very soon
            return proc.pid

        if proc.returncode is not None and proc.returncode != 0:
            # The process has already finished and no stdout/stderr files are
            # generated
            # Something went wrong with the wrapper script?
            stderr = await proc.stderr.read()
            raise RuntimeError(
                f"Failed to submit job #{job.index} (rc={proc.returncode}): "
                f"{stderr.decode()}\n"
                f"Command: {self.jobcmd_shebang(job)} "
                f"{wrapt_script_path}\n"
            )

        # don't await for the results, as this will run the real command
        return proc.pid

    async def kill_job(self, job: Job):DOCS
        """Kill a job asynchronously

        Args:
            job: The job
        """
        try:
            os.killpg(int(job.jid), 9)
        except Exception:  # pragma: no cover
            pass

    async def job_is_running(self, job: Job) -> bool:DOCS
        """Tell if a job is really running, not only the job.jid_file

        In case where the jid file is not cleaned when job is done.

        Args:
            job: The job

        Returns:
            True if it is, otherwise False
        """
        try:
            jid = int(job.jid_file.read_text().strip())
        except (ValueError, TypeError, FileNotFoundError):
            return False

        if jid <= 0:
            return False

        return _pid_exists(jid)