SOURCE CODE pipen_filters.filters DOCS

"""Provides the filters"""

from __future__ import annotations

import json
from functools import wraps
from os import PathLike, path
from pathlib import Path
from typing import Any, List, Mapping, Union, Dict, Callable

from diot import Diot
from simpleconf import Config
from simpleconf.caster import cast, null_caster
from yunpath import AnyPath, CloudPath

FILTERS: Dict[str, Callable] = {}


def add_filter(DOCS
    aliases: str | list[str] | Callable | None = None,
) -> Callable[[Callable], Callable]:
    """Add a filter to the FILTERS

    Examples:
        Filters added: `myfilter`
        >>> @add_filter
        ... def myfilter(var):
        ...     return var

        >>> @add_filter()
        ... def myfilter(var):
        ...     return var

        Filters added: `myfilter`, `myfilter2`
        >>> @add_filter("myfilter2")
        ... def myfilter(var):
        ...     return var

    Args:
        aliases: The aliases of the filter, the filter itself, or None

    Returns:
        The filter itself if used directly `@add_filter`; or
        The decorator to add the filter if used with arguments `@add_filter(...)`
    """
    if callable(aliases):
        return add_filter()(aliases)

    aliases = aliases or []
    if isinstance(aliases, str):
        aliases = [aliases]

    def _add_filter(func: Callable) -> Callable:
        FILTERS[func.__name__] = func
        for alias in aliases:
            FILTERS[alias] = func
        return func

    return _add_filter


def _neg1_if_error(func: Callable) -> Callable:
    """Return -1 if an error occurs"""

    @wraps(func)
    def _func(*args: Any, **kwargs: Any) -> int:
        try:
            return func(*args, **kwargs)
        except Exception:
            return -1

    return _func


def _splitexit(
    pth: str | PathLike, ignore: list[str] | str, recursive: bool
) -> tuple[str, str]:
    """Split the extension with leading dot of a file

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The path and the extension (with leading dot)
    """
    if isinstance(ignore, str):
        ignore = [ignore]

    ignore = ["." + ext.lstrip(".") for ext in ignore]
    pth, last = path.splitext(str(pth))
    if not recursive:
        return (pth, last) if last not in ignore else path.splitext(pth)

    while last in ignore:
        pth, last = path.splitext(pth)
    return pth, last


@add_filterDOCS
def realpath(pth: str | PathLike) -> str:
    """Get the real path of a path

    Args:
        pth: The path to the file

    Returns:
        The real path of the file
    """
    return str(AnyPath(pth).resolve())


@add_filterDOCS
def readlink(pth: str | PathLike) -> str:
    """Get the link of a symlink

    Args:
        pth: The path to the symlink

    Returns:
        The link of the symlink
    """
    return str(AnyPath(pth).readlink())


@add_filterDOCS
def commonprefix(*paths: str | PathLike, basename_only: bool = True) -> str:
    """Get the common prefix of a set of paths

    Examples:
        >>> commonprefix("/a/b/abc.txt", "/a/b/abc.png")
        >>> # "abc."
        >>> commonprefix("/a/b/abc.txt", "/a/b/abc.png", basename_only=False)
        >>> # "/a/b/abc."

    Args:
        *paths: The paths to find commonprefix agaist
        basename_only: Only search on the basenames

    Returns:
        The common prefix of the paths
    """
    paths = [AnyPath(pth).name if basename_only else str(pth) for pth in paths]
    return path.commonprefix(paths)


@add_filterDOCS
def dirname(pth: str | PathLike) -> str:
    """Get the directory name of a path

    For example, `/a/b/c.txt => /a/b/`

    Args:
        pth: The path to the file

    Returns:
        The directory name of the file
    """
    return str(AnyPath(pth).parent)


@add_filterDOCS
def basename(pth: str | PathLike) -> str:
    """Get the basename of a path

    For example, `/a/b/c.txt => c.txt`

    Args:
        pth: The path to the file

    Returns:
        The basename of the file
    """
    return str(AnyPath(pth).name)


