# Source code of the xqute.defaults module

"""Default settings and utilities for xqute

Attributes:
    DEFAULT_JOB_METADIR: The default meta directory for jobs
    DEFAULT_JOB_ERROR_STRATEGY: The default strategy to apply when an
        error occurs in a job
    DEFAULT_JOB_NUM_RETRIES: Default number of retries when
        DEFAULT_JOB_ERROR_STRATEGY is retry
    DEFAULT_JOB_CMD_WRAPPER_SHELL: The default shell for job wrapper
    DEFAULT_JOB_CMD_WRAPPER_TEMPLATE: The template for job cmd wrapping
    DEFAULT_SCHEDULER_FORKS: Default number of job forks for scheduler
    DEFAULT_JOB_SUBMISSION_BATCH: Default consumer workers
"""
from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Tuple
import uvloop

# Import-time side effect: install uvloop's event loop policy, so that event
# loops created through asyncio after this module is imported come from uvloop.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class JobErrorStrategy:
    """The strategies available when a job runs into an error

    The values are plain strings, so they can be compared with and stored
    as ordinary `str` settings.

    Attributes:
        IGNORE: ignore the error and run the next jobs
        RETRY: retry the job
        HALT: halt the whole program
    """
    IGNORE: str = 'ignore'
    RETRY: str = 'retry'
    HALT: str = 'halt'


class JobStatus:
    """The status of a job

    Life cycles:
    ........................queued in scheduler
    INIT -> QUEUED -> SUBMITTED -> RUNNING -> FINISHED (FAILED)
    INIT -> QUEUED -> SUBMITTED -> RUNNING -> KILLING -> FINISHED
    INIT -> QUEUED -> SUBMITTED -> KILLING -> FINISHED
    INIT -> QUEUED -> (CANCELED)

    Attributes:
        INIT: When a job is initialized
        RETRYING: When a job is to be retried
        QUEUED: When a job is queued
        SUBMITTED: When a job is submitted
        RUNNING: When a job is running
        KILLING: When a job is being killed
        FINISHED: When a job is finished
        FAILED: When a job is failed
    """
    INIT: int = 0
    RETRYING: int = 1
    QUEUED: int = 2
    SUBMITTED: int = 3
    RUNNING: int = 4
    KILLING: int = 5
    FINISHED: int = 6
    FAILED: int = 7

    @classmethod
    def get_name(cls, *statuses: int) -> Tuple[str, ...] | str:
        """Get the name of the status

        Args:
            *statuses: The status values

        Returns:
            The name of the status if a single status is passed, otherwise
            a tuple of names, in the same order as the values passed
            (an empty tuple when no status is passed)

        Raises:
            KeyError: If a value is not one of the statuses defined
                on this class
        """
        # Only the upper-case integer constants are statuses. Filtering by
        # name keeps other integer-valued entries of the class namespace
        # (e.g. `__firstlineno__` on Python 3.13+) from being reported as
        # a status name.
        names = {
            value: name
            for name, value in vars(cls).items()
            if name.isupper() and isinstance(value, int)
        }
        resolved = tuple(names[status] for status in statuses)
        if len(resolved) == 1:
            return resolved[0]
        # Zero or several statuses: always a tuple, never an IndexError
        return resolved


# Name constant for logging; presumably the name of the logger xqute uses
# (the consumer is not visible in this module - confirm against usage).
LOGGER_NAME = 'XQUTE'

# Default number of job forks for the scheduler
DEFAULT_SCHEDULER_FORKS: int = 1
# Default meta directory for jobs, relative to the current directory
DEFAULT_JOB_METADIR: Path = Path('./.xqute')
# Default strategy when a job runs into an error: ignore it
DEFAULT_JOB_ERROR_STRATEGY: str = JobErrorStrategy.IGNORE
# Default number of retries, used when the error strategy is retry
DEFAULT_JOB_NUM_RETRIES: int = 3
# Default number of consumer workers for job submission
DEFAULT_JOB_SUBMISSION_BATCH: int = 8
# Default shell for the job wrapper script
DEFAULT_JOB_CMD_WRAPPER_SHELL: str = '/bin/bash'
# Template of the shell script that wraps a job command. As written, the
# script:
#   - enables strict mode (`set -u -e -E -o pipefail`);
#   - writes the RUNNING status to the job's status file;
#   - registers a `cleanup` trap on EXIT that records the exit code in the
#     rc file, writes FINISHED (rc == 0) or FAILED to the status file,
#     removes the jid file and exits with the same code;
#   - builds the command with stdout/stderr redirected to the job's files
#     and runs it through `eval`.
# NOTE(review): the `{job.*}` / `{status.*}` fields and the doubled braces
# look like `str.format` rendering, and the `#![name]` lines look like
# placeholders substituted by the renderer/plugins - the rendering code is
# not in this module, confirm there.
DEFAULT_JOB_CMD_WRAPPER_TEMPLATE: str = r"""#![shebang]

set -u -e -E -o pipefail

# BEGIN: Update job status
echo {status.RUNNING} > {job.status_file}
# END: Update job status

# BEGIN: plugins.on_jobcmd_init
#![jobcmd_init]
# END: plugins.on_jobcmd_init

# BEGIN: Pre-command place holder
#![prescript]
# END: Pre-command place holder

# BEGIN: Trap command
cleanup() {{
    rc=$?
    echo $rc > {job.rc_file}
    if [[ $rc -eq 0 ]]; then
        echo {status.FINISHED} > {job.status_file}
    else
        echo {status.FAILED} > {job.status_file}
    fi
    rm -f {job.jid_file}

    # BEGIN: Post-command place holder
    #![postscript]
    # END: Post-command place holder

    # BEGIN: plugins.on_jobcmd_end
    #![jobcmd_end]
    # END: plugins.on_jobcmd_end

    exit $rc
}}
# END: Trap command

# BEGIN: Trap exit
trap "cleanup" EXIT
# END: Trap exit

cmd="{job.strcmd} \
    1>{job.stdout_file} \
    2>{job.stderr_file}"

# BEGIN: plugins.on_jobcmd_prep
#![jobcmd_prep]
# END: plugins.on_jobcmd_prep

# BEGIN: Run the command
eval "$cmd"
# END: Run the command
"""