# Bulk Operations
PanPath provides efficient bulk operations for working with directories and multiple files.
## Overview

Bulk operations are optimized for performance and work seamlessly across different storage backends:

- `rmtree()` - Remove directory and all contents
- `copy(src, dst)` - Copy a single file
- `copytree(src, dst)` - Copy entire directory tree
- `rename(src, dst)` - Move/rename files (enhanced for cross-storage)

All operations support:

- ✅ Cross-storage transfers (S3 ↔ GCS ↔ Azure ↔ Local)
- ✅ Synchronous and asynchronous modes
- ✅ Recursive directory operations
- ✅ Progress tracking (where applicable)
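As a quick orientation, the sketch below shows the same copy in both modes; the bucket and file names are placeholders, and the `a_`-prefixed methods are the async variants used throughout this page.

```python
from panpath import PanPath
import asyncio

src = PanPath("s3://bucket/report.csv")  # placeholder path

# Synchronous: blocks until the copy completes
src.copy("gs://backup/report.csv")

# Asynchronous: the a_-prefixed variant is awaited inside an event loop
async def main():
    await src.a_copy("gs://backup/report.csv")

asyncio.run(main())
```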
## Removing Directory Trees

### Basic Usage

Remove a directory and all its contents with `rmtree()`.
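A minimal sketch (the path is a placeholder):

```python
from panpath import PanPath

directory = PanPath("s3://bucket/old-data/")  # placeholder path

# Recursively deletes the directory and everything beneath it
directory.rmtree()
```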
### Safety Options

```python
from panpath import PanPath

directory = PanPath("s3://bucket/data/")

# Check before deleting
if directory.exists() and directory.is_dir():
    file_count = len([f for f in directory.rglob("*") if f.is_file()])
    print(f"About to delete {file_count} files")

    if input("Continue? (y/n): ").lower() == "y":
        directory.rmtree()
```
### Error Handling

```python
from panpath import PanPath
from panpath.exceptions import PathNotFoundError, PermissionError

directory = PanPath("s3://bucket/data/")

try:
    directory.rmtree()
except PathNotFoundError:
    print("Directory doesn't exist")
except PermissionError:
    print("Access denied")
```
## Copying Files

### Single File Copy

Copy a file to a new location:

```python
from panpath import PanPath

# S3 to GCS
s3_file = PanPath("s3://bucket/data.json")
s3_file.copy("gs://other-bucket/data.json")

# Cloud to local
cloud = PanPath("az://container/report.pdf")
cloud.copy("/tmp/report.pdf")

# Local to cloud
local = PanPath("/data/upload.txt")
local.copy("s3://bucket/upload.txt")
```
The asynchronous variant, `a_copy()`, lets you run several copies concurrently:

```python
from panpath import PanPath
import asyncio

async def copy_files():
    # Async copy
    src = PanPath("s3://bucket/file.txt")
    await src.a_copy("gs://other/file.txt")

    # Multiple concurrent copies
    files = [
        ("s3://bucket/a.txt", "gs://backup/a.txt"),
        ("s3://bucket/b.txt", "gs://backup/b.txt"),
        ("s3://bucket/c.txt", "gs://backup/c.txt"),
    ]
    await asyncio.gather(*[
        PanPath(src).a_copy(dst)
        for src, dst in files
    ])

asyncio.run(copy_files())
```
### Copy Options

```python
from panpath import PanPath

src = PanPath("s3://bucket/file.txt")

# Basic copy
src.copy("s3://bucket/backup/file.txt")

# Overwrite if exists
src.copy("s3://bucket/backup/file.txt", overwrite=True)

# Copy to PanPath object
dst = PanPath("gs://other/file.txt")
src.copy(dst)
```
## Copying Directory Trees

### Basic Usage

Copy an entire directory structure recursively with `copytree()`.
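A minimal sketch (both paths are placeholders):

```python
from panpath import PanPath

src = PanPath("s3://bucket/data/")  # placeholder source

# Recursively copies every file under data/ to the destination,
# preserving the relative layout
src.copytree("gs://other-bucket/data/")  # placeholder destination
```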
### Async Copytree

```python
from panpath import PanPath
import asyncio

async def backup_datasets():
    # Async directory copy
    src = PanPath("s3://production/data/")
    await src.a_copytree("s3://backup/data/")

    # Multiple concurrent copytree operations
    tasks = [
        PanPath("s3://bucket/logs/").a_copytree("/backup/logs/"),
        PanPath("s3://bucket/data/").a_copytree("/backup/data/"),
        PanPath("s3://bucket/config/").a_copytree("/backup/config/"),
    ]
    await asyncio.gather(*tasks)

asyncio.run(backup_datasets())
```
### Advanced Options

```python
from panpath import PanPath

src_dir = PanPath("s3://bucket/data/")
dst_dir = PanPath("gs://other/data/")

# Basic copytree
src_dir.copytree(dst_dir)

# Skip existing files
src_dir.copytree(dst_dir, exist_ok=True)

# Custom filtering (if supported)
src_dir.copytree(
    dst_dir,
    ignore_patterns=["*.tmp", "*.log"]
)
```
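If your version of PanPath does not accept `ignore_patterns`, the same effect can be had by filtering manually with `rglob()` and the standard-library `fnmatch` module. This is a sketch under that assumption; `copytree_filtered` is a hypothetical helper, not part of the PanPath API:

```python
from fnmatch import fnmatch

from panpath import PanPath

def copytree_filtered(src: str, dst: str, skip: list[str]) -> None:
    """Copy a tree, skipping files whose names match any pattern in `skip`."""
    src_path, dst_path = PanPath(src), PanPath(dst)
    for file in src_path.rglob("*"):
        if file.is_file() and not any(fnmatch(file.name, pat) for pat in skip):
            file.copy(dst_path / file.relative_to(src_path))

copytree_filtered("s3://bucket/data/", "gs://other/data/", skip=["*.tmp", "*.log"])
```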
## Moving and Renaming

### Enhanced Cross-Storage Rename

The `rename()` method now supports cross-storage operations by copying to the destination and deleting the source:

```python
from panpath import PanPath

# S3 to GCS (copies then deletes)
s3_file = PanPath("s3://old-bucket/file.txt")
s3_file.rename("gs://new-bucket/file.txt")

# Cloud to local
cloud = PanPath("az://container/temp.log")
cloud.rename("/var/log/temp.log")

# Between any backends
src = PanPath("gs://bucket/data.csv")
src.rename("s3://other/data.csv")
```
The asynchronous variant, `a_rename()`, works the same way:

```python
from panpath import PanPath
import asyncio

async def move_files():
    # Async rename/move
    old = PanPath("s3://bucket/old.txt")
    await old.a_rename("gs://other/new.txt")

    # Move multiple files concurrently
    files = [
        ("s3://bucket/a.txt", "gs://backup/a.txt"),
        ("s3://bucket/b.txt", "gs://backup/b.txt"),
    ]
    await asyncio.gather(*[
        PanPath(src).a_rename(dst)
        for src, dst in files
    ])

asyncio.run(move_files())
```
### Return Value

`rename()` returns the new path:

```python
from panpath import PanPath

old_path = PanPath("s3://bucket/old.txt")
new_path = old_path.rename("s3://bucket/new.txt")

print(new_path)           # s3://bucket/new.txt
print(new_path.exists())  # True
print(old_path.exists())  # False
```
## Performance Considerations

### Server-Side Operations

When source and destination are on the same storage backend, operations use server-side APIs:

```python
from panpath import PanPath

# Fast: S3 server-side copy
s3_src = PanPath("s3://bucket/large-file.bin")
s3_src.copy("s3://bucket/backup/large-file.bin")  # No download/upload!

# Fast: GCS server-side copy
gs_src = PanPath("gs://bucket/data.tar.gz")
gs_src.copy("gs://bucket/archive/data.tar.gz")  # No download/upload!
```
### Cross-Storage Transfer

Cross-storage operations require download and upload:

```python
from panpath import PanPath

# Slower: Downloads from S3, uploads to GCS
s3_file = PanPath("s3://bucket/large-file.bin")
s3_file.copy("gs://other/large-file.bin")  # Downloads then uploads
```
### Parallel Async Operations

Use async for concurrent operations:

```python
from panpath import PanPath
import asyncio

async def parallel_copy():
    files = [f"s3://bucket/file{i}.txt" for i in range(100)]

    # Copy all files concurrently
    tasks = [
        PanPath(src).a_copy(f"gs://backup/file{i}.txt")
        for i, src in enumerate(files)
    ]
    await asyncio.gather(*tasks)
    print("Copied 100 files concurrently!")

asyncio.run(parallel_copy())
```
### Chunked Operations

For very large directories, process files in chunks:

```python
from panpath import PanPath

def chunked_copytree(src, dst, chunk_size=100):
    """Copy directory in chunks."""
    src_path = PanPath(src)
    dst_path = PanPath(dst)

    # Get all files
    files = list(src_path.rglob("*"))

    # Process in chunks
    for i in range(0, len(files), chunk_size):
        chunk = files[i:i + chunk_size]
        for file in chunk:
            if file.is_file():
                rel_path = file.relative_to(src_path)
                file.copy(dst_path / rel_path)
        print(f"Processed {min(i + chunk_size, len(files))}/{len(files)} files")

chunked_copytree("s3://huge-bucket/data/", "/local/data/")
```
## Examples

### Backup Script

```python
from panpath import PanPath
from datetime import datetime

def backup_to_cloud(local_dir: str, cloud_bucket: str):
    """Backup local directory to cloud with timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    src = PanPath(local_dir)
    # Strip any trailing slash so the destination has no double slash
    dst = PanPath(f"{cloud_bucket.rstrip('/')}/backup_{timestamp}/")

    print(f"Backing up {src} to {dst}...")
    src.copytree(dst)
    print("Backup complete!")

# Usage
backup_to_cloud("/important/data/", "s3://backups/")
```
### Cleanup Old Files

```python
from panpath import PanPath
from datetime import datetime

def cleanup_old_files(directory: str, days: int = 30):
    """Remove files older than specified days."""
    cutoff = datetime.now().timestamp() - (days * 86400)
    dir_path = PanPath(directory)

    for file in dir_path.rglob("*"):
        if file.is_file():
            stat = file.stat()
            if stat.st_mtime < cutoff:
                print(f"Removing old file: {file}")
                file.unlink()

# Usage
cleanup_old_files("s3://bucket/logs/", days=90)
```
### Mirror Directories

```python
from panpath import PanPath

def mirror_directories(src: str, dst: str, clean_dst: bool = False):
    """Mirror source directory to destination."""
    src_path = PanPath(src)
    dst_path = PanPath(dst)

    # Optionally clean destination
    if clean_dst and dst_path.exists():
        print(f"Cleaning {dst_path}...")
        dst_path.rmtree()

    # Copy directory tree
    print(f"Mirroring {src_path} to {dst_path}...")
    src_path.copytree(dst_path)
    print("Mirror complete!")

# Usage
mirror_directories("s3://production/data/", "s3://backup/data/", clean_dst=True)
```
### Async Batch Operations

```python
from panpath import PanPath
import asyncio

async def batch_process(files: list[str], operation: str):
    """Process multiple files concurrently."""
    paths = [PanPath(f) for f in files]

    if operation == "delete":
        await asyncio.gather(*[p.a_unlink() for p in paths])
    elif operation == "backup":
        await asyncio.gather(*[
            p.a_copy(f"s3://backup/{p.name}")
            for p in paths
        ])

    print(f"Processed {len(files)} files")

# Usage
files = [f"s3://bucket/temp/file{i}.txt" for i in range(1000)]
asyncio.run(batch_process(files, "delete"))
```
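When some of the paths may already be gone, passing `return_exceptions=True` to `asyncio.gather()` collects failures instead of aborting on the first error. A sketch reusing the `a_unlink()` calls above; `delete_tolerant` is a hypothetical helper:

```python
from panpath import PanPath
import asyncio

async def delete_tolerant(files: list[str]) -> None:
    """Delete files concurrently, reporting failures instead of raising."""
    results = await asyncio.gather(
        *[PanPath(f).a_unlink() for f in files],
        return_exceptions=True,
    )
    failures = [(f, r) for f, r in zip(files, results) if isinstance(r, Exception)]
    for path, error in failures:
        print(f"Could not delete {path}: {error}")
    print(f"Deleted {len(files) - len(failures)} of {len(files)} files")

asyncio.run(delete_tolerant([f"s3://bucket/temp/file{i}.txt" for i in range(1000)]))
```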
## Best Practices

### 1. Check Before Deleting

```python
from panpath import PanPath

def safe_rmtree(path: str):
    """Safely remove directory with confirmation."""
    dir_path = PanPath(path)

    if not dir_path.exists():
        print(f"{path} doesn't exist")
        return

    # Count files
    files = list(dir_path.rglob("*"))
    file_count = len([f for f in files if f.is_file()])

    # Confirm
    print(f"About to delete {file_count} files from {path}")
    if input("Continue? (yes/no): ") == "yes":
        dir_path.rmtree()
        print("Deleted!")
    else:
        print("Cancelled")
```
### 2. Use Async for Large Batches

```python
# Slow: Sequential
from panpath import PanPath

for i in range(1000):
    PanPath(f"s3://bucket/file{i}.txt").copy(f"gs://other/file{i}.txt")
```

```python
# Fast: Concurrent
from panpath import PanPath
import asyncio

async def fast_copy():
    await asyncio.gather(*[
        PanPath(f"s3://bucket/file{i}.txt").a_copy(f"gs://other/file{i}.txt")
        for i in range(1000)
    ])

asyncio.run(fast_copy())
```
### 3. Handle Errors Gracefully

```python
from panpath import PanPath
from panpath.exceptions import PanPathException

def robust_copytree(src: str, dst: str):
    """Copy directory with error handling."""
    src_path = PanPath(src)
    dst_path = PanPath(dst)

    try:
        src_path.copytree(dst_path)
        print("Copy successful!")
    except PanPathException as e:
        print(f"Error during copy: {e}")
        # Cleanup partial copy
        if dst_path.exists():
            print("Cleaning up partial copy...")
            dst_path.rmtree()
        raise
```
## Next Steps
- Cross-Storage Transfers - More on cross-backend operations
- Async Operations - Deep dive into async patterns
- Performance Guide - Optimization tips
- API Reference - Complete API documentation