"""Base classes for cloud path implementations."""
import sys
from abc import ABC, abstractmethod
from pathlib import PurePosixPath
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
BinaryIO,
Iterator,
List,
Optional,
TextIO,
Tuple,
Union,
)
from panpath.base import PanPath
if TYPE_CHECKING:
from pathlib import Path
from panpath.clients import AsyncClient, AsyncFileHandle, SyncClient
class CloudPath(PanPath, PurePosixPath, ABC):
    """Base class for cloud path implementations.

    Inherits from PanPath and PurePosixPath for path operations.
    Includes both sync and async methods (async methods prefixed with ``a_``).

    Every storage operation is delegated to a client object: sync methods use
    ``self.client`` and async methods use ``self.async_client``. Subclasses
    supply the default clients through ``_create_default_client`` and
    ``_create_default_async_client``.
    """

    _is_cloud_path = True  # Marker for PanPath.__new__
    # Per-instance clients (set in __new__); None means "use the class default".
    _client: Optional["SyncClient"] = None
    _default_client: Optional["SyncClient"] = None
    _async_client: Optional["AsyncClient"] = None
    _default_async_client: Optional["AsyncClient"] = None

    def __new__(cls, *args: Any, **kwargs: Any) -> "CloudPath":
        """Create a new cloud path instance.

        The ``client`` and ``async_client`` keyword arguments are removed
        before delegating to ``PurePosixPath.__new__`` (which receives only the
        positional segments) and are stored on the new instance.
        """
        # Extract clients before passing to PurePosixPath
        client = kwargs.pop("client", None)
        async_client = kwargs.pop("async_client", None)
        obj = PurePosixPath.__new__(cls, *args)
        obj._client = client
        obj._async_client = async_client
        return obj

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize cloud path (clients already handled in __new__())."""
        # Remove clients from kwargs if present (already handled in __new__())
        kwargs.pop("client", None)
        kwargs.pop("async_client", None)
        # Python version compatibility for PurePosixPath.__init__():
        # - Python 3.9-3.11: Fully initialized in __new__()
        # - Python 3.12+: Needs __init__(*args) to set _raw_paths, _drv, etc.
        if sys.version_info >= (3, 12):
            # Python 3.12+ requires calling __init__ with args to set internal properties
            PurePosixPath.__init__(self, *args)  # type: ignore
        # else: Python 3.9-3.11 don't need __init__ called (already done in __new__)

    @property
    def client(self) -> "SyncClient":
        """Get or create the sync client for this path.

        Falls back to the class-level default client, creating it on first use
        and caching it on both the class and this instance.
        """
        if self._client is None:  # pragma: no cover
            if self.__class__._default_client is None:
                self.__class__._default_client = self._create_default_client()
            self._client = self.__class__._default_client
        return self._client

    @property
    def async_client(self) -> "AsyncClient":
        """Get or create the async client for this path.

        Falls back to the class-level default async client, creating it on
        first use and caching it on both the class and this instance.
        """
        if self._async_client is None:  # pragma: no cover
            if self.__class__._default_async_client is None:
                self.__class__._default_async_client = self._create_default_async_client()
            self._async_client = self.__class__._default_async_client
        return self._async_client

    @classmethod
    @abstractmethod
    def _create_default_client(cls) -> "SyncClient":
        """Create the default sync client for this path class."""

    @classmethod
    @abstractmethod
    def _create_default_async_client(cls) -> "AsyncClient":
        """Create the default async client for this path class."""

    def _new_cloudpath(self, path: str) -> "CloudPath":
        """Create a new cloud path preserving client and type.

        This is called by parent, joinpath, etc. to maintain the path type
        and associated client.
        """
        return self.__class__(path, client=self._client, async_client=self._async_client)

    @property
    def parent(self) -> "CloudPath":
        """Return parent directory as same path type."""
        parent_path = PurePosixPath.parent.fget(self)  # type: ignore
        return self._new_cloudpath(str(parent_path))

    def __truediv__(self, other: Any) -> "CloudPath":
        """Join paths while preserving type and client.

        Returns ``NotImplemented`` for unsupported operands so that Python
        raises ``TypeError`` instead of building a bogus path.
        """
        result = PurePosixPath.__truediv__(self, other)
        if result is NotImplemented:
            # Never stringify the sentinel into a path named "NotImplemented".
            return NotImplemented
        return self._new_cloudpath(str(result))

    def __rtruediv__(self, other: Any) -> "CloudPath":
        """Right join paths while preserving type and client.

        Returns ``NotImplemented`` for unsupported operands so that Python
        raises ``TypeError`` instead of building a bogus path.
        """
        result = PurePosixPath.__rtruediv__(self, other)
        if result is NotImplemented:
            # Never stringify the sentinel into a path named "NotImplemented".
            return NotImplemented
        return self._new_cloudpath(str(result))

    def joinpath(self, *args: Any) -> "CloudPath":
        """Join paths while preserving type and client."""
        result = PurePosixPath.joinpath(self, *args)
        return self._new_cloudpath(str(result))

    def __str__(self) -> str:
        """Return properly formatted cloud URI with double slash."""
        parts = self.parts
        if len(parts) >= 2:
            # parts[0] is the scheme with a trailing colon (e.g. 's3:')
            scheme = parts[0].rstrip(":")
            bucket = parts[1]
            if len(parts) > 2:
                key = "/".join(parts[2:])
                return f"{scheme}://{bucket}/{key}"
            return f"{scheme}://{bucket}"
        return PurePosixPath.__str__(self)  # pragma: no cover

    @property
    def cloud_prefix(self) -> str:
        """Return the cloud prefix (e.g., 's3://bucket').

        Returns an empty string when the path has fewer than two parts.
        """
        parts = self.parts
        if len(parts) >= 2:
            # parts[0] is 's3:', parts[1] is 'bucket'
            scheme = parts[0].rstrip(":")
            bucket = parts[1]
            return f"{scheme}://{bucket}"
        return ""  # pragma: no cover

    @property
    def key(self) -> str:
        """Return the key/blob name without the cloud prefix.

        Returns an empty string when the path has no parts after the bucket.
        """
        parts = self.parts
        if len(parts) >= 3:
            # Join all parts after scheme and bucket
            return "/".join(parts[2:])
        return ""

    # Cloud storage operations delegated to client

    def exists(self) -> bool:
        """Check if path exists."""
        return self.client.exists(str(self))

    def read_bytes(self) -> bytes:
        """Read file as bytes."""
        return self.client.read_bytes(str(self))

    def read_text(self, encoding: str = "utf-8") -> str:  # type: ignore[override]
        """Read file as text."""
        return self.client.read_text(str(self), encoding=encoding)

    def write_bytes(self, data: bytes) -> None:  # type: ignore[override]
        """Write bytes to file."""
        self.client.write_bytes(str(self), data)

    def write_text(self, data: str, encoding: str = "utf-8") -> None:  # type: ignore[override]
        """Write text to file."""
        self.client.write_text(str(self), data, encoding=encoding)

    def unlink(self, missing_ok: bool = False) -> None:
        """Delete file.

        Args:
            missing_ok: If True, a FileNotFoundError from the client is suppressed
        """
        try:
            self.client.delete(str(self))
        except FileNotFoundError:  # pragma: no cover
            if not missing_ok:
                raise

    def iterdir(self) -> Iterator["CloudPath"]:  # type: ignore[override]
        """Iterate over directory contents.

        Yields:
            One path (same type and clients as this path) per listed entry
        """
        for item in self.client.list_dir(str(self)):
            yield self._new_cloudpath(item)

    def is_dir(self) -> bool:
        """Check if path is a directory."""
        return self.client.is_dir(str(self))

    def is_file(self) -> bool:
        """Check if path is a file."""
        return self.client.is_file(str(self))

    def stat(self, follow_symlinks: bool = True) -> Any:
        """Get file stats.

        Args:
            follow_symlinks: If True and this path is a symlink, return the
                stats of the link target instead
        """
        if follow_symlinks and self.is_symlink():
            target = self.readlink()
            return target.stat()
        return self.client.stat(str(self))

    def mkdir(self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False) -> None:
        """Create a directory marker in cloud storage.

        In cloud storage (S3, GCS, Azure), directories are implicit. This method
        creates an empty object with a trailing slash to serve as a directory marker.

        Args:
            mode: Ignored (for compatibility with pathlib)
            parents: If True, create parent directories as needed
            exist_ok: If True, don't raise error if directory already exists
        """
        self.client.mkdir(str(self), parents=parents, exist_ok=exist_ok)

    def open(  # type: ignore[override]
        self,
        mode: str = "r",
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> Union[BinaryIO, TextIO]:
        """Open file for reading/writing.

        Args:
            mode: File mode (e.g., 'r', 'w', 'rb', 'wb')
            encoding: Text encoding (for text modes)
            **kwargs: Additional arguments passed to the sync client
        """
        return self.client.open(
            str(self),
            mode=mode,
            encoding=encoding,
            **kwargs,
        )  # type: ignore[return-value]

    def __eq__(self, other: Any) -> bool:
        """Check equality."""
        return super().__eq__(other)

    def __hash__(self) -> int:
        """Return hash of path."""
        return super().__hash__()

    def absolute(self) -> "CloudPath":
        """Return absolute path - cloud paths are already absolute."""
        return self

    def is_absolute(self) -> bool:
        """Cloud paths are always absolute."""
        return True

    def as_uri(self) -> str:
        """Return the path as a URI (same as string representation)."""
        return str(self)

    def match(self, pattern: str) -> bool:
        """Match path against glob pattern.

        Override to work correctly with cloud URIs by matching against
        the key portion of the path (excluding scheme and bucket).
        """
        # For cloud paths, we want to match against the key part only (path after bucket)
        our_parts = self.parts[2:] if len(self.parts) > 2 else ()
        # If no key parts, only these catch-all patterns are accepted
        if not our_parts:  # pragma: no cover
            return pattern in ("", "*", "**")
        # Build a plain PurePosixPath from the key parts and delegate matching to it
        key_path = PurePosixPath(*our_parts)
        return key_path.match(pattern)

    def glob(self, pattern: str) -> Iterator["CloudPath"]:  # type: ignore[override]
        """Glob for files matching pattern.

        Args:
            pattern: Pattern to match (e.g., "*.txt", "**/*.py")

        Yields:
            Matching paths
        """
        for p in self.client.glob(str(self), pattern):
            yield self._new_cloudpath(p)

    def rglob(self, pattern: str) -> Iterator["CloudPath"]:  # type: ignore[override]
        """Recursively glob for files matching pattern.

        Equivalent to ``glob`` with ``"**/"`` prepended to the pattern.

        Args:
            pattern: Pattern to match (e.g., "*.txt", "*.py")

        Yields:
            Matching paths (recursive)
        """
        yield from self.glob(f"**/{pattern}")

    def walk(self) -> Iterator[Tuple["CloudPath", List[str], List[str]]]:
        """Walk directory tree (like os.walk).

        Yields:
            (dirpath, dirnames, filenames) tuples
        """
        for d, subdirs, files in self.client.walk(str(self)):
            yield self._new_cloudpath(d), subdirs, files

    def touch(self, exist_ok: bool = True) -> None:  # type: ignore[override]
        """Create empty file.

        Args:
            exist_ok: If False, raise error if file exists
        """
        self.client.touch(str(self), exist_ok=exist_ok)

    def rename(self, target: Union[str, "CloudPath"]) -> "CloudPath":  # type: ignore[override]
        """Rename/move file to target.

        Can move between cloud and local paths. A cross-storage move is
        performed as copy followed by removal of the source.

        Args:
            target: New path (can be cloud or local)

        Returns:
            New path instance
        """
        target_str = str(target)
        # Check if cross-storage operation (cloud <-> local or cloud <-> cloud)
        if self._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            if self.is_dir():
                self._copytree_cross_storage(str(self), target_str)
                self.rmtree()
            else:
                self._copy_cross_storage(str(self), target_str)
                self.unlink()
        else:
            # Same storage, use native rename
            self.client.rename(str(self), target_str)
        return PanPath(target_str)  # type: ignore

    def replace(self, target: Union[str, "CloudPath"]) -> "CloudPath":  # type: ignore[override]
        """Replace file at target (overwriting if exists).

        Args:
            target: Target path

        Returns:
            New path instance
        """
        # For cloud storage, replace is same as rename (always overwrites)
        return self.rename(target)

    def rmdir(self) -> None:
        """Remove empty directory marker."""
        self.client.rmdir(str(self))

    def resolve(self) -> "CloudPath":  # type: ignore[override]
        """Resolve the path, following a symlink one level if this is one.

        Returns:
            The symlink target if this path is a symlink, otherwise self
            (cloud paths are already absolute)
        """
        return self.readlink() if self.is_symlink() else self

    def samefile(self, other: Union[str, "CloudPath"]) -> bool:  # type: ignore[override]
        """Check if this path refers to same file as other.

        The comparison is purely textual (string forms of both paths).

        Args:
            other: Path to compare

        Returns:
            True if paths are the same
        """
        return str(self) == str(other)

    def is_symlink(self) -> bool:
        """Check if this is a symbolic link (via metadata).

        Returns:
            True if symlink metadata exists
        """
        return self.client.is_symlink(str(self))

    def readlink(self) -> "CloudPath":
        """Read symlink target from metadata.

        Returns:
            Path that this symlink points to
        """
        target = self.client.readlink(str(self))
        return PanPath(  # type: ignore
            target,
            client=self._client,
            async_client=self._async_client,
        )

    def symlink_to(  # type: ignore[override]
        self,
        target: Union[str, "CloudPath"],
        target_is_directory: bool = False,
    ) -> None:
        """Create symlink pointing to target (via metadata).

        Args:
            target: Path this symlink should point to (absolute with scheme or relative)
            target_is_directory: Ignored (for compatibility with pathlib)
        """
        target_str = str(target)
        # If target doesn't have a scheme prefix, treat as relative path
        if "://" not in target_str:  # pragma: no cover
            # Resolve relative to symlink's parent directory
            target_str = str(self.parent / target_str)
        self.client.symlink_to(str(self), target_str)

    def rmtree(self, ignore_errors: bool = False, onerror: Optional[Any] = None) -> None:
        """Remove directory and all its contents recursively.

        Args:
            ignore_errors: If True, errors are ignored
            onerror: Callable that accepts (function, path, excinfo)
        """
        self.client.rmtree(str(self), ignore_errors=ignore_errors, onerror=onerror)

    def copy(self, target: Union[str, "CloudPath"], follow_symlinks: bool = True) -> "CloudPath":
        """Copy file to target.

        Can copy between cloud and local paths.

        Args:
            target: Destination path (can be cloud or local)
            follow_symlinks: If True and this path is a symlink, copy the link
                target instead of the link itself

        Returns:
            Target path instance
        """
        if follow_symlinks and self.is_symlink():  # pragma: no cover
            # If following symlinks, read the target and copy that instead
            real_path = self.readlink()
            return real_path.copy(target, follow_symlinks=False)
        target_str = str(target)
        # Check if cross-storage operation
        if self._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            # Forward the flag: the helper defaults to following symlinks, which
            # would silently ignore an explicit follow_symlinks=False.
            self._copy_cross_storage(str(self), target_str, follow_symlinks=follow_symlinks)
        else:
            # Same storage, use native copy
            # NOTE(review): a_copy forwards follow_symlinks to the async client;
            # confirm whether the sync client's copy() accepts it too.
            self.client.copy(str(self), target_str)
        return PanPath(target_str)  # type: ignore

    def copytree(
        self, target: Union[str, "CloudPath"], follow_symlinks: bool = True
    ) -> "CloudPath":
        """Copy directory tree to target recursively.

        Can copy between cloud and local paths.

        Args:
            target: Destination path (can be cloud or local)
            follow_symlinks: If False, symlinks are copied as symlinks (not dereferenced)

        Returns:
            Target path instance
        """
        target_str = str(target)
        # Check if cross-storage operation
        if self._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            self._copytree_cross_storage(str(self), target_str, follow_symlinks=follow_symlinks)
        else:
            # Same storage, use native copytree
            self.client.copytree(str(self), target_str, follow_symlinks=follow_symlinks)
        return PanPath(target_str)  # type: ignore

    @staticmethod
    def _is_cross_storage_op(src: str, dst: str) -> bool:
        """Check if operation crosses storage boundaries.

        The scheme is the text before the first ``://``; a string without
        ``://`` is treated as scheme ``"file"``. Two paths cross storage
        boundaries when their schemes differ.
        """
        src_scheme = src.split("://")[0] if "://" in src else "file"
        dst_scheme = dst.split("://")[0] if "://" in dst else "file"
        return src_scheme != dst_scheme

    @staticmethod
    def _copy_cross_storage(
        src: str,
        dst: str,
        follow_symlinks: bool = True,
        chunk_size: int = 1024 * 1024,
    ) -> None:  # pragma: no cover
        """Copy file across storage boundaries.

        Args:
            src: Source path
            dst: Destination path
            follow_symlinks: If True and the source is a symlink, copy the
                link target's content instead
            chunk_size: Size of chunks to read/write (for large files)
        """
        src_path = PanPath(src)
        dst_path = PanPath(dst)
        if follow_symlinks and src_path.is_symlink():
            # If following symlinks, read the target and copy that instead
            src_path = src_path.readlink()
        # Stream in chunk_size pieces so large files are never fully in memory
        with src_path.open("rb") as src_file, dst_path.open("wb") as dst_file:
            while True:
                chunk = src_file.read(chunk_size)
                if not chunk:
                    break
                dst_file.write(chunk)

    @staticmethod
    def _copytree_cross_storage(
        src: str,
        dst: str,
        follow_symlinks: bool = True,
        chunk_size: int = 1024 * 1024,
    ) -> None:  # pragma: no cover
        """Copy directory tree across storage boundaries.

        Args:
            src: Source directory path
            dst: Destination directory path
            follow_symlinks: If False, copy symlinks as symlinks
            chunk_size: Size of chunks to read/write (for large files)
        """
        src_path = PanPath(src)
        dst_path = PanPath(dst)
        # Create destination directory
        dst_path.mkdir(parents=True, exist_ok=True)
        # Measure the root prefix on the normalized path object: walked paths are
        # path objects too, so the raw ``src`` string (e.g. with redundant
        # slashes or a leading "./") may have a different length.
        root_len = len(str(src_path))
        # Walk source tree and copy all files
        for dirpath, dirnames, filenames in src_path.walk():
            # Calculate relative path from src
            rel_dir = str(dirpath)[root_len:].lstrip("/")
            dst_dir = dst_path / rel_dir if rel_dir else dst_path
            # Create subdirectories in destination
            for dirname in dirnames:
                (dst_dir / dirname).mkdir(parents=True, exist_ok=True)
            # Copy files
            for filename in filenames:
                CloudPath._copy_cross_storage(
                    str(dirpath / filename),
                    str(dst_dir / filename),
                    follow_symlinks=follow_symlinks,
                    chunk_size=chunk_size,
                )

    # Async methods (prefixed with a_)

    async def a_exists(self) -> bool:
        """Check if path exists."""
        return await self.async_client.exists(str(self))

    async def a_read_bytes(self) -> bytes:
        """Read file as bytes."""
        return await self.async_client.read_bytes(str(self))

    async def a_read_text(self, encoding: str = "utf-8") -> str:
        """Read file as text."""
        return await self.async_client.read_text(str(self), encoding=encoding)

    async def a_write_bytes(
        self,
        data: bytes,
    ) -> None:
        """Write bytes to file."""
        await self.async_client.write_bytes(str(self), data)

    async def a_write_text(self, data: str, encoding: str = "utf-8") -> int:
        """Write text to file and return the async client's result."""
        return await self.async_client.write_text(str(self), data, encoding=encoding)

    async def a_unlink(self, missing_ok: bool = False) -> None:
        """Delete file.

        Args:
            missing_ok: If True, a FileNotFoundError from the client is suppressed
        """
        try:
            await self.async_client.delete(str(self))
        except FileNotFoundError:
            if not missing_ok:
                raise

    async def a_iterdir(  # type: ignore[override]
        self,
    ) -> AsyncGenerator["CloudPath", None]:
        """Iterate over directory contents (async generator).

        The client's listing is awaited once, then entries are yielded one by one.
        """
        for item in await self.async_client.list_dir(str(self)):
            yield self._new_cloudpath(item)

    async def a_is_dir(self) -> bool:
        """Check if path is a directory."""
        return await self.async_client.is_dir(str(self))

    async def a_is_file(self) -> bool:
        """Check if path is a file."""
        return await self.async_client.is_file(str(self))

    async def a_stat(self, follow_symlinks: bool = True) -> Any:
        """Get file stats.

        Args:
            follow_symlinks: If True and this path is a symlink, return the
                stats of the link target instead
        """
        if follow_symlinks and await self.a_is_symlink():
            target = await self.a_readlink()
            return await target.a_stat()
        return await self.async_client.stat(str(self))

    async def a_mkdir(
        self,
        mode: int = 0o777,
        parents: bool = False,
        exist_ok: bool = False,
    ) -> None:
        """Create a directory marker in cloud storage.

        In cloud storage (S3, GCS, Azure), directories are implicit. This method
        creates an empty object with a trailing slash to serve as a directory marker.

        Args:
            mode: Ignored (for compatibility with pathlib)
            parents: If True, create parent directories as needed
            exist_ok: If True, don't raise error if directory already exists
        """
        await self.async_client.mkdir(str(self), parents=parents, exist_ok=exist_ok)

    async def a_glob(  # type: ignore[override]
        self,
        pattern: str,
    ) -> AsyncGenerator["CloudPath", None]:
        """Glob for files matching pattern.

        Args:
            pattern: Pattern to match (e.g., "*.txt", "**/*.py")

        Yields:
            Matching paths
        """
        async for p in self.async_client.glob(str(self), pattern):  # type: ignore[attr-defined]
            yield self._new_cloudpath(p)

    async def a_rglob(  # type: ignore[override]
        self,
        pattern: str,
    ) -> AsyncGenerator["CloudPath", None]:
        """Recursively glob for files matching pattern.

        Equivalent to ``a_glob`` with ``"**/"`` prepended to the pattern.

        Args:
            pattern: Pattern to match (e.g., "*.txt", "*.py")

        Yields:
            Matching paths (recursive)
        """
        async for p in self.a_glob(f"**/{pattern}"):
            yield p

    async def a_walk(  # type: ignore[override]
        self,
    ) -> AsyncGenerator[Tuple["CloudPath", List[str], List[str]], None]:
        """Walk directory tree (like os.walk).

        Yields:
            (dirpath, dirnames, filenames) tuples
        """
        async for d, subdirs, files in self.async_client.walk(  # type: ignore[attr-defined]
            str(self)
        ):
            yield self._new_cloudpath(d), subdirs, files

    async def a_touch(
        self,
        mode: int = 0o666,
        exist_ok: bool = True,
    ) -> None:
        """Create empty file.

        Args:
            mode: Ignored (not forwarded to the client)
            exist_ok: If False, raise error if file exists
        """
        await self.async_client.touch(str(self), exist_ok=exist_ok)

    async def a_rename(  # type: ignore[override]
        self,
        target: Union[str, "CloudPath"],
    ) -> "CloudPath":
        """Rename/move file to target.

        Can move between cloud and local paths. Directories are moved by
        renaming each entry into the target and then removing the source
        directory marker.

        Args:
            target: New path (can be cloud or local)

        Returns:
            New path instance

        Raises:
            FileNotFoundError: If the source path does not exist
            NotADirectoryError: If the source is a directory and the target
                exists but is not a directory
            IsADirectoryError: If the source is a file and the target is an
                existing directory
        """
        if not await self.a_exists():
            raise FileNotFoundError(f"Source path does not exist: {self}")
        target_str = str(target)
        if not isinstance(target, PanPath):  # pragma: no cover
            target = PanPath(target_str)  # type: ignore[assignment]
        source_is_dir = await self.a_is_dir()
        target_is_dir = await target.a_is_dir()  # type: ignore[union-attr]
        target_exists = await target.a_exists()  # type: ignore[union-attr]
        if source_is_dir and not target_is_dir and target_exists:
            raise NotADirectoryError(
                f"Cannot rename directory {self} to non-directory target {target}"
            )
        if not source_is_dir and target_is_dir and target_exists:
            raise IsADirectoryError(f"Cannot rename file {self} to directory target {target}")
        if source_is_dir:
            if not target_exists:
                await target.a_mkdir(  # type: ignore[union-attr]
                    parents=True,
                    exist_ok=True,
                )
            # Support renaming directories by moving each entry (recursively)
            async for item in self.a_iterdir():
                relative_path = item.relative_to(self)
                new_target = target / relative_path
                await item.a_rename(new_target)
            await self.a_rmdir()
            return target  # type: ignore[return-value]
        # Check if cross-storage operation
        if CloudPath._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            # Copy then delete for cross-storage
            await self._a_copy_cross_storage(str(self), target_str)
            await self.a_unlink()
        else:
            # Same storage, use native rename
            await self.async_client.rename(str(self), target_str)
        return PanPath(target_str)  # type: ignore

    async def a_replace(  # type: ignore[override]
        self,
        target: Union[str, "CloudPath"],
    ) -> "CloudPath":
        """Replace file at target (overwriting if exists).

        Args:
            target: Target path

        Returns:
            New path instance
        """
        # For cloud storage, replace is same as rename (always overwrites)
        return await self.a_rename(target)

    async def a_resolve(self) -> "PanPath":
        """Resolve the path, following a symlink one level if this is one.

        Returns:
            The symlink target if this path is a symlink, otherwise self
            (cloud paths are already absolute)
        """
        return await self.a_readlink() if await self.a_is_symlink() else self

    async def a_rmdir(self) -> None:
        """Remove empty directory marker."""
        await self.async_client.rmdir(str(self))

    async def a_is_symlink(self) -> bool:
        """Check if this is a symbolic link (via metadata).

        Returns:
            True if symlink metadata exists
        """
        return await self.async_client.is_symlink(str(self))

    async def a_readlink(self) -> "CloudPath":
        """Read symlink target from metadata.

        Returns:
            Path that this symlink points to
        """
        target = await self.async_client.readlink(str(self))
        return PanPath(  # type: ignore
            target,
            client=self._client,
            async_client=self._async_client,
        )

    async def a_symlink_to(  # type: ignore[override]
        self,
        target: Union[str, "CloudPath"],
        target_is_directory: bool = False,
    ) -> None:
        """Create symlink pointing to target (via metadata).

        Args:
            target: Path this symlink should point to (absolute with scheme or relative)
            target_is_directory: Ignored (for compatibility with pathlib)
        """
        target_str = str(target)
        # If target doesn't have a scheme prefix, treat as relative path
        if "://" not in target_str:  # pragma: no cover
            # Resolve relative to symlink's parent directory
            target_str = str(self.parent / target_str)
        await self.async_client.symlink_to(str(self), target_str)

    async def a_rmtree(self, ignore_errors: bool = False, onerror: Optional[Any] = None) -> None:
        """Remove directory and all its contents recursively.

        Args:
            ignore_errors: If True, errors are ignored
            onerror: Callable that accepts (function, path, excinfo)
        """
        await self.async_client.rmtree(str(self), ignore_errors=ignore_errors, onerror=onerror)

    async def a_copy(self, target: Union[str, "Path"], follow_symlinks: bool = True) -> "PanPath":
        """Copy file to target.

        Can copy between cloud and local paths.

        Args:
            target: Destination path (can be cloud or local)
            follow_symlinks: Forwarded to the cross-storage helper or to the
                async client's copy

        Returns:
            Target path instance
        """
        target_str = str(target)
        # Check if cross-storage operation
        if CloudPath._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            await self._a_copy_cross_storage(str(self), target_str, follow_symlinks=follow_symlinks)
        else:
            # Same storage, use native copy
            await self.async_client.copy(str(self), target_str, follow_symlinks=follow_symlinks)
        return PanPath(target_str)

    async def a_copytree(
        self,
        target: Union[str, "Path"],
        follow_symlinks: bool = True,
    ) -> "CloudPath":
        """Copy directory tree to target recursively.

        Can copy between cloud and local paths.

        Args:
            target: Destination path (can be cloud or local)
            follow_symlinks: If False, symlinks are copied as symlinks (not dereferenced)

        Returns:
            Target path instance
        """
        target_str = str(target)
        # Check if cross-storage operation
        if CloudPath._is_cross_storage_op(str(self), target_str):  # pragma: no cover
            await self._a_copytree_cross_storage(
                str(self), target_str, follow_symlinks=follow_symlinks
            )
        else:
            # Same storage, use native copytree
            await self.async_client.copytree(str(self), target_str, follow_symlinks=follow_symlinks)
        return PanPath(target_str)  # type: ignore

    @staticmethod
    async def _a_copy_cross_storage(
        src: str,
        dst: str,
        follow_symlinks: bool = True,
        chunk_size: int = 1024 * 1024,
    ) -> None:
        """Copy file across storage boundaries (async).

        Args:
            src: Source path
            dst: Destination path
            follow_symlinks: If True and the source is a symlink, copy the
                link target's content instead
            chunk_size: Size of chunks to read/write (default 1MB)
        """
        src_path = PanPath(src)
        dst_path = PanPath(dst)
        if follow_symlinks and await src_path.a_is_symlink():  # pragma: no cover
            # If following symlinks, read the target and copy that instead
            src_path = await src_path.a_readlink()
        async with src_path.a_open("rb") as fsrc:
            async with dst_path.a_open("wb") as fdst:
                while True:
                    buf = await fsrc.read(chunk_size)  # Read one chunk_size piece
                    if not buf:
                        break
                    await fdst.write(buf)

    @staticmethod
    async def _a_copytree_cross_storage(
        src: str,
        dst: str,
        follow_symlinks: bool = True,
        chunk_size: int = 1024 * 1024,
    ) -> None:
        """Copy directory tree across storage boundaries (async).

        Args:
            src: Source path
            dst: Destination path
            follow_symlinks: If False, symlinks are copied as symlinks (not dereferenced)
            chunk_size: Size of chunks to read/write (default 1MB)
        """
        src_path = PanPath(src)
        dst_path = PanPath(dst)
        # Create destination directory
        await dst_path.a_mkdir(parents=True, exist_ok=True)
        # Measure the root prefix on the normalized path object: walked paths are
        # path objects too, so the raw ``src`` string (e.g. with redundant
        # slashes or a leading "./") may have a different length.
        root_len = len(str(src_path))
        # Walk source tree and copy all files
        async for dirpath, dirnames, filenames in src_path.a_walk():  # type: ignore[attr-defined]
            # Calculate relative path from src
            rel_dir = str(dirpath)[root_len:].lstrip("/")
            dst_dir = dst_path / rel_dir if rel_dir else dst_path
            # Create subdirectories in destination
            for dirname in dirnames:
                await (dst_dir / dirname).a_mkdir(parents=True, exist_ok=True)
            # Copy files
            for filename in filenames:
                await CloudPath._a_copy_cross_storage(
                    str(dirpath / filename),
                    str(dst_dir / filename),
                    follow_symlinks=follow_symlinks,
                    chunk_size=chunk_size,
                )

    def a_open(
        self,
        mode: str = "r",
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> "AsyncFileHandle":
        """Open file and return async file handle.

        This is a plain (non-coroutine) method: it returns the handle produced
        by the async client without awaiting anything.

        Args:
            mode: File mode (e.g., 'r', 'w', 'rb', 'wb')
            encoding: Text encoding (for text modes)
            **kwargs: Additional arguments passed to the async client

        Returns:
            Async file handle from the async client
        """
        return self.async_client.open(
            str(self),
            mode=mode,
            encoding=encoding,
            **kwargs,
        )  # type: ignore[return-value]