# Source code of the xqute.defaults module

"""Default settings and utilities for xqute

Attributes:
    DEFAULT_WORKDIR: The default work directory for jobs to save the metadata
    DEFAULT_ERROR_STRATEGY: The default strategy to apply when an error
        happens in a job
    DEFAULT_NUM_RETRIES: Default number of retries when
        DEFAULT_ERROR_STRATEGY is retry
    JOBCMD_WRAPPER_LANG: The default shell for the job command wrapper
    DEFAULT_SCHEDULER_FORKS: Default number of job forks for scheduler
    DEFAULT_SUBMISSION_BATCH: Default number of consumer workers
"""

from __future__ import annotations

import asyncio
import textwrap
from typing import Tuple
import uvloop

# Module-level side effect: importing this module installs uvloop's event
# loop policy, so event loops that asyncio creates afterwards are uvloop loops.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class JobErrorStrategy:
    """The strategy to apply when a job ends in error

    The strategies are plain string constants, so they can be compared
    directly with string values.

    Attributes:
        IGNORE: ignore and run next jobs
        RETRY: retry the job
        HALT: halt the whole program
    """

    IGNORE: str = "ignore"
    RETRY: str = "retry"
    HALT: str = "halt"


class JobStatus:
    """The status of a job

    Life cycles:
    ........................queued in scheduler
    INIT -> QUEUED -> SUBMITTED -> RUNNING -> FINISHED (FAILED)
    INIT -> QUEUED -> SUBMITTED -> RUNNING -> KILLING -> FINISHED
    INIT -> QUEUED -> SUBMITTED -> KILLING -> FINISHED
    INIT -> QUEUED -> (CANCELLED)

    Attributes:
        INIT: When a job is initialized
        RETRYING: When a job is to be retried
        QUEUED: When a job is queued
        SUBMITTED: When a job is submitted
        RUNNING: When a job is running
        KILLING: When a job is being killed
        FINISHED: When a job is finished
        FAILED: When a job is failed
    """

    INIT: int = 0
    RETRYING: int = 1
    QUEUED: int = 2
    SUBMITTED: int = 3
    RUNNING: int = 4
    KILLING: int = 5
    FINISHED: int = 6
    FAILED: int = 7

    @classmethod
    def get_name(cls, *statuses: int) -> Tuple[str, ...] | str:
        """Get the name of the status

        Args:
            *statuses: The status values

        Returns:
            The name of the status if a single status is passed, otherwise
            a tuple of names (in the order the statuses were passed; an
            empty tuple if no status is passed)

        Raises:
            KeyError: If a status value is not defined on this class
        """
        if not statuses:
            # Nothing requested: avoid indexing into an empty tuple below
            return ()

        # Reverse lookup (value -> attribute name), limited to the
        # requested values so non-status class attributes are skipped
        names_by_value = {
            value: name
            for name, value in cls.__dict__.items()
            if value in statuses
        }
        names = tuple(names_by_value[status] for status in statuses)
        return names if len(names) > 1 else names[0]


# Logger name constant (presumably passed to the logging setup elsewhere
# in the package -- confirm at the usage site)
LOGGER_NAME = "XQUTE"

