SOURCE CODE biopipen.utils.misc DOCS

from __future__ import annotations
from pathlib import Path

import os
import sys
import logging
from typing import List
from biopipen.core.filters import dict_to_cli_args  # noqa: F401

logger = logging.getLogger("biopipen_job")
logger.setLevel(logging.DEBUG)
_handler = logging.StreamHandler(sys.stdout)
# Use same log format as in R
# {sprintf("%-7s", level)} [{format(time, "%Y-%m-%d %H:%M:%S")}] {msg}
# so the logs can be populated by pipen-poplog
_handler.setFormatter(
    logging.Formatter(
        "%(levelname)-7s [%(asctime)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
logger.addHandler(_handler)


def exec_code(code, global_vars=None, local_vars=None, return_var=None):
    global_vars = global_vars or {}
    local_vars = local_vars or {}
    exec(code, global_vars, local_vars)

    if return_var is not None:
        return local_vars[return_var]

    return None


def run_command(DOCS
    cmd: str | List[str],
    fg: bool = False,
    wait: bool = True,
    print_command: bool = True,
    print_command_handler: callable = print,
    **kwargs,
):
    """Run a command.

    Args:
        cmd: A string or list of strings representing the command to run.
        fg: Whether to run the command in the foreground.
            Redirects stdout and stderr to the current process.
        wait: Whether to wait for the command to finish.
            The command will be waited for if `fg` is `True`.
        print_command: Whether to print the command before running it.
        print_command_handler: The function to use to print the command.
        kwargs: Keyword arguments to pass to `subprocess.Popen`.

    Returns:
        The `Popen` object, or str when `stdout` is `RETURN` or `return`.
    """
    import shlex
    from subprocess import Popen, PIPE, STDOUT

    if isinstance(cmd, list):
        cmd = [str(c) for c in cmd]

    if print_command:
        print_command_handler("RUNNING COMMAND:")
        if isinstance(cmd, str):
            print_command_handler(f"  {cmd}\n")
        else:
            print_command_handler(f"  {shlex.join(cmd)}\n")
        # flush the output if print_command_handler is print
        if print_command_handler is print:
            sys.stdout.flush()

    if isinstance(cmd, str):
        kwargs["shell"] = True

    if kwargs.get("stdin") is True:
        kwargs["stdin"] = PIPE

    return_stdout = False
    if kwargs.get("stdout") is True:
        kwargs["stdout"] = PIPE
    elif kwargs.get("stdout") in ("RETURN", "return"):
        kwargs["stdout"] = PIPE
        return_stdout = True
    elif isinstance(kwargs.get("stdout"), (str, Path)):
        if isinstance(kwargs["stdout"], str):
            kwargs["stdout"] = Path(kwargs["stdout"])
        kwargs["stdout"] = kwargs["stdout"].open("w")
        kwargs["close_fds"] = True

    if kwargs.get("stderr") is True:
        kwargs["stderr"] = PIPE
    elif kwargs.get("stderr") in ("STDOUT", "stdout"):
        kwargs["stderr"] = STDOUT

    if fg:
        if kwargs.get("stdout") or kwargs.get("stderr"):
            raise ValueError(
                "Cannot redirect stdout or stderr when running in foreground"
            )
        kwargs["stdout"] = sys.stdout
        kwargs["stderr"] = sys.stderr
        kwargs["universal_newlines"] = True

    if "env" in kwargs:
        kwargs["env"] = {**os.environ, **kwargs["env"]}

    try:
        p = Popen(cmd, **kwargs)
    except Exception as e:
        raise RuntimeError(f"Failed to run command: {e}")

    if fg or wait or return_stdout:
        rc = p.wait()
        if rc != 0:
            raise RuntimeError(f"Failed to run command: {cmd}")

        if return_stdout:
            return p.stdout.read().decode()

        return p

    return p