Skip to content

SOURCE CODE xqute.schedulers.local_scheduler DOCS

"""The scheduler to run jobs locally"""

import asyncio
import os
import shlex

from cloudpathlib import CloudPath

from ..job import Job
from ..scheduler import Scheduler
from ..utils import localize


def _pid_exists(pid: int) -> bool:
    """Check if a process with a given pid exists"""
    try:
        os.kill(pid, 0)
    except Exception:
        return False
    return True


class LocalScheduler(Scheduler):DOCS
    """The local scheduler

    Attributes:
        name: The name of the scheduler
        job_class: The job class
    """

    name = "local"

    async def submit_job(self, job: Job) -> int:DOCS
        """Submit a job locally

        Args:
            job: The job

        Returns:
            The process id
        """
        wrapt_script = localize(self.wrapped_job_script(job))
        proc = await asyncio.create_subprocess_exec(
            *shlex.split(self.jobcmd_shebang(job)),
            wrapt_script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # wait for a while to make sure the process is running
        # this is to avoid the real command is not run when proc is recycled too early
        # this happens for python < 3.12
        while not job.stderr_file.exists() and not job.stdout_file.exists():
            if proc.returncode is not None:
                # The process has already finished and no stdout/stderr files are
                # generated
                # Something went wrong with the wrapper script?
                stderr = await proc.stderr.read()
                raise RuntimeError(
                    f"Failed to submit job #{job.index}: {stderr.decode()}"
                )

            if isinstance(job.metadir, CloudPath):
                await asyncio.sleep(2)
            else:
                await asyncio.sleep(0.1)

        # don't await for the results, as this will run the real command
        return proc.pid

    async def kill_job(self, job: Job):DOCS
        """Kill a job asynchronously

        Args:
            job: The job
        """
        try:
            os.killpg(int(job.jid), 9)
        except Exception:  # pragma: no cover
            pass

    async def job_is_running(self, job: Job) -> bool:DOCS
        """Tell if a job is really running, not only the job.jid_file

        In case where the jid file is not cleaned when job is done.

        Args:
            job: The job

        Returns:
            True if it is, otherwise False
        """
        try:
            jid = int(job.jid_file.read_text().strip())
        except (ValueError, TypeError, FileNotFoundError):
            return False

        if jid <= 0:
            return False

        return _pid_exists(jid)