"""Pipseeker processes"""
from __future__ import annotations
from typing import Type
from diot import Diot # type: ignore
from pipen.utils import is_loading_pipeline
from pipen_args.procgroup import ProcGroup
from ..core.proc import Proc
from ..core.config import config
class PipseekerFull(Proc):
    """Run pipseeker full command

    Tested with pipseeker v3.3.0

    Input:
        fastqs: The input fastq files
        id: The id used as the name of the output directory.
            If not provided, it is inferred from the common prefix of the
            fastq files (a single input directory is expanded to the
            `*.fastq.gz` files inside it first), with the trailing
            `_R...`, `_L...` and `_S...` suffixes removed.

    Output:
        outdir: The output directory

    Envs:
        ncores: Number of cores to use
            Will be passed to pipseeker with `--threads`.
        pipseeker: Path to pipseeker executable
        ref: Path of folder containing STAR-compatible transcriptome reference
            Will be passed to pipseeker with `--star-index-path`.
        verbosity: The verbosity level of pipseeker.
        remove_bam (flag): Whether to remove the BAM file generated by pipseeker.
        skip_version_check (flag): Whether to skip newer version check of pipseeker.
        chemistry: Version of the PIPseq assay (v3, v4, or V).
        tmpdir: Path to temporary directory, used to save the soft-linked fastq files
            to pass to pipseeker.
        <more>: Other arguments passed to pipseeker full command.
            See <https://www.fluentbio.com/wp-content/uploads/2024/06/PIPseeker-v3.3-User-Guide.pdf> for more details.
    """  # noqa: E501

    input = "fastqs:files, id"
    # Name of the output directory: the explicit `in.id` when given, otherwise
    # the common prefix of the fastq files with the trailing `_R*`/`_L*`/`_S*`
    # parts stripped.
    # Every tag uses whitespace control (`{%- -%}` / `{{- -}}`), so the
    # indentation and line breaks inside this template are not expected to
    # reach the rendered value.
    # The optional tails are non-capturing groups `(?:_.*)?`; the previous
    # `(:?_.*)?` spelling was a capturing group with an optional literal `:`.
    output = """outdir:dir:
        {%- set fastqs = in.fastqs -%}
        {%- if len(fastqs) == 1 and isdir(fastqs[0]) -%}
            {%- set fastqs = fastqs[0] | glob: "*.fastq.gz" -%}
        {%- endif -%}
        {%- if in.id -%}
            {{in.id}}
        {%- else -%}
            {%- set id = commonprefix(*fastqs) |
                regex_replace: "_R(?:_.*)?$", "" |
                regex_replace: "_S(?:_.*)?$", "" |
                regex_replace: "_L(?:_.*)?$", "" |
                regex_replace: "_R\\d+(?:_.*)?$", "" |
                regex_replace: "_L\\d+(?:_.*)?$", "" |
                regex_replace: "_S\\d+$", "" -%}
            {{- id -}}
        {%- endif -%}
    """
    output_flatten = True
    lang = config.lang.python
    envs = {
        "ncores": config.misc.ncores,
        "pipseeker": config.exe.pipseeker,
        "ref": config.ref.ref_pipseeker,
        "verbosity": 2,
        "remove_bam": True,
        "skip_version_check": False,
        "chemistry": "v4",
        "tmpdir": config.path.tmpdir,
    }
    script = "file://../scripts/pipseeker/PipseekerFull.py"
    plugin_opts = {
        "report": "file://../reports/pipseeker/PipseekerFull.svelte",
        "report_paging": 5,
    }
class PipseekerSummary(Proc):
    """Summarize the output of pipseeker full command

    Input:
        indirs: The input directories containing the output of
            pipseeker full command.

    Output:
        outdir: The summarized output directory

    Envs:
        sensitivity (type=list): A list of levels of sensitivity to use for
            summarization.
            `pipseeker` usually outputs 5 levels of sensitivity. Choose
            among them to summarize the results.
        group (type=auto): The group of the samples for boxplots.
            If `None`, don't do boxplots.
            It can be a dict of group names and sample names, e.g.
            `{"group1": ["sample1", "sample2"], "group2": ["sample3"]}`
            or a file containing the group information, with the first column
            being the sample names and the second column being the group names.
            The file should be tab-delimited with no header.
    """

    lang = config.lang.rscript
    script = "file://../scripts/pipseeker/PipseekerSummary.R"
    input = "indirs:dirs"
    # Gather the first column of the incoming channel into one list, wrapped
    # in an outer list so that all directories form a single input row.
    input_data = lambda channel: [list(channel.iloc[:, 0])]
    output = "outdir:dir:{{in.indirs | first | stem | append: '-etc.summary'}}"
    envs = {
        "sensitivity": [3],
        "group": None,
    }
    plugin_opts = {
        "report_paging": 8,
        "report": "file://../reports/common.svelte",
    }
class PipseekerPipeline(ProcGroup):
    """The pipseeker pipeline

    Run pipseeker full for multiple samples and summarize the metrics.

    Args:
        input (list): The list of lists of fastq files.
            or the list of comma-separated string of fastq files.
        ids (list): The list of ids for the samples.
            When given, it must have the same length as `input`.
    """

    DEFAULTS = Diot(input=None, ids=None)

    def post_init(self):
        """Validate and normalize the `input` and `ids` options

        Comma-separated strings in `input` are split into lists of
        stripped file paths. Validation is skipped when the pipeline is only
        being loaded to show the help page.

        Raises:
            TypeError: If `input` is not a non-empty list/tuple.
            ValueError: If `ids` is given but its length differs from the
                length of `input`.
        """
        loading_help = is_loading_pipeline("-h", "-h+", "--help", "--help+")
        is_sequence = isinstance(self.opts.input, (list, tuple))

        if not loading_help and (not is_sequence or len(self.opts.input) == 0):
            raise TypeError(
                "The input of `PipseekerPipeline` should be a list of lists of "
                "fastq files."
            )

        if is_sequence:
            self.opts.input = [
                [y.strip() for y in x.split(",")]
                if isinstance(x, str)
                else x
                for x in self.opts.input
            ]

        # `zip()` in `p_pipseeker_full` stops at the shorter sequence, so a
        # length mismatch would silently drop samples (or ids). Fail early.
        if (
            not loading_help
            and self.opts.ids
            and len(self.opts.ids) != len(self.opts.input)
        ):
            raise ValueError(
                "The `ids` of `PipseekerPipeline` should have the same length "
                f"as `input` ({len(self.opts.ids)} != {len(self.opts.input)})."
            )

    @ProcGroup.add_proc  # type: ignore
    def p_pipseeker_full(self) -> Type[Proc]:
        """Build PipseekerFull process

        Each sample's fastq files are paired with its id when `ids` is given;
        otherwise only the fastq files are passed as input data.
        """

        class _PipseekerFull(PipseekerFull):
            name = "PipseekerFull"
            if self.opts.ids:
                input_data = list(zip(self.opts.input, self.opts.ids))
            else:
                input_data = self.opts.input

        return _PipseekerFull

    @ProcGroup.add_proc  # type: ignore
    def p_pipseeker_summary(self) -> Type[Proc]:
        """Build PipseekerSummary process, which requires PipseekerFull"""

        class _PipseekerSummary(PipseekerSummary):
            name = "PipseekerSummary"
            requires = self.p_pipseeker_full
            # Collect the first column of the upstream channel into one list,
            # wrapped in an outer list so it forms a single input row.
            input_data = lambda ch: [list(ch.iloc[:, 0])]

        return _PipseekerSummary