Skip to content

SOURCE CODE xqute.schedulers.local_scheduler DOCS

"""The scheduler to run jobs locally"""
import asyncio
import os
from typing import Type

from ..job import Job
from ..scheduler import Scheduler
from ..utils import a_read_text


class LocalJob(Job):DOCS
    """Local job"""


class LocalScheduler(Scheduler):DOCS
    """The local scheduler

    Attributes:
        name: The name of the scheduler
        job_class: The job class
    """

    name = "local"
    job_class: Type[Job] = LocalJob

    async def submit_job(self, job: Job) -> int:DOCS
        """Submit a job locally

        Args:
            job: The job

        Returns:
            The process id
        """
        proc = await asyncio.create_subprocess_exec(
            job.CMD_WRAPPER_SHELL,
            str(await job.wrapped_script(self)),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # wait for a while to make sure the process is running
        # this is to avoid the real command is not run when proc is recycled too early
        # this happens for python < 3.12
        while not job.stderr_file.exists() or not job.stdout_file.exists():
            await asyncio.sleep(0.05)
        # don't await for the results, as this will run the real command
        return proc.pid

    async def kill_job(self, job: Job):DOCS
        """Kill a job asynchronously

        Args:
            job: The job
        """
        try:
            os.killpg(int(job.jid), 9)
        except Exception:  # pragma: no cover
            pass

    async def job_is_running(self, job: Job) -> bool:DOCS
        """Tell if a job is really running, not only the job.jid_file

        In case where the jid file is not cleaned when job is done.

        Args:
            job: The job

        Returns:
            True if it is, otherwise False
        """
        try:
            jid = int(await a_read_text(job.jid_file))
        except (ValueError, TypeError, FileNotFoundError):
            return False

        if jid <= 0:
            return False

        try:
            os.kill(jid, 0)
        except Exception:  # pragma: no cover
            return False

        return True