# Source code for pipen.job

"""Provide the Job class"""

from __future__ import annotations

import logging
import shlex
from collections.abc import Iterable
from functools import cached_property
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Mapping

from yunpath import AnyPath, CloudPath
from diot import OrderedDiot
from xqute import Job as XquteJob
from xqute.path import SpecPath, MountedPath

from ._job_caching import JobCaching
from .defaults import ProcInputType, ProcOutputType
from .exceptions import (
    ProcInputTypeError,
    ProcOutputNameError,
    ProcOutputTypeError,
    ProcOutputValueError,
    TemplateRenderingError,
)
from .template import Template
from .utils import logger, strsplit, path_is_symlink, path_symlink_to

if TYPE_CHECKING:  # pragma: no cover
    from .proc import Proc


def _process_input_file_or_dir(
    inkey: str,
    intype: str,
    inval: Any,
    index: int | None = None,
    proc_name: str | None = None,
) -> CloudPath | MountedPath:
    """Normalize a single file/dir input value into a mounted path

    Path objects (`MountedPath`, `SpecPath`, `CloudPath` or any other
    `PathLike`) are converted directly. A string is either one path or two
    paths joined by a colon (`path1:path2`), in which case `path2` becomes
    the mounted path and `path1` its spec. Cloud paths (e.g. `gs://a/b`)
    contain a colon themselves, so the number of colons is used to decide
    where the string has to be split.

    Args:
        inkey: The input key, used in the error messages
        intype: The input type, used in the error messages
        inval: The input value to process
        index: The index of the value when it is one of multiple values
            of the same input; mentioned in the error messages when given
        proc_name: The name of the process, used in the error messages

    Returns:
        The mounted path for the input value

    Raises:
        ProcInputTypeError: When the value is not a string or a path-like
            object, or when a string value has more than 3 colons
    """

    def _located(text: str) -> str:
        # Mention the position of the offending value when it is known
        return text if index is None else f"{text} at index {index}"

    # None is rejected here as well, since it is not an instance of any of these
    if not isinstance(inval, (str, PathLike, Path, CloudPath)):
        raise ProcInputTypeError(
            _located(
                f"[{proc_name}] Got <{type(inval).__name__}> instead of "
                f"PathLike object for input: {inkey + ':' + intype!r}"
            )
        )

    # Path objects
    if isinstance(inval, MountedPath):
        return inval
    if isinstance(inval, SpecPath):
        return inval.mounted
    if isinstance(inval, CloudPath):  # pragma: no cover
        return MountedPath(inval)
    if not isinstance(inval, str):
        # other PathLike types, should be all local
        return MountedPath(Path(inval).expanduser().absolute())

    # Strings: either a single path, or a pair in the form of path1:path2
    n_colons = inval.count(":")
    if n_colons == 0:  # a/b
        return MountedPath(Path(inval).expanduser().absolute())

    if n_colons > 3:  # a:b:c:d
        raise ProcInputTypeError(
            _located(
                f"[{proc_name}] Invalid input value: {inkey + ':' + intype!r} "
                "(too many ':')"
            )
        )

    if n_colons == 1:  # gs://a/b or a/b:c/d
        if isinstance(AnyPath(inval), CloudPath):  # gs://a/b
            return MountedPath(inval)

        spec_text, mounted_text = inval.split(":")
    else:
        # 3 colons: gs://a/b:gs://c/d
        # 2 colons: gs://a/b:c/d or a/b:gs://c/d
        head, middle, tail = inval.split(":", 2)
        spec_text, mounted_text = f"{head}:{middle}", tail
        if n_colons == 2 and not isinstance(AnyPath(spec_text), CloudPath):
            # The cloud path is the second one: a/b:gs://c/d
            spec_text, mounted_text = head, f"{middle}:{tail}"

    spec_path = AnyPath(spec_text)  # type: ignore
    mounted_path = AnyPath(mounted_text)  # type: ignore
    # Only local paths are expanded and made absolute
    if isinstance(spec_path, Path):
        spec_path = spec_path.expanduser().absolute()
    if isinstance(mounted_path, Path):
        mounted_path = mounted_path.expanduser().absolute()

    return MountedPath(mounted_path, spec=spec_path)


