# Source code of biopipen.ns.pipseeker

"""Pipseeker processes"""
from __future__ import annotations
from typing import Type
from diot import Diot  # type: ignore
from pipen.utils import is_loading_pipeline
from pipen_args.procgroup import ProcGroup

from ..core.proc import Proc
from ..core.config import config


class PipseekerFull(Proc):
    """Run pipseeker full command

    Tested with pipseeker v3.3.0

    Input:
        fastqs: The input fastq files, or a single directory containing the
            `*.fastq.gz` files.
        id: The id of the sample, used as the name of the output directory.
            If not given, it is inferred from the common prefix of the fastq
            files, with the trailing `_S*`, `_L*` and `_R*` parts removed.

    Output:
        outdir: The output directory

    Envs:
        ncores: Number of cores to use
            Will be passed to pipseeker with `--threads`.
        pipseeker: Path to pipseeker executable
        ref: Path of folder containing STAR-compatible transcriptome reference
            Will be passed to pipseeker with `--star-index-path`.
        verbosity: The verbosity level of pipseeker.
        remove_bam (flag): Whether to remove the BAM file generated by pipseeker.
        skip_version_check (flag): Whether to skip newer version check of pipseeker.
        chemistry: Version of the PIPseq assay (v3, v4, or V).
        tmpdir: Path to temporary directory, used to save the soft-linked fastq files
            to pass to pipseeker.
        <more>: Other arguments passed to pipseeker full command.
            See <https://www.fluentbio.com/wp-content/uploads/2024/06/PIPseeker-v3.3-User-Guide.pdf> for more details.
    """  # noqa: E501
    input = "fastqs:files, id"
    # The output directory is named after `in.id` when given; otherwise the
    # name is derived from the common prefix of the fastq files by stripping
    # the trailing sample (`_S`), lane (`_L`) and read (`_R`) parts.
    # `(?:_.*)?` is an optional non-capturing group for anything that follows
    # the stripped marker.
    output = """outdir:dir:
        {%- set fastqs = in.fastqs -%}
        {%- if len(fastqs) == 1 and isdir(fastqs[0]) -%}
            {%- set fastqs = fastqs[0] | glob: "*.fastq.gz" -%}
        {%- endif -%}
        {%- if in.id -%}
            {{in.id}}
        {%- else -%}
            {%- set id = commonprefix(*fastqs) |
                regex_replace: "_R(?:_.*)?$", "" |
                regex_replace: "_S(?:_.*)?$", "" |
                regex_replace: "_L(?:_.*)?$", "" |
                regex_replace: "_R\\d+(?:_.*)?$", "" |
                regex_replace: "_L\\d+(?:_.*)?$", "" |
                regex_replace: "_S\\d+$", "" -%}
            {{- id -}}
        {%- endif -%}
    """
    output_flatten = True
    lang = config.lang.python
    envs = {
        "ncores": config.misc.ncores,
        "pipseeker": config.exe.pipseeker,
        "ref": config.ref.ref_pipseeker,
        "verbosity": 2,
        "remove_bam": True,
        "skip_version_check": False,
        "chemistry": "v4",
        "tmpdir": config.path.tmpdir,
    }
    script = "file://../scripts/pipseeker/PipseekerFull.py"
    plugin_opts = {
        "report": "file://../reports/pipseeker/PipseekerFull.svelte",
        "report_paging": 5,
    }


class PipseekerSummary(Proc):
    """Summarize the output of pipseeker full command

    Input:
        indirs: The input directories containing the output of pipseeker full command.

    Output:
        outdir: The summarized output directory

    Envs:
        sensitivity (type=list): A list of levels of sensitivity to use for
            summarization.
            `pipseeker` usually outputs 5 levels of sensitivity. Choose one or
            more of them to summarize the results.
        group (type=auto): The group of the samples for boxplots.
            If `None`, don't do boxplots.
            It can be a dict of group names and sample names, e.g.
            `{"group1": ["sample1", "sample2"], "group2": ["sample3"]}`
            or a file containing the group information, with the first column
            being the sample names and the second column being the group names.
            The file should be tab-delimited with no header.
    """
    input = "indirs:dirs"
    # Collapse the first column of the incoming channel into a single job,
    # so that all directories are summarized together.
    input_data = lambda ch: [list(ch.iloc[:, 0])]
    # Name the output after the first input directory.
    output = "outdir:dir:{{in.indirs | first | stem | append: '-etc.summary'}}"
    lang = config.lang.rscript
    script = "file://../scripts/pipseeker/PipseekerSummary.R"
    envs = {"sensitivity": [3], "group": None}
    plugin_opts = {
        "report": "file://../reports/common.svelte",
        "report_paging": 8,
    }


class PipseekerPipeline(ProcGroup):
    """The pipseeker pipeline

    Run pipseeker full for multiple samples and summarize the metrics.

    Args:
        input (list): The list of lists of fastq files.
            or the list of comma-separated string of fastq files.
        ids (list): The list of ids for the samples.
            If given, it must have exactly one id for each item of `input`.
    """
    DEFAULTS = Diot(input=None, ids=None)

    def post_init(self):
        """Validate `input` and normalize it into a list of lists

        Items of `input` given as comma-separated strings are split into
        lists of stripped file paths; other items are kept as they are.

        Raises:
            TypeError: If `input` is not a non-empty list or tuple, unless
                the pipeline is only being loaded to show the help page.
        """
        if not is_loading_pipeline("-h", "-h+", "--help", "--help+") and (
            not isinstance(self.opts.input, (list, tuple))
            or not self.opts.input
        ):
            raise TypeError(
                "The input of `PipseekerPipeline` should be a list of lists of "
                "fastq files."
            )

        # `input` may legitimately be None here when only showing help
        if isinstance(self.opts.input, (list, tuple)):
            self.opts.input = [
                [y.strip() for y in x.split(",")]
                if isinstance(x, str)
                else x
                for x in self.opts.input
            ]

    @ProcGroup.add_proc  # type: ignore
    def p_pipseeker_full(self) -> Type[Proc]:
        """Build PipseekerFull process

        Raises:
            ValueError: If `ids` is given but does not have the same length
                as `input`.
        """
        samples = self.opts.input
        ids = self.opts.ids

        # zip() stops at the shorter sequence, which would silently drop
        # samples (or ids) on a mismatch, so fail early instead.
        if ids and len(ids) != len(samples):
            raise ValueError(
                "The `ids` of `PipseekerPipeline` should have the same length "
                f"as `input` ({len(ids)} ids for {len(samples)} samples)."
            )

        class _PipseekerFull(PipseekerFull):
            name = "PipseekerFull"

            if ids:
                # Pair each sample's fastq files with its id
                input_data = list(zip(samples, ids))
            else:
                input_data = samples

        return _PipseekerFull

    @ProcGroup.add_proc  # type: ignore
    def p_pipseeker_summary(self) -> Type[Proc]:
        """Build PipseekerSummary process"""

        class _PipseekerSummary(PipseekerSummary):
            name = "PipseekerSummary"

            requires = self.p_pipseeker_full
            # Gather the output directories of all samples into one job
            input_data = lambda ch: [list(ch.iloc[:, 0])]

        return _PipseekerSummary