"""Report manager for pipen-report (pipen_report.report_manager)."""

from __future__ import annotations

import inspect
import json
import re
import shutil
import sys
import asyncio
import textwrap
import traceback
import functools
from contextlib import suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Mapping, MutableMapping, Type

from liquid import Liquid
from copier import run_copy
from panpath import CloudPath, PanPath
from xqute.path import SpecCloudPath, MountedPath
from pipen import Proc, ProcGroup
from pipen.defaults import ProcInputType, ProcOutputType
from pipen.exceptions import TemplateRenderingError
from pipen.template import TemplateLiquid, TemplateJinja2
from pipen.utils import get_base, desc_from_docstring, get_marked

from .filters import FILTERS
from .preprocess import preprocess
from .utils import (
    UnifiedLogger,
    get_config,
    logger,
    get_fspath,
    get_cloudpath,
    a_copy_all,
)
from .versions import version_str

if TYPE_CHECKING:
    from pipen import Pipen
    from pipen.job import Job
    from pipen.template import Template

# Matches ANSI escape sequences (colors, cursor control) so they can be
# stripped from npm/rollup output before writing it to logs
ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


class NPMBuildingError(Exception):
    """Raised when `npm run build` fails while building a report page."""


class ReportManager:
    """Manage rendering and building of HTML reports for a pipen pipeline.

    Resolves the report output/working directories to local paths (mirroring
    cloud paths into a local cache) and holds the npm/frontend configuration
    read from plugin options.
    """

    def __init__(
        self,
        plugin_opts: Mapping[str, Any],
        outdir: Path | CloudPath,
        workdir: Path | CloudPath,
        cachedir_for_cloud: str,
    ) -> None:
        """Initialize the report manager.

        Args:
            plugin_opts: Plugin options (report_npm, report_nmdir, ...).
            outdir: Pipeline output directory; reports go to <outdir>/REPORTS.
            workdir: Pipeline working directory; <workdir>/.report-workdir
                is used as the report build workdir.
            cachedir_for_cloud: Local cache directory for cloud paths.
        """
        outdir = outdir / "REPORTS"

        # Make sure outdir and workdir are local paths.
        # SpecCloudPath (e.g. produced by plugins like pipen-gcs) and plain
        # CloudPath instances get an identical local mirror, so handle them
        # in a single branch.
        if isinstance(outdir, (SpecCloudPath, CloudPath)):
            self.outdir = MountedPath(
                get_fspath(outdir, cachedir_for_cloud), spec=outdir
            )
        else:
            self.outdir = MountedPath(outdir)

        workdir = workdir / ".report-workdir"
        if isinstance(workdir, CloudPath):
            self.workdir = MountedPath(
                get_fspath(workdir, cachedir_for_cloud), spec=workdir
            )
        else:
            self.workdir = MountedPath(workdir)

        self.npm = get_config("npm", plugin_opts.get("report_npm"))
        self.nmdir = PanPath(get_config("nmdir", plugin_opts.get("report_nmdir")))
        self.extlibs = get_config("extlibs", plugin_opts.get("report_extlibs"))
        self.nobuild = get_config("nobuild", plugin_opts.get("report_nobuild"))
        self.no_collapse_pgs = plugin_opts.get("report_no_collapse_pgs") or []
        self.cachedir_for_cloud = cachedir_for_cloud
        self.has_reports = False
        # Used to pass to the UI for rendering
        self.pipeline_data = None

        # Normalize a single procgroup name into a list
        if isinstance(self.no_collapse_pgs, str):  # pragma: no cover
            self.no_collapse_pgs = [self.no_collapse_pgs]

    async def check_npm_and_setup_dirs(self) -> None:
        """Verify npm and the frontend dependencies, then prepare the workdir.

        Exits the process (``sys.exit(1)``) when npm, the nmdir, its
        node_modules, or the package lock file are missing, or when the
        report workdir is not writable.
        """

        logger.info("Checking npm and frontend dependencies ...")

        npm = shutil.which(self.npm)
        if npm is None:  # pragma: no cover
            logger.error(
                "Cannot find npm. Please install it or specify the path to npm by:"
            )
            logger.error("$ pipen report config [--local] --npm <path/to/npm>")
            sys.exit(1)

        if not await self.nmdir.a_is_dir():  # pragma: no cover
            logger.error("Invalid nmdir: %s", self.nmdir)
            logger.error("Run `pipen report config [--local] --nmdir ...` to set it")
            sys.exit(1)

        # check if frontend dependencies are installed
        if not await (self.nmdir / "node_modules").a_is_dir():  # pragma: no cover
            logger.error("Frontend dependencies are not installed")
            logger.error("Run `pipen report update` to install them")
            sys.exit(1)

        await self.workdir.a_mkdir(parents=True, exist_ok=True)

        # Remove stale symlinks from a previous run; they are re-created below
        pubdir = self.workdir / "public"
        if await pubdir.a_is_symlink():
            await pubdir.a_unlink()

        nmdir = self.workdir / "node_modules"
        if await nmdir.a_is_symlink():
            await nmdir.a_unlink()

        # Recreate the external-libs directory from scratch (best-effort)
        exdir = self.workdir / "src" / "extlibs"
        with suppress(Exception):
            await exdir.a_rmtree()
        with suppress(Exception):
            await exdir.a_mkdir(parents=True, exist_ok=True)

        # Check if self.workdir is writable
        try:
            testfile = self.workdir / ".writetest"
            await testfile.a_write_text("test")
            await testfile.a_unlink()
        except Exception:  # pragma: no cover
            logger.error("The report workdir is not writable:")
            logger.error("  %s", self.workdir)
            traces = traceback.format_exc().splitlines()
            for trace in traces:
                logger.debug(trace)
            sys.exit(1)

        # Render the rollup config template and copy it into the workdir
        rollup_config = await self.nmdir.joinpath(
            "rollup.config.js.jinja"
        ).a_read_text()
        rollup_config = Liquid(rollup_config, from_file=False).render(
            extlibs=self.extlibs
        )
        await self.workdir.joinpath("rollup.config.js").a_write_text(rollup_config)

        # Copy the frontend scaffold into the outdir/workdir
        await self.nmdir.joinpath("public").a_copytree(self.outdir)
        await self.nmdir.joinpath("src").a_copytree(self.workdir / "src")
        await self.nmdir.joinpath("package.json").a_copy(self.workdir / "package.json")

        # Prefer bun's lock file when present, otherwise npm's
        node_lockfile = self.nmdir.joinpath("package-lock.json")
        bun_lockfile = self.nmdir.joinpath("bun.lock")
        if not await bun_lockfile.a_exists() and not await node_lockfile.a_exists():
            logger.error("Frontend package lock file not found.")
            logger.error("Run `pipen report install` to create it.")
            sys.exit(1)

        if await bun_lockfile.a_exists():
            await bun_lockfile.a_copy(self.workdir / "bun.lock")
        else:
            await node_lockfile.a_copy(self.workdir / "package-lock.json")

        # Link workdir/public to the report outdir and node_modules to nmdir's
        await pubdir.a_symlink_to(self.outdir)
        await nmdir.a_symlink_to(self.nmdir / "node_modules")

        if self.extlibs:
            await exdir.joinpath(Path(self.extlibs).name).a_symlink_to(self.extlibs)
    def _template_opts(self, template_opts) -> Mapping[str, Any]:
        """Build the template options used for rendering reports.

        Only liquid and jinja2 are supported; the report-specific filters
        are merged on top of the process' own filters.
        """
        merged = dict(template_opts)
        merged["filters"] = {**merged.get("filters", {}), **FILTERS}
        return merged

    def _rendering_data(self, proc: Proc) -> Mapping[str, Any]:
        """Compute the data used to render a process' report template.

        Args:
            proc: The process

        Returns:
            A mapping with the proc, its envs, the per-job data and
            shortcuts (`job`, `job0`) for the first job.
        """

        def collect_job(job: Job) -> Mapping[str, Any]:
            """Assemble template data for one job."""
            # Use the .spec (original) paths, not the mounted ones, since
            # the report is not built inside the job execution environment
            in_data = {}
            for key, typ in proc.input.type.items():
                value = job.input[key]
                if typ == ProcInputType.VAR or value is None:
                    in_data[key] = value
                elif typ in (ProcInputType.FILE, ProcInputType.DIR):
                    in_data[key] = value.spec
                elif typ in (ProcInputType.FILES, ProcInputType.DIRS):
                    in_data[key] = [item.spec for item in value]

            out_data = {}
            for key, typ in job._output_types.items():
                value = job.output[key]
                out_data[key] = value if typ == ProcOutputType.VAR else value.spec

            data = job.template_data["job"].copy()
            data.update({"in": in_data, "in_": in_data, "out": out_data})
            return data

        jobs_data = [collect_job(job) for job in proc.jobs]
        return {
            "proc": proc,
            "envs": proc.envs,
            "jobs": jobs_data,
            # shortcuts for the first job
            "job": jobs_data[0],
            "job0": jobs_data[0],
        }

    async def _npm_run_build(
        self,
        cwd: Path,
        proc: str,
        ulogger: UnifiedLogger,
        force_build: bool,
        cached: bool,
        npages: int = 1,
        procgroup: str | None = None,
    ) -> None:
        """Run `npm run build` for a report page and log the messages.

        Args:
            cwd: Directory to run the build in (the report workdir).
            proc: The process name, or "_index" for the home page.
            ulogger: The unified logger for messages.
            force_build: Build even if the destination looks up to date.
            cached: Whether the process itself was cached.
            npages: The number of pages of the report.
            procgroup: Name of the process group the proc belongs to, if any.
        """
        logfile = self.workdir / "pipen-report.log"
        if proc == "_index":
            # Start a fresh log file for a new build session
            await logfile.a_write_text("")
            destfile = self.outdir.joinpath("pages", "_index.js")
            ini_datafile = self.workdir / "src" / "init_data.json"
            src_changed = (
                not await ini_datafile.a_exists()
                or not await destfile.a_exists()
                or (await ini_datafile.a_stat()).st_mtime
                > (await destfile.a_stat()).st_mtime
            )
            proc_or_pg = proc
        else:
            # Nest the page under "<procgroup>/<proc>" unless collapsing is
            # disabled globally or for this particular group
            proc_or_pg = (
                proc
                if not procgroup
                or self.no_collapse_pgs is True
                or procgroup in self.no_collapse_pgs
                else f"{procgroup}/{proc}"
            )
            srcfile = self.workdir.joinpath("src", "pages", proc, "proc.svelte")
            destfile = self.outdir.joinpath("pages", f"{proc}.js")
            src_changed = (
                not await destfile.a_exists()
                or (await srcfile.a_stat()).st_mtime
                > (await destfile.a_stat()).st_mtime
            )

        if await destfile.a_exists() and not force_build and cached and not src_changed:
            if proc == "_index":
                ulogger.info("Home page cached, skipping report building")
                ulogger.info(f"- workdir: {self.workdir}")
            else:
                ulogger.info(f"{proc_or_pg} cached, skipping report building.")

            return

        ulogger.debug(
            f"Destination exists: {await destfile.a_exists()}; "
            f"force_build: {force_build}; "
            f"cached: {cached}; "
            f"src_changed: {src_changed}"
        )
        if proc_or_pg == "_index":
            ulogger.info("Building home page ...")
            ulogger.info(f"- workdir: {self.workdir}")
        elif npages == 1:
            ulogger.info("Building report ...")
        else:
            ulogger.info(f"Building report ({npages} pages) ...")

        chars_to_error = "(!)"
        errors_to_ignore = {
            # "(!) Unresolved dependencies":
            # "May be ignored if you are using external libraries",
        }
        errored = False

        async with logfile.a_open("a") as flog:
            await flog.write("\n")
            await flog.write(f"# BUILDING {proc_or_pg} ...\n")
            await flog.write("----------------------------------------\n")

            try:
                p = await asyncio.create_subprocess_exec(
                    self.npm,
                    "run",
                    "build",
                    "--",
                    f"--configProc={proc_or_pg}",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT,
                    cwd=str(cwd),
                )
                async for line in p.stdout:
                    line = line.decode()
                    logline = ansi_escape.sub("", line).rstrip()
                    # e.g.: src/pages/_index/index.js → public/index/index.js
                    await flog.write(ansi_escape.sub("", line))
                    if " → " in logline and logline.startswith("src/pages/"):
                        ulogger.info(f"- {logline.split(' → ')[0]}")

                    if logline.startswith(chars_to_error):  # pragma: no cover
                        if logline in errors_to_ignore:
                            ulogger.warning(
                                f"  {logline} ({errors_to_ignore[logline]})"
                            )
                        else:
                            ulogger.error(f"  {logline}")
                            errored = True

                    if errored:  # pragma: no cover
                        # Early stop.
                        # Process.terminate()/kill() are plain synchronous
                        # methods on asyncio.subprocess.Process; awaiting
                        # their None return value would raise TypeError.
                        p.terminate()
                        p.kill()
                        raise NPMBuildingError

                if await p.wait() != 0:  # pragma: no cover
                    raise NPMBuildingError

            except Exception as e:  # pragma: no cover
                # Remove a possibly half-written destination so the next run
                # does not treat it as cached
                with suppress(FileNotFoundError):
                    await destfile.a_unlink()

                if not isinstance(e, NPMBuildingError):
                    await flog.write(str(e))
                    for line in str(e).splitlines():
                        ulogger.error(f"  {line.rstrip()}")

                ulogger.error(f"(!) Failed. See: {logfile}")
                sys.exit(1)

    async def init_pipeline_data(self, pipen: Pipen) -> None:
        """Initialize the pipeline data and write it to the workdir.

        Collects the processes (and process groups) that have reports
        enabled, orders them by `report_order`, and writes the initial data
        file that is later used to decide whether the home page is cached.

        Args:
            pipen: The pipeline object.
        """
        self.pipeline_data = {
            "pipeline": {
                "name": pipen.name,
                "desc": pipen.desc,
            },
            "versions": version_str,
            "entries": [
                # Either a proc or a procgroup
            ],
        }

        procgroups = {}
        for i, proc in enumerate(pipen.procs):
            if not (getattr(proc, "plugin_opts") or {}).get("report", False):
                continue

            entry = {
                "name": proc.name,
                "desc": proc.desc or desc_from_docstring(proc, Proc),
                "npages": 1,
                "report_toc": True,
                # report_order dominates; the enumeration index keeps the
                # definition order stable among equal report_order values
                "order": ((proc.plugin_opts or {}).get("report_order", 0) * 1000 + i),
            }

            pg = proc.__meta__["procgroup"]
            if self.no_collapse_pgs is True or (
                pg and pg.name in self.no_collapse_pgs
            ):  # pragma: no cover
                # Collapsing disabled for this group: treat proc as ungrouped
                pg = None

            if pg and pg.name not in procgroups:
                procgroups[pg.name] = {
                    "name": pg.name,
                    "desc": desc_from_docstring(pg.__class__, ProcGroup),
                    "order": entry["order"],
                    "procs": [entry],
                }
                self.pipeline_data["entries"].append(procgroups[pg.name])
            elif pg:
                # A group sorts at the smallest order among its procs
                procgroups[pg.name]["order"] = min(
                    procgroups[pg.name]["order"], entry["order"]
                )
                procgroups[pg.name]["procs"].append(entry)
            else:
                self.pipeline_data["entries"].append(entry)

        self.pipeline_data["entries"].sort(key=lambda x: x["order"])

        # Write the initial data only when it changed, preserving the mtime
        # used by the home-page cache check
        datafile = self.workdir / "src" / "init_data.json"
        if (
            not await datafile.a_exists()
            or json.loads(await datafile.a_read_text()) != self.pipeline_data
        ):
            await datafile.a_write_text(json.dumps(self.pipeline_data, indent=2))

    async def _update_proc_meta(self, proc: Proc, npages: int) -> None:
        """Refresh a process' entry in the pipeline data.

        Updates the number of pages, description, toc flag and the runinfo
        collected (if any) by the pipen-runinfo plugin from the first job.
        """
        job0_dir = proc.workdir / "0"

        async def read_runinfo(kind: str, fallback: str, dedent: bool = False) -> str:
            """Read a job.runinfo.* file, returning a fallback message if absent."""
            path = job0_dir / f"job.runinfo.{kind}"
            if not await path.a_exists():
                return fallback
            content = await path.a_read_text()
            return textwrap.dedent(content) if dedent else content

        to_update = {
            "npages": npages,
            "desc": proc.desc,
            "report_toc": proc.plugin_opts.get("report_toc", True),
            "runinfo": {
                "session": await read_runinfo(
                    "session",
                    "pipen-runinfo plugin not enabled or language not supported "
                    "for saving session information.",
                ),
                "time": await read_runinfo(
                    "time", "pipen-runinfo plugin not enabled.", dedent=True
                ),
                "device": await read_runinfo(
                    "device", "pipen-runinfo plugin not enabled."
                ),
            },
        }

        pg = proc.__meta__["procgroup"]
        if self.no_collapse_pgs is True or (
            pg and pg.name in self.no_collapse_pgs
        ):  # pragma: no cover
            # Collapsing disabled for this group: proc lives at the top level
            pg = None

        for entry in self.pipeline_data["entries"]:
            if pg and entry["name"] == pg.name:
                # Find the proc inside its group's entry
                for proc_entry in entry["procs"]:
                    if proc_entry["name"] == proc.name:
                        proc_entry.update(to_update)
                        break
                break
            if entry["name"] == proc.name:
                entry.update(to_update)
                break

    async def _render_file(
        self,
        engine: Type[Template],
        engine_opts: MutableMapping[str, Any],
        source: str,
        render_data: Mapping[str, Any],
    ) -> str:
        """Render a report template source with the given template engine.

        When rendering fails because an included/imported file is missing
        locally but is available on a cloud path, download it and retry.
        """
        if engine in (TemplateLiquid, TemplateJinja2):
            # Avoid {#if ... } being treated as jinja comments
            engine_opts["comment_start_string"] = "{!"
            engine_opts["comment_end_string"] = "!}"

        template = engine(source, **engine_opts)
        # Files we already tried to fetch; seeing one again means the fetch
        # did not resolve the include, so re-raise instead of looping forever
        attempted = set()
        while True:
            try:
                return template.render(render_data)
            except FileNotFoundError as exc:
                # The message ends with ": '<path>'"
                missing = str(exc).split(": '")[1][:-1]
                if missing in attempted:  # pragma: no cover
                    raise exc

                attempted.add(missing)
                cloud_file = get_cloudpath(missing, self.cachedir_for_cloud)
                if cloud_file is None:
                    raise exc
                await a_copy_all(
                    PanPath(cloud_file), PanPath(missing), self.cachedir_for_cloud
                )

    async def render_proc_report(self, proc: Proc):
        """Render the report template for a process.

        Args:
            proc: The process

        Returns:
            The number of pages the report was split into.
        """
        rendering_data = self._rendering_data(proc)

        # Render the report
        # in case it's a Path object
        report = str(proc.plugin_opts["report"])
        report_toc = proc.plugin_opts.get("report_toc", True)
        report_paging = proc.plugin_opts.get("report_paging", False)
        report_relpath_tags = proc.plugin_opts.get("report_relpath_tags", None) or {}
        if report.startswith("file://"):
            report_tpl = PanPath(report[7:])
            if not report_tpl.is_absolute():
                # Resolve a relative template path against the (base) class
                # that actually declared this report
                base = get_base(
                    proc.__class__,
                    Proc,
                    report,
                    lambda klass: (
                        None
                        if klass.plugin_opts is None
                        else str(klass.plugin_opts.get("report", None))
                    ),
                )
                report_tpl = PanPath(inspect.getfile(base)).parent / report_tpl
            report = await report_tpl.a_read_text()

        template_opts = self._template_opts(proc.template_opts)

        try:
            rendered = await self._render_file(
                proc.template,
                template_opts,  # type: ignore[arg-type]
                report,
                rendering_data,
            )
        except Exception as exc:  # pragma: no cover
            raise TemplateRenderingError(
                f"[{proc.name}] Failed to render report file."
            ) from exc

        # How the pipeline/proc is run
        # If mounted_outdir is not None, it means the pipeline is run remotely
        # The spec paths are paths that mounted inside the remote environment
        # They may not be working on this local system
        run_meta = {
            "outdir": self.outdir,
            "workdir": self.workdir,
            "mounted_outdir": getattr(proc.xqute.scheduler, "MOUNTED_OUTDIR", None),
            "mounted_workdir": getattr(proc.xqute.scheduler, "MOUNTED_METADIR", None),
        }
        if run_meta["mounted_outdir"]:
            run_meta["mounted_outdir"] = MountedPath(
                run_meta["mounted_outdir"],
                spec=proc.pipeline.workdir,
            )
        if run_meta["mounted_workdir"]:
            run_meta["mounted_workdir"] = MountedPath(
                run_meta["mounted_workdir"],
                spec=proc.pipeline.workdir,
            )

        # preprocess the rendered report and get the toc
        rendered_parts, toc = await preprocess(
            rendered,
            run_meta,
            report_toc,
            report_paging,
            report_relpath_tags,
            cachedir_for_cloud=self.cachedir_for_cloud,
            logfn=lambda *args, **kwargs: proc.log(*args, **kwargs, logger=logger),
        )

        if len(toc) > 10 and not report_paging:  # pragma: no cover
            proc.log(
                "warning",
                "There are > 10 sections in the report, "
                "enable paging (`report_paging`) ?",
                logger=logger,
            )

        npages = len(rendered_parts)
        # Update npages in data.json
        await self._update_proc_meta(proc, npages)

        for i, rendered_part in enumerate(rendered_parts):
            await self._render_page(
                rendered=rendered_part,
                name=proc.name,
                page=i,
                toc=toc,
            )

        return npages

    async def _render_page(
        self,
        rendered: str,
        name: str,
        page: int,
        toc: List[Mapping[str, Any]] | None,
    ) -> Path:
        """Materialize one page of a proc report in the report workdir."""
        template_dir = self.nmdir.joinpath("src", "pages", "proc")
        # Page 0 lives under the bare proc name; later pages get a suffix
        subdir = name if page == 0 else f"{name}-{page}"
        dest_dir = self.workdir.joinpath("src", "pages", subdir)

        # copier is synchronous; run it off the event loop
        copy_job = functools.partial(
            run_copy,
            str(template_dir),
            dest_dir,
            overwrite=True,
            quiet=True,
            data={"name": name, "page": page},
            skip_if_exists=["proc.svelte"],
        )
        await asyncio.get_running_loop().run_in_executor(None, copy_job)

        await dest_dir.joinpath("toc.json").a_write_text(json.dumps(toc, indent=2))

        # Only rewrite proc.svelte when its content changed, keeping mtimes
        # stable for the build cache
        report_file = dest_dir / "proc.svelte"
        if (
            not await report_file.a_exists()
            or await report_file.a_read_text() != rendered
        ):
            await report_file.a_write_text(rendered)

        return report_file

    async def build(
        self,
        proc: Proc | str,
        nobuild: bool,
        force_build: bool,
        cached: bool = False,
    ) -> None:
        """Build the report for a process, or the home page.

        Args:
            proc: The process, or the string "_index" for the home page.
            nobuild: Don't build the report (render only).
            force_build: Build even if the page appears up to date.
            cached: Whether the process is cached.
        """
        ulogger = UnifiedLogger(logger, proc)

        if proc == "_index":
            if nobuild:  # pragma: no cover
                ulogger.debug("`report_nobuild` is True, skipping building home page.")
            else:
                await self._npm_run_build(
                    cwd=self.workdir,
                    proc="_index",
                    ulogger=ulogger,
                    force_build=force_build,
                    cached=cached,
                )

            return

        npages = await self.render_proc_report(proc)

        # Persist the (now updated) pipeline data for the frontend
        datafile = self.workdir / "src" / "data.json"
        await datafile.a_write_text(json.dumps(self.pipeline_data, indent=2))

        if nobuild or self.nobuild:  # pragma: no cover
            ulogger.debug("`report_nobuild` is True, skipping building report.")
            return

        procgroup = get_marked(proc, "procgroup")
        await self._npm_run_build(
            cwd=self.workdir,
            proc=proc.name,
            ulogger=ulogger,
            force_build=force_build,
            cached=cached,
            npages=npages,
            procgroup=procgroup.name if procgroup else None,
        )

    async def sync_reports(self, logfn: Callable | None = None) -> None:
        """Sync the reports to the cloud output directory if needed.

        Only acts when the report outdir is a local mirror of a cloud path
        (i.e. it carries a CloudPath ``spec``).

        Args:
            logfn: Optional proc-style log function; falls back to logger.
        """
        if hasattr(self.outdir, "spec") and isinstance(self.outdir.spec, CloudPath):
            if logfn:
                logfn("info", "Syncing reports to cloud ...", logger=logger)
            else:
                logger.info("Syncing reports to cloud ...")
                logger.info(f" {self.outdir}")
                logger.info(f" → {self.outdir.spec}")

            await self.outdir.a_copytree(self.outdir.spec)