class Job(XquteJob, JobCaching):
    """The job for pipen"""

    __slots__ = XquteJob.__slots__ + ("proc", "_output_types", "_outdir")

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Create the job; all arguments are forwarded to the parent class"""
        super().__init__(*args, **kwargs)
        # Where the real output directory is; assigned in prepare()
        self._outdir: SpecPath | None = None
        # Output name -> output type, recorded when the output is rendered
        self._output_types: dict[str, str] = {}
        # The process this job belongs to; attached in prepare()
        self.proc: Proc | None = None

    async def prepare(self, proc: Proc) -> None:
        """Prepare the job by given process

        Primarily prepare the script, and provide cmd to the job for xqute
        to wrap and run

        Args:
            proc: the process object

        Raises:
            ValueError: When the scheduler provides `MOUNTED_OUTDIR` while
                the pipeline outdir is a `SpecPath` whose mounted path
                reports `is_mounted()`
            TemplateRenderingError: When the script template fails to render
        """
        # Attach the process
        self.proc = proc

        # Where the jobs of "export" process should put their outputs
        export_outdir = proc.pipeline.outdir / proc.name  # type: ignore
        # The mounted output directory declared by the scheduler, if any
        sched_mounted_outdir = getattr(proc.xqute.scheduler, "MOUNTED_OUTDIR", None)
        if sched_mounted_outdir is not None:  # pragma: no cover
            if (
                isinstance(proc.pipeline.outdir, SpecPath)
                and proc.pipeline.outdir.mounted.is_mounted()
            ):
                raise ValueError(
                    "The pipeline outdir is a SpecPath, "
                    "but the MOUNTED_OUTDIR is provided by the scheduler "
                    f"<{proc.xqute.scheduler.__class__.__name__}>. "
                )

            mounted_outdir = Path(sched_mounted_outdir) / proc.name

        elif isinstance(proc.pipeline.outdir, SpecPath):  # pragma: no cover
            # In the case it is modified by a plugin
            # A dual path can not be specified as outdir of a pipeline
            mounted_outdir = proc.pipeline.outdir.mounted / proc.name

        else:
            mounted_outdir = None

        if proc.export:
            self._outdir = SpecPath(export_outdir, mounted=mounted_outdir)

            # Put job output in a subdirectory with index
            # if it is a multi-job process; a single-job process
            # writes to the process directory directly
            if len(proc.jobs) > 1:
                self._outdir = self._outdir / str(self.index)

            if sched_mounted_outdir is None:
                # Create the output directory if it is not mounted by the scheduler
                self._outdir.mounted.mkdir(parents=True, exist_ok=True)

        else:
            # For non-export process, the output directory is under the metadir
            self._outdir = self.metadir / "output"

        if not proc.script:
            # Nothing to run, use a no-op command
            self.cmd = ["true"]
            return

        try:
            script = proc.script.render(self.template_data)
        except Exception as exc:
            raise TemplateRenderingError(
                f"[{proc.name}] Failed to render script."
            ) from exc

        # Only (re)write the script file when it is missing or outdated,
        # checking the existence of the file just once
        script_file = self.script_file
        if not script_file.is_file():
            script_file.write_text(script)
        elif script_file.read_text() != script:
            self.log("debug", "Job script updated.")
            script_file.write_text(script)

        # The process-level language takes precedence over the configured one
        lang = proc.lang or proc.pipeline.config.lang
        self.cmd = shlex.split(lang) + [script_file.mounted.fspath]

    @property
    def script_file(self) -> SpecPath:
        """Get the path to script file

        The script file is `job.script` under the metadir of the job.

        Returns:
            The path to the script file
        """
        return self.metadir / "job.script"

    @cached_property
    def outdir(self) -> SpecPath:
        """Make sure the output directory is ready and return it.

        The directory is `self._outdir`, which is assigned in `prepare()`:
        under pipeline.outdir and the process name when proc.export is True,
        otherwise `output` under the metadir.

        The directory is created when missing. When it is not the `output`
        entry of the metadir itself, that entry is replaced by a symbolic
        link to the directory.

        The value is cached; use `.mounted` of the returned path to get the
        mounted path.

        Returns:
            The path to the job output directory
        """
        real_outdir = self._outdir

        # The output directory can be a stale link
        # when switching a proc from end/nonend to nonend/end
        if path_is_symlink(real_outdir):
            # A local dead link
            is_dead = not real_outdir.exists()
            # or a cloud fake link
            if is_dead or isinstance(
                getattr(real_outdir, "path", real_outdir), CloudPath
            ):
                real_outdir.unlink()  # pragma: no cover

        real_outdir.mkdir(parents=True, exist_ok=True)

        # If it is somewhere else, make a symbolic link to it in the metadir
        meta_output = self.metadir / "output"
        if real_outdir != meta_output:
            # Clear whatever currently occupies <metadir>/output
            if path_is_symlink(meta_output) or meta_output.is_file():
                meta_output.unlink()
            elif meta_output.is_dir():
                meta_output.rmtree()

            path_symlink_to(meta_output, real_outdir)

        return real_outdir

    @cached_property
    def input(self) -> Mapping[str, Any]:
        """Get the input data for this job

        The row of the process input data at the index of the job is taken,
        then the values of file/dir inputs are converted into mounted paths.
        Values of `var` inputs and `None` values are left untouched.

        For files/dirs inputs, a single path is treated as a list of one
        path, and the converted paths are always collected into a new list,
        so that any iterable (including tuples, sets and generators) is
        supported and the value held by the process input data is not
        modified in place.

        Returns:
            A key-value map, where keys are the input keys

        Raises:
            ProcInputTypeError: When the value of a files/dirs input is not
                iterable, or when any file/dir value is invalid
        """
        import pandas

        ret = self.proc.input.data.iloc[self.index, :].to_dict()
        # check types
        for inkey, intype in self.proc.input.type.items():

            if intype == ProcInputType.VAR or ret[inkey] is None:
                continue  # pragma: no cover, covered actually

            value = ret[inkey]

            if intype in (ProcInputType.FILE, ProcInputType.DIR):
                ret[inkey] = _process_input_file_or_dir(
                    inkey, intype, value, None, self.proc.name
                )

            elif intype in (ProcInputType.FILES, ProcInputType.DIRS):
                if isinstance(value, pandas.DataFrame):  # pragma: no cover
                    # // todo: nested dataframe
                    value = value.iloc[0, 0]

                if isinstance(value, (str, PathLike, Path, CloudPath)):
                    # if a single file, convert to list
                    value = [value]

                if not isinstance(value, Iterable):
                    raise ProcInputTypeError(
                        f"[{self.proc.name}] Expected an iterable for input: "
                        f"{inkey + ':' + intype!r}, got {type(value)}"
                    )

                # Build a new list rather than assigning items in place:
                # the value may not support item assignment, and it is
                # still referenced by the process input data
                ret[inkey] = [
                    _process_input_file_or_dir(
                        inkey, intype, file, i, self.proc.name
                    )
                    for i, file in enumerate(value)
                ]

        return ret

    @cached_property
    def output(self) -> Mapping[str, Any]:
        """Get the output data of the job

        The output template(s) of the process are rendered with the job,
        input, process and envs data. Each rendered output is in the form of
        `name:value` (a `var` output) or `name:type:value`.

        For non-var outputs, the value must be a relative, non-cloud path
        segment. It is joined to the output directory of the job, and the
        mounted path is stored. Directories of `dir` outputs are created.

        The type of each output is recorded in `self._output_types`.

        Returns:
            The key-value map where the keys are the output keys

        Raises:
            TemplateRenderingError: When the output fails to render
            ProcOutputNameError: When an output has no colon, so no name
                can be told from the value
            ProcOutputTypeError: When the output type is not supported
            ProcOutputValueError: When the value of a non-var output is
                a cloud path or an absolute path
        """
        output_template = self.proc.output
        if not output_template:
            return {}

        data = {
            "job": dict(
                index=self.index,
                metadir=self.metadir.mounted,
                outdir=self.outdir.mounted,
                stdout_file=self.stdout_file.mounted,
                stderr_file=self.stderr_file.mounted,
                jid_file=self.jid_file.mounted,
            ),
            "in": self.input,
            "in_": self.input,
            "proc": self.proc,
            "envs": self.proc.envs,
        }
        try:
            if isinstance(output_template, Template):
                # // TODO: check ',' in output value?
                outputs = strsplit(output_template.render(data), ",")
            else:
                outputs = [oput.render(data) for oput in output_template]
        except Exception as exc:
            raise TemplateRenderingError(
                f"[{self.proc.name}] Failed to render output."
            ) from exc

        # The namespace of a class also holds entries like __module__ and
        # __doc__; only the non-underscore attributes are output types
        supported_types = [
            value
            for attr, value in vars(ProcOutputType).items()
            if not attr.startswith("_")
        ]

        ret = OrderedDiot()
        for oput in outputs:
            if ":" not in oput:
                raise ProcOutputNameError(
                    f"[{self.proc.name}] No name given in output."
                )

            if oput.count(":") == 1:
                # name:value, the type defaults to var
                output_name, output_value = oput.split(":")
                output_type = ProcOutputType.VAR
            else:
                # name:type:value, the value may contain further colons
                output_name, output_type, output_value = oput.split(":", 2)
                if output_type not in supported_types:
                    raise ProcOutputTypeError(
                        f"[{self.proc.name}] " f"Unsupported output type: {output_type}"
                    )

            self._output_types[output_name] = output_type

            if output_type == ProcOutputType.VAR:
                ret[output_name] = output_value
            else:
                # Paths are always placed inside the output directory
                ov = AnyPath(output_value)
                if isinstance(ov, CloudPath) or (
                    isinstance(ov, Path) and ov.is_absolute()
                ):
                    raise ProcOutputValueError(
                        f"[{self.proc.name}] "
                        f"output path must be a segment: {output_value}"
                    )

                out = self.outdir / output_value
                if output_type == ProcOutputType.DIR:
                    out.mkdir(parents=True, exist_ok=True)

                ret[output_name] = out.mounted

        return ret

    @cached_property
    def template_data(self) -> Mapping[str, Any]:
        """Assemble the data used to render the templates of the job

        It consists of the job information (index and mounted paths), the
        input (under both `in` and `in_`), the output, the process and
        the envs of the process.

        Returns:
            The data for template rendering
        """
        job_info = {
            "index": self.index,
            "metadir": self.metadir.mounted,
            "outdir": self.outdir.mounted,
            "stdout_file": self.stdout_file.mounted,
            "stderr_file": self.stderr_file.mounted,
            "jid_file": self.jid_file.mounted,
        }
        # The input is cached, so both keys share the same mapping
        job_input = self.input

        return {
            "job": job_info,
            "in": job_input,
            "in_": job_input,
            "out": self.output,
            "proc": self.proc,
            "envs": self.proc.envs,
        }

    def log(
        self,
        level: int | str,
        msg: str,
        *args,
        limit: int = 3,
        limit_indicator: bool = True,
        logger: logging.LoggerAdapter = logger,
    ) -> None:
        """Log message for the jobs

        Nothing is logged for jobs whose index is greater than `limit`.
        Unless the process has a single job, the message is prefixed with
        `[<index>/<last index>] `, where the index is zero-padded to the
        width of the last index.

        Args:
            level: The log level of the record
            msg: The message to log
            *args: The arguments to format the message
            limit: limitation of the log (don't log for all jobs)
            limit_indicator: Whether to show an indicator saying the log
                has been limited (appended to the message of the job
                whose index equals `limit`)
            logger: The logger used to log
        """
        if self.index > limit:
            return

        if limit_indicator and self.index == limit:
            # This is the last job allowed to log
            msg = f"{msg} (not showing similar logs)"

        if self.proc.size == 1:
            job_index_indicator = ""
        else:
            last_index = self.proc.size - 1
            padded_index = str(self.index).zfill(len(str(last_index)))
            job_index_indicator = f"[{padded_index}/{last_index}] "

        self.proc.log(level, job_index_indicator + msg, *args, logger=logger)