"""Plugin system to support different backends"""
from typing import Any, List, Mapping, Tuple, Callable
from simplug import Simplug, SimplugResult, makecall
plugin = Simplug("datar")
def _collect(calls: List[Tuple[Callable, Tuple, Mapping]]) -> Mapping[str, Any]:
"""Collect the results from plugins"""
collected = {}
for call in calls:
out = makecall(call)
if out is not None:
collected.update(out)
return collected
@plugin.specDOCS
def setup():
"""Initialize the backend"""
@plugin.spec(result=_collect)DOCS
def get_versions():
"""Return the versions of the dependencies of the plugin."""
@plugin.spec(result=SimplugResult.TRY_SINGLE)DOCS
def load_dataset(name: str, metadata: Mapping):
"""Implementations for load_dataset()"""
@plugin.spec(result=_collect)DOCS
def base_api():
"""What is implemented the base APIs."""
@plugin.spec(result=_collect)DOCS
def dplyr_api():
"""What is implemented the dplyr APIs."""
@plugin.spec(result=_collect)DOCS
def tibble_api():
"""What is implemented the tibble APIs."""
@plugin.spec(result=_collect)DOCS
def forcats_api():
"""What is implemented the forcats APIs."""
@plugin.spec(result=_collect)DOCS
def tidyr_api():
"""What is implemented the tidyr APIs."""
@plugin.spec(result=_collect)DOCS
def misc_api():
"""What is implemented the misc APIs."""
@plugin.spec(result=SimplugResult.SINGLE)DOCS
def c_getitem(item):
"""Get item for c"""
@plugin.spec(result=SimplugResult.SINGLE)DOCS
def operate(op: str, x: Any, y: Any = None):
"""Operate on x and y"""