"""Utilities for xqute"""
import logging
import re
from os import PathLike
from typing import Callable
import asyncio
from functools import partial, wraps
import aiopath as aiop # type: ignore
import aiofile as aiof
from rich.logging import RichHandler
from .defaults import LOGGER_NAME
# helper functions to read and write the whole content of the file
async def a_read_text(path: PathLike) -> str:
    """Read the text from a file asyncly

    Args:
        path: The path of the file

    Returns:
        The content of the file
    """
    # Open in text-read mode and return everything `read()` yields.
    async with aiof.async_open(path, mode='rt') as file:  # type: ignore
        return await file.read()
async def a_write_text(path: PathLike, content: str):
    """Write the text to a file asyncly

    Args:
        path: The path to the file
        content: The content to be written to the file
    """
    # Open in text-write mode and hand the whole content to one `write()`.
    async with aiof.async_open(path, mode='wt') as file:  # type: ignore
        await file.write(content)
def asyncify(func: Callable) -> Callable:
    """Turn a sync function into a Coroutine, can be used as a decorator

    The returned coroutine function accepts the same positional and keyword
    arguments as `func`, plus two keyword-only extras that are consumed by
    the wrapper and never forwarded to `func`:

    - `loop`: the event loop whose `run_in_executor` is used; defaults to
      the currently running loop.
    - `executor`: passed as-is to `loop.run_in_executor` (`None` by default).

    Args:
        func: The sync function

    Returns:
        The Coroutine
    """

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        # We are inside a coroutine, so a loop is necessarily running;
        # `get_running_loop()` is the documented way to obtain it here.
        loop = loop or asyncio.get_running_loop()
        # `run_in_executor` only forwards positional arguments, so bind
        # everything (including keyword arguments) up front.
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run
async def a_mkdir(path: PathLike, *args, **kwargs):
    """Make a directory asyncly

    Args:
        path: The path to the directory to be made
        *args: args for `Path(path).mkdir(...)`
        **kwargs: kwargs for `Path(path).mkdir(...)`
    """
    # All extra arguments are forwarded untouched to `AsyncPath.mkdir`.
    await aiop.AsyncPath(path).mkdir(*args, **kwargs)
def replace_with_leading_space(s: str, old: str, new: str) -> str:
    r"""Replace a substring with leading spaces and keep the original spaces

    `old` is replaced only where nothing but horizontal whitespace precedes
    it on its line. That whitespace is then prepended to every non-empty
    line of `new`, so a multi-line replacement keeps the indentation of the
    text it replaces. Empty lines in `new` stay empty.

    Example:
        >>> replace_with_leading_space("a\n b\nc", "b", "x\ny")
        'a\n x\n y\nc'

    Args:
        s: The string
        old: The old substring (matched literally, not as a regex)
        new: The new substring, possibly spanning several lines

    Returns:
        The new string
    """
    # `[^\S\r\n]` means "whitespace except line breaks". A plain `\s*` would
    # also swallow blank lines preceding the match and then copy them in
    # front of every replacement line.
    pattern = rf"^([^\S\r\n]*){re.escape(old)}"

    def _indent(match):
        lead = match.group(1)
        return "\n".join(
            f"{lead}{line}" if line else "" for line in new.splitlines()
        )

    # A callable replacement also keeps backslashes in `new` literal.
    return re.sub(pattern, _indent, s, flags=re.MULTILINE)
class DuplicateFilter(logging.Filter):
    """Logging filter that drops consecutive records with the same message

    Only the most recently seen message is remembered, so a message that
    reappears after a different one is let through again.
    """

    def __init__(self):
        super().__init__()
        # Formatted message of the last record seen; None until the first.
        self.prev_msg = None

    def filter(self, record):
        """Decide whether a record should be emitted

        Args:
            record: The log record being processed

        Returns:
            False if the record's formatted message equals the previous
            one, True otherwise
        """
        # Compare the fully formatted text (`msg % args`), not the template.
        message = record.getMessage()
        if message == self.prev_msg:
            return False
        self.prev_msg = message
        return True
# Package-wide logger, configured once at import time.
logger = logging.getLogger(LOGGER_NAME)
# Emit INFO and above.
logger.setLevel(logging.INFO)
# Suppress records that merely repeat the previous message.
logger.addFilter(DuplicateFilter())
# Render records through rich's logging handler.
logger.addHandler(RichHandler(show_path=False, omit_repeated_times=False))