"""Provide utilities for testing."""
import tempfile
from functools import wraps
from pathlib import Path
from pipen import Pipen
TESTING_INDEX_INIT = 1
TESTING_PARENT_DIR = Path(tempfile.gettempdir())
TESTING_DIR = str(TESTING_PARENT_DIR.joinpath("biopipen-tests-%(index)s"))
RSCRIPT_DIR = TESTING_PARENT_DIR.joinpath("biopipen-tests-rscripts")
RSCRIPT_DIR.mkdir(exist_ok=True)
def _find_testing_index(new):
"""Find the next available testing index"""
index = TESTING_INDEX_INIT
while True:
dir = TESTING_DIR % {"index": index}
if not Path(dir).exists():
if new:
break
else:
return max(index - 1, TESTING_INDEX_INIT)
index += 1
return index
def _get_test_dirs(testfile, new):
"""Get the workdir and outdir for a test pipeline"""
index = _find_testing_index(new)
workdir = TESTING_DIR % {"index": index}
procname = Path(testfile).parent.stem
nsname = Path(testfile).parent.parent.stem
name = f"{nsname}.{procname}"
outdir = f"{workdir}/{nsname}/output"
workdir = f"{workdir}/{procname}/pipen"
Path(workdir).mkdir(parents=True, exist_ok=True)
Path(outdir).mkdir(parents=True, exist_ok=True)
return name, workdir, outdir
def get_pipeline(testfile, loglevel="debug", enable_report=False, **kwargs):DOCS
"""Get a pipeline for a test file"""
name, workdir, outdir = _get_test_dirs(testfile, False)
report_plugin_prefix = "+" if enable_report else "-"
kws = {
"name": name,
"workdir": workdir,
"outdir": outdir,
"loglevel": loglevel,
"plugins": [f"{report_plugin_prefix}report"],
}
kws.update(kwargs)
return Pipen(**kws)
def _run_rcode(rcode: str) -> str:
"""Run R code and return the output"""
import hashlib
import textwrap
import subprocess as sp
# Use sha256 of rcode to name the file
rcode_hash = hashlib.sha256(rcode.encode()).hexdigest()
script_file = RSCRIPT_DIR.joinpath(f"rcode-{rcode_hash}.R")
script_file.write_text(rcode)
p = sp.Popen(["Rscript", str(script_file)], stdout=sp.PIPE, stderr=sp.PIPE)
out, err = p.communicate()
if p.returncode != 0:
out = (
f"R codefile:\n {script_file}\n"
f"Error:\n{textwrap.indent(err.decode(), ' ')}"
)
return out
return out.decode().strip()
def r_test(mem: callable) -> callable:DOCS
"""A decorator to test R code"""
@wraps(mem)
def decorator(self, *args, **kwargs):
rcode = mem(self, *args, **kwargs)
source = getattr(self, "SOURCE_FILE", None)
expect = (
"expect <- function(expr, ...) {\n"
" if (!expr) {\n"
" msg <- lapply(\n"
" list(...),\n"
" function(x) { ifelse(is.null(x), 'NULL', x) }\n"
" )\n"
" stop(paste0(unlist(msg), collapse = ' '))\n"
" }\n"
"}\n"
)
rcode = f"{expect}\n\n{rcode}\n\ncat('PASSED')\n"
if source is not None:
if not isinstance(source, (list, tuple)):
source = [source]
libs = "\n".join([f"suppressWarnings(source('{s}'))" for s in source])
rcode = f'{libs}\n\n{rcode}'
out = _run_rcode(rcode)
self.assertEqual(
out,
"PASSED",
"\n-----------------------------\n"
f"{out}"
"\n-----------------------------\n"
)
return decorator