SOURCE CODE pipen_filters.filters DOCS

"""Provides the filters"""
from __future__ import annotations

import json
from glob import glob as pyglob
from os import PathLike, path, readlink as os_readlink
from pathlib import Path
from typing import Any, List, Mapping, Union, Dict, Callable

import rtoml
from diot import Diot
from simpleconf import Config
from simpleconf.caster import cast, null_caster

FILTERS: Dict[str, Callable] = {}


def add_filter(DOCS
    aliases: str | list[str] | Callable | None = None
) -> Callable[[Callable], Callable]:
    """Add a filter to the FILTERS

    Examples:
        Filters added: `myfilter`
        >>> @add_filter
        ... def myfilter(var):
        ...     return var

        >>> @add_filter()
        ... def myfilter(var):
        ...     return var

        Filters added: `myfilter`, `myfilter2`
        >>> @add_filter("myfilter2")
        ... def myfilter(var):
        ...     return var

    Args:
        aliases: The aliases of the filter, the filter itself, or None

    Returns:
        The filter itself if used directly `@add_filter`; or
        The decorator to add the filter if used with arguments `@add_filter(...)`
    """
    if callable(aliases):
        return add_filter()(aliases)

    aliases = aliases or []
    if isinstance(aliases, str):
        aliases = [aliases]

    def _add_filter(func: Callable) -> Callable:
        FILTERS[func.__name__] = func
        for alias in aliases:
            FILTERS[alias] = func
        return func

    return _add_filter


def _neg1_if_error(func: Callable) -> Callable:
    """Return -1 if an error occurs"""
    def _func(*args: Any, **kwargs: Any) -> int:
        try:
            return func(*args, **kwargs)
        except (FileNotFoundError, OSError):
            return -1

    return _func


def _splitexit(
    pth: PathLike,
    ignore: list[str] | str,
    recursive: bool
) -> tuple[str, str]:
    """Split the extension with leading dot of a file

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The path and the extension (with leading dot)
    """
    if isinstance(ignore, str):
        ignore = [ignore]
    ignore = ["." + ext.lstrip(".") for ext in ignore]
    pth, last = path.splitext(pth)
    if not recursive:
        return (pth, last) if last not in ignore else path.splitext(pth)

    while last in ignore:
        pth, last = path.splitext(pth)
    return pth, last


@add_filterDOCS
def realpath(pth: PathLike) -> str:
    """Get the real path of a path

    Args:
        pth: The path to the file

    Returns:
        The real path of the file
    """
    return path.realpath(pth)


@add_filterDOCS
def readlink(pth: PathLike) -> str:
    """Get the link of a symlink

    Args:
        pth: The path to the symlink

    Returns:
        The link of the symlink
    """
    return os_readlink(pth)


@add_filterDOCS
def commonprefix(*paths: PathLike, basename_only: bool = True) -> str:
    """Get the common prefix of a set of paths

    Examples:
        >>> commonprefix("/a/b/abc.txt", "/a/b/abc.png")
        >>> # "abc."
        >>> commonprefix("/a/b/abc.txt", "/a/b/abc.png", basename_only=False)
        >>> # "/a/b/abc."

    Args:
        *paths: The paths to find commonprefix agaist
        basename_only: Only search on the basenames

    Returns:
        The common prefix of the paths
    """
    paths = [path.basename(pth) if basename_only else pth for pth in paths]
    return path.commonprefix(paths)


@add_filterDOCS
def dirname(pth: PathLike) -> str:
    """Get the directory name of a path

    For example, `/a/b/c.txt => /a/b/`

    Args:
        pth: The path to the file

    Returns:
        The directory name of the file
    """
    return path.dirname(pth)


@add_filterDOCS
def basename(pth: PathLike) -> str:
    """Get the basename of a path

    For example, `/a/b/c.txt => c.txt`

    Args:
        pth: The path to the file

    Returns:
        The basename of the file
    """
    return path.basename(pth)