# Default number of job forks for the scheduler
DEFAULT_SCHEDULER_FORKS: int = 1
# Default work directory where jobs save their metadata
DEFAULT_WORKDIR = "./.xqute"
# Default strategy applied when a job errors: ignore it and run next jobs
DEFAULT_ERROR_STRATEGY: str = JobErrorStrategy.IGNORE
# Default number of retries, relevant when the error strategy is "retry"
DEFAULT_NUM_RETRIES: int = 3
# Default number of consumer workers
DEFAULT_SUBMISSION_BATCH: int = 8
# Default interpreter of the job command wrapper script (presumably what
# fills the {shebang} field of JOBCMD_WRAPPER_TEMPLATE -- confirm in caller)
JOBCMD_WRAPPER_LANG: str = "/bin/bash"
# Template of the shell wrapper script that runs a job command.
# Fields in single braces ({shebang}, {scheduler.*}, {status.*},
# {job.mounted_metadir}, {cmd}, {jobcmd_init}, {jobcmd_prep}, {jobcmd_end})
# are placeholders, presumably filled with str.format (literal shell braces
# are escaped as {{ and }}) -- confirm at the rendering site.
# The script writes the RUNNING status, runs the init/prescript sections and
# registers an EXIT trap (`cleanup`) that records the exit code in job.rc,
# writes FINISHED (exit code 0) or FAILED to job.status, calls
# remove_metafile on job.jid, runs the postscript/end sections and exits
# with that same code. Finally it evaluates the composed command.
# update_metafile, remove_metafile and compose_cmd are not defined in the
# template itself; presumably {scheduler.jobcmd_wrapper_init} supplies them
# (see get_jobcmd_wrapper_init) -- confirm against the scheduler.
JOBCMD_WRAPPER_TEMPLATE: str = r"""#!{shebang}
set -u -E -o pipefail

{scheduler.jobcmd_wrapper_init}

# tell the xqute that the job is submitted
update_metafile "{status.RUNNING}" "{job.mounted_metadir}/job.status"
update_metafile "" "{job.mounted_metadir}/job.stdout"

# plugins.on_jobcmd_init
{jobcmd_init}

# prescript
{scheduler.prescript}

cleanup() {{
    rc=$?
    update_metafile "$rc" "{job.mounted_metadir}/job.rc"
    if [[ $rc -eq 0 ]]; then
        update_metafile "{status.FINISHED}" "{job.mounted_metadir}/job.status"
    else
        update_metafile "{status.FAILED}" "{job.mounted_metadir}/job.status"
    fi

    remove_metafile "{job.mounted_metadir}/job.jid"

    # postscript
    {scheduler.postscript}

    # plugins.on_jobcmd_end
    {jobcmd_end}

    sync
    exit $rc
}}

# register trap
trap "cleanup" EXIT

cmd=$(compose_cmd "{cmd}" "{job.mounted_metadir}/job.stdout" "{job.mounted_metadir}/job.stderr")

# plugins.on_jobcmd_prep
{jobcmd_prep}

# Run the command, the real job
eval "$cmd"
"""  # noqa: E501


def get_jobcmd_wrapper_init(local: bool, remove_jid_after_done: bool) -> str:
    """Get the job command wrapper initialization script

    The returned shell snippet exports `META_ON_CLOUD` and defines three
    shell functions: `update_metafile`, `remove_metafile` and `compose_cmd`.
    The local variant uses plain shell commands on the files; the other
    variant goes through the `cloudsh` command line tool and aborts the
    script if `cloudsh` is not available.

    Args:
        local: Whether the job is running locally
        remove_jid_after_done: Whether to remove the remote job id file
            after the job is done. If False, `remove_metafile` skips any
            file whose path ends with `job.jid`

    Returns:
        The job command wrapper initialization script
    """
    rm_cmd = 'rm -f "$file"' if local else 'cloudsh rm -f "$file"'
    if remove_jid_after_done:
        rm_file = rm_cmd
    else:
        # Test the value of $file (not the literal word "file") and use
        # [[ ]], since the single-bracket test does no pattern matching;
        # otherwise the guard is always true and job.jid is removed anyway.
        rm_file = f'if [[ "$file" != *"job.jid" ]]; then {rm_cmd}; fi'

    if local:
        return textwrap.dedent(
            f"""
            export META_ON_CLOUD=0
            update_metafile() {{
                local content=$1
                local file=$2
                echo "$content" > "$file"
            }}
            remove_metafile() {{
                local file=$1
                {rm_file}
            }}
            compose_cmd() {{
                local cmd=$1
                local stdout_file=$2
                local stderr_file=$3
                echo "$cmd 1>$stdout_file 2>$stderr_file"
            }}
            """
        )

    return textwrap.dedent(
        f"""
        export META_ON_CLOUD=1
        # Check if cloudsh is installed
        if ! command -v cloudsh &> /dev/null; then
            echo "cloudsh is not installed to support cloud workdir, please install it first" 1>&2
            exit 1
        fi
        update_metafile() {{
            local content=$1
            local file=$2
            echo "$content" | cloudsh sink "$file"
        }}
        remove_metafile() {{
            local file=$1
            {rm_file}
        }}
        compose_cmd() {{
            local cmd=$1
            local stdout_file=$2
            local stderr_file=$3
            # create temp files to save stderr
            stderrtmp=$(mktemp)
            echo "$cmd 2>$stderrtmp | cloudsh sink $stdout_file; \\
                rc=\\$?; \\
                cloudsh mv $stderrtmp $stderr_file; \\
                exit \\$rc"
        }}
        """  # noqa: E501
    )