# Source code for datar.core.plugin

"""Plugin system to support different backends"""
from typing import Any, List, Mapping, Tuple, Callable

from simplug import Simplug, SimplugResult, makecall

# Shared Simplug instance, created with the name "datar". The hook
# specifications in this module are declared on it via ``@plugin.spec``.
plugin = Simplug("datar")


def _collect(calls: List[Tuple[Callable, Tuple, Mapping]]) -> Mapping[str, Any]:
    """Merge the outputs of all plugin calls into one dictionary.

    Each entry of ``calls`` is executed with ``makecall``, in order. A call
    that returns ``None`` contributes nothing; every other output is merged
    with ``dict.update``, so on duplicate keys the later call wins.

    Args:
        calls: The pending hook calls to execute.

    Returns:
        A new dict holding the merged outputs (empty if nothing was
        returned by any call).
    """
    merged = {}
    for result in map(makecall, calls):
        if result is None:
            # This call had nothing to report.
            continue
        merged.update(result)
    return merged


@plugin.spec
def setup():
    """Initialize the backend.

    This is a hook specification only: the body is intentionally empty and
    the behavior comes from the plugins implementing it.
    """


@plugin.spec(result=_collect)
def get_versions():
    """Return the versions of the dependencies of the plugin.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=SimplugResult.TRY_SINGLE)
def load_dataset(name: str, metadata: Mapping):
    """Implementations for load_dataset().

    Hook specification only; the body is intentionally empty.

    NOTE(review): ``SimplugResult.TRY_SINGLE`` presumably yields the result
    of a single plugin and tolerates no plugin implementing the hook --
    confirm against the simplug documentation.

    Args:
        name: Presumably the name of the dataset to load -- verify against
            the caller.
        metadata: A mapping describing the dataset(s); its schema is not
            visible here -- verify against the caller.
    """


@plugin.spec(result=_collect)
def base_api():
    """Provide what the plugin implements for the base APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=_collect)
def dplyr_api():
    """Provide what the plugin implements for the dplyr APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=_collect)
def tibble_api():
    """Provide what the plugin implements for the tibble APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=_collect)
def forcats_api():
    """Provide what the plugin implements for the forcats APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=_collect)
def tidyr_api():
    """Provide what the plugin implements for the tidyr APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=_collect)
def misc_api():
    """Provide what the plugin implements for the misc APIs.

    Hook specification only. The outputs of the implementations are combined
    by ``_collect`` (passed as ``result=``), so each implementation is
    expected to return a mapping or ``None``.
    """


@plugin.spec(result=SimplugResult.SINGLE)
def c_getitem(item):
    """Get item for c.

    Hook specification only; the body is intentionally empty.

    NOTE(review): ``SimplugResult.SINGLE`` presumably means the result of
    exactly one plugin is returned -- confirm against the simplug
    documentation.

    Args:
        item: The subscript to resolve; presumably whatever is passed
            inside ``c[...]`` -- verify against the caller.
    """


@plugin.spec(result=SimplugResult.SINGLE)
def operate(op: str, x: Any, y: Any = None):
    """Operate on x and y.

    Hook specification only; the body is intentionally empty.

    NOTE(review): ``SimplugResult.SINGLE`` presumably means the result of
    exactly one plugin is returned -- confirm against the simplug
    documentation.

    Args:
        op: The name of the operation to apply.
        x: The first operand.
        y: The second operand. Defaults to ``None``; presumably left unset
            for unary operations -- verify against the caller.
    """