@add_filter("suffix")DOCS
def ext(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the extension of a file

    For example, `/a/b/c.txt => .txt`.

    Aliases: `suffix`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The extension of the file
    """
    return _splitexit(pth, ignore, recursive)[1]


@add_filter("suffix0")DOCS
def ext0(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the extension of a file without the leading dot

    For example, `/a/b/c.txt => txt`.

    Aliases: `suffix0`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The extension of the file without the leading dot
    """
    return ext(pth, ignore, recursive)[1:]


@add_filterDOCS
def prefix(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the prefix of a file

    For example, `/a/b/c.txt => /a/b/c`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The prefix of the file
    """
    return _splitexit(pth, ignore, recursive)[0]


@add_filterDOCS
def prefix0(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the prefix of a file without the extension

    For example, `/a/b/c.d.txt => /a/b/c.d`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The prefix of the file without the extension
    """
    return str(AnyPath(pth).parent / FILTERS["filename0"](pth, ignore, recursive))


@add_filter(["fn", "stem"])DOCS
def filename(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the filename of a file.

    For example, `/a/b/c.d.txt => c.d`.

    Aliases: `fn`, `stem`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The filename of the file
    """
    return basename(_splitexit(pth, ignore, recursive)[0])


@add_filter(["fn0", "stem0"])DOCS
def filename0(
    pth: str | PathLike,
    ignore: list[str] | str = [],
    recursive: bool = False,
) -> str:
    """Get the filename of a file without the extension

    For example, `/a/b/c.d.txt => c`.

    Aliases: `fn0`, `stem0`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The filename of the file without the extension
    """
    return filename(pth, ignore, recursive).split(".")[0]


@add_filter("joinpath")DOCS
def joinpaths(pathsegment: str | PathLike, *pathsegments: str | PathLike) -> str:
    """Join paths.

    For example, `joinpaths("a", "b") => "a/b"`.

    Aliases: `joinpath`

    Args:
        pathsegment: The path to join
        *pathsegments: The paths to join

    Returns:
        The joined path
    """
    return str(AnyPath(pathsegment).joinpath(*pathsegments))


@add_filterDOCS
def as_path(pth: str | PathLike) -> Path | CloudPath:
    """Convert a path to a Path object

    Args:
        pth: The path to convert

    Returns:
        The Path object
    """
    return AnyPath(pth)


@add_filterDOCS
def isdir(pth: str | PathLike) -> bool:
    """Check if a path is a directory

    Args:
        pth: The path to check

    Returns:
        True if the path is a directory, False otherwise
    """
    return AnyPath(pth).is_dir()


@add_filterDOCS
def isfile(pth: str | PathLike) -> bool:
    """Check if a path is a file

    Args:
        pth: The path to check

    Returns:
        True if the path is a file, False otherwise
    """
    return AnyPath(pth).is_file()


@add_filterDOCS
def islink(pth: str | PathLike) -> bool:
    """Check if a path is a symlink

    Args:
        pth: The path to check

    Returns:
        True if the path is a symlink, False otherwise
    """
    return AnyPath(pth).is_symlink()


@add_filterDOCS
def exists(pth: str | PathLike) -> bool:
    """Check if a path exists

    Args:
        pth: The path to check

    Returns:
        True if the path exists, False otherwise
    """
    return AnyPath(pth).exists()


@add_filterDOCS
@_neg1_if_error
def getsize(pth: str | PathLike) -> int:
    """Get the size of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The size of the file
    """
    return AnyPath(pth).stat().st_size


@add_filterDOCS
@_neg1_if_error
def getmtime(pth: str | PathLike) -> int:
    """Get the modification time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The modification time of the file
    """
    return AnyPath(pth).stat().st_mtime


@add_filterDOCS
@_neg1_if_error
def getctime(pth: str | PathLike) -> int:
    """Get the creation time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The creation time of the file
    """
    return AnyPath(pth).stat().st_ctime


@add_filterDOCS
@_neg1_if_error
def getatime(pth: str | PathLike) -> int:
    """Get the access time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The access time of the file
    """
    return AnyPath(pth).stat().st_atime


@add_filterDOCS
def isempty(
    pth: str | PathLike,
    ignore_ws: bool = True,
    nonfile_as_empty: bool = False,
) -> bool:
    """Check if a file is empty

    Args:
        pth: The path to the file
        ignore_ws: Ignore whitespaces?
        nonfile_as_empty: Treat non-file as empty?

    Returns:
        True if the file is empty, False otherwise
    """
    pth = AnyPath(pth)
    if not pth.is_file():
        return nonfile_as_empty

    if not ignore_ws:
        return getsize(pth) == 0

    return pth.read_text().strip() == ""


@add_filterDOCS
def quote(var: Any, quote_none: bool = False) -> str:
    """Quote a string

    Args:
        var: The string to quote
        quote_none: Quote None as '"None"'?
            Otherwise, return 'None', without quotes

    Returns:
        The quoted string
    """
    if var is None and not quote_none:
        return 'None'

    return json.dumps(str(var))


@add_filterDOCS
def squote(var: Any, quote_none: bool = False) -> str:
    """Quote a string with single quotes

    Args:
        var: The string to quote
        quote_none: Quote None as "'None'"?
            Otherwise, return 'None', without quotes

    Returns:
        The quoted string
    """
    if var is None and not quote_none:
        return 'None'

    return repr(str(var))


@add_filter("json")DOCS
def json_dumps(var: Any) -> str:
    """Dump an object to json.

    Aliases: `json`

    Args:
        var: The object to dump

    Returns:
        The json string
    """
    return json.dumps(var)


@add_filterDOCS
def json_load(pth: str | PathLike) -> Any:
    """Load a json file

    Args:
        pth: The path to the json file

    Returns:
        The loaded object
    """
    return config(pth, "json")


@add_filterDOCS
def json_loads(jsonstr: str) -> Any:
    """Load a json string to an object

    Args:
        jsonstr: The json string

    Returns:
        The loaded object
    """
    return json.loads(jsonstr)


@add_filter("toml_dumps")DOCS
def toml(var: Any) -> str:
    """Dump an object to toml.

    Aliases: `toml_dumps`

    Args:
        var: The object to dump

    Returns:
        The toml string
    """
    return Diot(var).to_toml()


@add_filterDOCS
def toml_load(pth: str | PathLike) -> Any:
    """Load a toml file. `null` will be loaded as None

    Args:
        pth: The path to the toml file

    Returns:
        The loaded object
    """
    return config(pth, "toml")


@add_filterDOCS
def toml_loads(tomlstr: str) -> Any:
    """Load a toml string to an object, `null` will be loaded as None

    Args:
        tomlstr: The toml string

    Returns:
        The loaded object
    """
    return cast(Config.load(tomlstr, loader="tomls"), [null_caster])


@add_filterDOCS
def config(x: Any, loader: str = None) -> Mapping[str, Any]:
    """Get the configuration (python dictionary) from a file

    Args:
        x: The path to the file, dict or string of configurations (json or toml)
        loader: The loader to use, defaults to auto-detect
            If x is a dict, this argument is ignored
            if x is a string and is not a file path, then x will be loaded as
            a toml string if loader is not specified
            if x is a file path, then x will be loaded according to the file
            extension

    Returns:
        The config
    """
    if not isinstance(x, (Path, str)):  # assume dict
        return Config.load_one(x, loader="dict")

    if isinstance(x, str) and not AnyPath(x).is_file():
        if loader == "toml":
            return Diot(FILTERS["toml_loads"](x))
        if loader == "json":
            return Diot(FILTERS["json_loads"](x))
        raise ValueError(f"Unknown loader: {loader}")

    return Config.load_one(x, loader=loader)


@add_filterDOCS
def glob(pathsegment: str | PathLike, *pathsegments: str | PathLike) -> List[str]:
    """Glob a path

    Args:
        pathsegment: The path to glob
        *pathsegments: The paths to glob

    Returns:
        The globbed paths
    """
    return list(
        sorted([str(p) for p in AnyPath(pathsegment).glob("/".join(pathsegments))])
    )


@add_filterDOCS
def glob0(*paths: str | PathLike) -> str:
    """Glob a path and return the first result

    Args:
        *paths: The paths to glob

    Returns:
        The first globbed path
    """
    return glob(*paths)[0]


@add_filterDOCS
def read(file: str | PathLike, *args: Any, **kwargs: Any) -> Union[str, bytes]:
    """Read the contents from a file

    Args:
        file: The path to the file
        *args: and
        **kwargs: Other arguments passed to `open()`

    Returns:
        The contents of the file
    """
    with AnyPath(file).open(*args, **kwargs) as fvar:
        return fvar.read()


@add_filterDOCS
def readlines(
    file: str | PathLike,
    *args: Any,
    **kwargs: Any,
) -> Union[List[str], List[bytes]]:
    """Read the lines from a file

    Args:
        file: The path to the file
        *args: and
        **kwargs: Other arguments to `open()`

    Returns:
        A list of lines in the file
    """
    return read(file, *args, **kwargs).splitlines()


@add_filterDOCS
def regex_replace(
    string: str,
    pattern: str,
    repl: str,
    count: int = 0,
    flags: int = 0,
) -> str:
    """Replace the matched pattern with a string

    Args:
        string: The string to search
        pattern: The pattern to search
        repl: The string to replace
        flags: The regex flags

    Returns:
        The replaced string
    """
    import re

    return re.sub(pattern, repl, string, count=count, flags=flags)


@add_filterDOCS
def slugify(string: str, *args: Any, **kwargs: Any) -> str:
    """Slugify a string

    Args:
        string: The string to slugify
        *args: and
        **kwargs: Other arguments to `slugify()`

    Returns:
        The slugified string
    """
    from slugify import slugify as _slugify

    return _slugify(string, *args, **kwargs)