@add_filter("suffix")DOCS
def ext(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the extension of a file

    For example, `/a/b/c.txt => .txt`.

    Aliases: `suffix`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The extension of the file
    """
    return _splitexit(pth, ignore, recursive)[1]


@add_filter("suffix0")DOCS
def ext0(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the extension of a file without the leading dot

    For example, `/a/b/c.txt => txt`.

    Aliases: `suffix0`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The extension of the file without the leading dot
    """
    return ext(pth, ignore, recursive)[1:]


@add_filterDOCS
def prefix(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the prefix of a file

    For example, `/a/b/c.txt => /a/b/c`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The prefix of the file
    """
    return _splitexit(pth, ignore, recursive)[0]


@add_filterDOCS
def prefix0(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the prefix of a file without the extension

    For example, `/a/b/c.d.txt => /a/b/c.d`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The prefix of the file without the extension
    """
    return path.join(path.dirname(pth), FILTERS["filename0"](pth, ignore, recursive))


@add_filter(["fn", "stem"])DOCS
def filename(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the filename of a file.

    For example, `/a/b/c.d.txt => c.d`.

    Aliases: `fn`, `stem`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The filename of the file
    """
    return path.basename(_splitexit(pth, ignore, recursive)[0])


@add_filter(["fn0", "stem0"])DOCS
def filename0(pth: PathLike, ignore: list[str] | str = [], recursive: bool = False) -> str:
    """Get the filename of a file without the extension

    For example, `/a/b/c.d.txt => c`.

    Aliases: `fn0`, `stem0`

    Args:
        pth: The path to the file
        ignore: The extensions to ignore
            The extensions can be with or without leading dot
        recursive: Recursively ignore the extensions from the end

    Returns:
        The filename of the file without the extension
    """
    return filename(pth, ignore, recursive).split(".")[0]


@add_filter("joinpath")DOCS
def joinpaths(*paths: PathLike) -> str:
    """Join paths.

    For example, `joinpaths("a", "b") => "a/b"`.

    Aliases: `joinpath`

    Args:
        *paths: The paths to join

    Returns:
        The joined path
    """
    return path.join(*paths)


@add_filterDOCS
def as_path(pth: PathLike) -> Path:
    """Convert a path to a Path object

    Args:
        pth: The path to convert

    Returns:
        The Path object
    """
    return Path(pth)


@add_filterDOCS
def isdir(pth: PathLike) -> bool:
    """Check if a path is a directory

    Args:
        pth: The path to check

    Returns:
        True if the path is a directory, False otherwise
    """
    return path.isdir(pth)


@add_filterDOCS
def isfile(pth: PathLike) -> bool:
    """Check if a path is a file

    Args:
        pth: The path to check

    Returns:
        True if the path is a file, False otherwise
    """
    return path.isfile(pth)


@add_filterDOCS
def islink(pth: PathLike) -> bool:
    """Check if a path is a symlink

    Args:
        pth: The path to check

    Returns:
        True if the path is a symlink, False otherwise
    """
    return path.islink(pth)


@add_filterDOCS
def exists(pth: PathLike) -> bool:
    """Check if a path exists

    Args:
        pth: The path to check

    Returns:
        True if the path exists, False otherwise
    """
    return path.exists(pth)


@add_filterDOCS
def getsize(pth: PathLike) -> int:
    """Get the size of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The size of the file
    """
    return _neg1_if_error(path.getsize)(pth)


@add_filterDOCS
def getmtime(pth: PathLike) -> int:
    """Get the modification time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The modification time of the file
    """
    return _neg1_if_error(path.getmtime)(pth)


@add_filterDOCS
def getctime(pth: PathLike) -> int:
    """Get the creation time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The creation time of the file
    """
    return _neg1_if_error(path.getctime)(pth)


@add_filterDOCS
def getatime(pth: PathLike) -> int:
    """Get the access time of a file, return -1 if the file does not exist

    Args:
        pth: The path to the file

    Returns:
        The access time of the file
    """
    return _neg1_if_error(path.getatime)(pth)


@add_filterDOCS
def isempty(
    pth: PathLike,
    ignore_ws: bool = True,
    nonfile_as_empty: bool = False,
) -> bool:
    """Check if a file is empty

    Args:
        pth: The path to the file
        ignore_ws: Ignore whitespaces?
        nonfile_as_empty: Treat non-file as empty?

    Returns:
        True if the file is empty, False otherwise
    """
    if not path.isfile(pth):
        return nonfile_as_empty

    if not ignore_ws:
        return path.getsize(pth) == 0

    with open(pth) as fvar:
        return fvar.read().strip() == ""


@add_filterDOCS
def quote(var: Any) -> str:
    """Quote a string

    Args:
        var: The string to quote

    Returns:
        The quoted string
    """
    return json.dumps(str(var))


@add_filterDOCS
def squote(var: Any) -> str:
    """Quote a string with single quotes

    Args:
        var: The string to quote

    Returns:
        The quoted string
    """
    return repr(str(var))


@add_filter("json")DOCS
def json_dumps(var: Any) -> str:
    """Dump an object to json.

    Aliases: `json`

    Args:
        var: The object to dump

    Returns:
        The json string
    """
    return json.dumps(var)


@add_filterDOCS
def json_load(pth: PathLike) -> Any:
    """Load a json file

    Args:
        pth: The path to the json file

    Returns:
        The loaded object
    """
    return config(pth, "json")


@add_filterDOCS
def json_loads(jsonstr: str) -> Any:
    """Load a json string to an object

    Args:
        jsonstr: The json string

    Returns:
        The loaded object
    """
    return json.loads(jsonstr)


@add_filter("toml_dumps")DOCS
def toml(var: Any) -> str:
    """Dump an object to toml.

    Aliases: `toml_dumps`

    Args:
        var: The object to dump

    Returns:
        The toml string
    """
    return rtoml.dumps(var)


@add_filterDOCS
def toml_load(pth: PathLike) -> Any:
    """Load a toml file. `null` will be loaded as None

    Args:
        pth: The path to the toml file

    Returns:
        The loaded object
    """
    return config(pth, "toml")


@add_filterDOCS
def toml_loads(tomlstr: str) -> Any:
    """Load a toml string to an object, `null` will be loaded as None

    Args:
        tomlstr: The toml string

    Returns:
        The loaded object
    """
    return cast(Diot(rtoml.loads(tomlstr)), [null_caster])


@add_filterDOCS
def config(x: Any, loader: str = None) -> Mapping[str, Any]:
    """Get the configuration (python dictionary) from a file

    Args:
        x: The path to the file, dict or string of configurations (json or toml)
        loader: The loader to use, defaults to auto-detect
            If x is a dict, this argument is ignored
            if x is a string and is not a file path, then x will be loaded as
            a toml string if loader is not specified
            if x is a file path, then x will be loaded according to the file
            extension

    Returns:
        The config
    """
    if not isinstance(x, (Path, str)):  # assume dict
        return Config.load_one(x, loader="dict")

    if isinstance(x, str) and not Path(x).is_file():
        if loader == "toml":
            return Diot(FILTERS["toml_loads"](x))
        if loader == "json":
            return Diot(FILTERS["json_loads"](x))
        raise ValueError(f"Unknown loader: {loader}")

    return Config.load_one(x, loader=loader)


@add_filterDOCS
def glob(*paths: PathLike) -> List[str]:
    """Glob a path

    Args:
        *paths: The paths to glob

    Returns:
        The globbed paths
    """
    return list(sorted(pyglob(path.join(*paths))))


@add_filterDOCS
def glob0(*paths: PathLike) -> str:
    """Glob a path and return the first result

    Args:
        *paths: The paths to glob

    Returns:
        The first globbed path
    """
    return glob(*paths)[0]


@add_filterDOCS
def read(file: PathLike, *args: Any, **kwargs: Any) -> Union[str, bytes]:
    """Read the contents from a file

    Args:
        file: The path to the file
        *args: and
        **kwargs: Other arguments passed to `open()`

    Returns:
        The contents of the file
    """
    with open(file, *args, **kwargs) as fvar:
        return fvar.read()


@add_filterDOCS
def readlines(
    file: PathLike,
    *args: Any,
    **kwargs: Any,
) -> Union[List[str], List[bytes]]:
    """Read the lines from a file

    Args:
        file: The path to the file
        *args: and
        **kwargs: Other arguments to `open()`

    Returns:
        A list of lines in the file
    """
    return read(file, *args, **kwargs).splitlines()


@add_filterDOCS
def regex_replace(
    string: str,
    pattern: str,
    repl: str,
    count: int = 0,
    flags: int = 0,
) -> str:
    """Replace the matched pattern with a string

    Args:
        string: The string to search
        pattern: The pattern to search
        repl: The string to replace
        flags: The regex flags

    Returns:
        The replaced string
    """
    import re
    return re.sub(pattern, repl, string, count=count, flags=flags)


@add_filterDOCS
def slugify(string: str, *args: Any, **kwargs: Any) -> str:
    """Slugify a string

    Args:
        string: The string to slugify
        *args: and
        **kwargs: Other arguments to `slugify()`

    Returns:
        The slugified string
    """
    from slugify import slugify as _slugify
    return _slugify(string, *args, **kwargs)