# Source code for panpath.s3_async_client

"""Async S3 client implementation."""

from __future__ import annotations

import asyncio
import os
import re
import weakref
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Set, Union

from panpath.clients import AsyncClient, AsyncFileHandle
from panpath.exceptions import MissingDependencyError, NoStatError

if TYPE_CHECKING:
    import aioboto3  # type: ignore[import-not-found]
    from aiobotocore.client import AioBaseClient  # type: ignore[import-untyped, unused-ignore]
    from botocore.exceptions import ClientError  # type: ignore[import-untyped, unused-ignore]

try:
    import aioboto3
    from aiobotocore.client import AioBaseClient
    from botocore.exceptions import ClientError

    HAS_AIOBOTO3 = True
except ImportError:
    HAS_AIOBOTO3 = False
    ClientError = Exception


# Registry of weak references to every S3 client created by this module, so
# that all of them can be closed together when the event loop shuts down.
# Only weak references are stored, so the registry itself does not keep a
# client object alive.
_active_clients: Set[weakref.ref] = set()  # type: ignore[type-arg]


async def _async_cleanup_all_clients() -> None:
    """Close every client still registered, then empty the registry.

    Dead weak references are skipped, and errors raised while closing a
    client are swallowed so that one failure does not stop the remaining
    clients from being closed.
    """
    # Iterate over a snapshot: the registry can be mutated (e.g. by weakref
    # callbacks) while we are suspended in ``await``.
    for ref in list(_active_clients):
        live_client: AioBaseClient = ref()
        if live_client is not None:
            try:
                await live_client.close()
            except Exception:  # pragma: no cover
                # Best-effort cleanup: ignore errors raised while closing.
                pass

    _active_clients.clear()


def _register_loop_cleanup(loop: asyncio.AbstractEventLoop) -> None:
    """Patch ``loop.shutdown_asyncgens`` so tracked clients are closed first.

    Args:
        loop: Event loop whose ``shutdown_asyncgens`` method gets wrapped.
    """
    # Keep a handle on the loop's own implementation so we can delegate to it.
    wrapped_shutdown = loop.shutdown_asyncgens

    async def shutdown_with_cleanup():  # type: ignore[no-untyped-def]
        """Close all tracked clients, then run the original shutdown."""
        await _async_cleanup_all_clients()
        await wrapped_shutdown()

    loop.shutdown_asyncgens = shutdown_with_cleanup  # type: ignore[method-assign]


class AsyncS3Client(AsyncClient):
    """Asynchronous S3 client implementation using aioboto3."""

    prefix = ("s3",)

    def __init__(self, **kwargs: Any):
        """Set up an async S3 client; the aioboto3 client is created lazily.

        Args:
            **kwargs: Keyword arguments stored and later forwarded to
                ``aioboto3.Session`` when the client is first needed.

        Raises:
            MissingDependencyError: If aioboto3 could not be imported.
        """
        if not HAS_AIOBOTO3:
            raise MissingDependencyError(
                backend="async S3", package="aioboto3", extra="async-s3"
            )

        self._kwargs = kwargs
        # Both are filled in by _get_client() on first use.
        self._client: Optional[AioBaseClient] = None
        self._client_ref: Optional[weakref.ref] = None  # type: ignore[type-arg]

    async def _get_client(self) -> AioBaseClient:
        """Return the shared S3 client, creating a new one when required.

        A new client is created when none exists yet, when the current
        client's HTTP session reports no underlying sessions, or when probing
        that state raises. Every newly created client is registered in
        ``_active_clients``, and the running event loop is patched (once per
        loop) so registered clients are closed when the loop shuts down.

        Returns:
            The S3 client to use for requests.
        """
        must_create = self._client is None
        if not must_create:
            try:
                # Relies on private aiobotocore attributes; presumably an
                # empty ``_sessions`` means the HTTP session is no longer
                # usable -- TODO confirm against aiobotocore.
                if not self._client._endpoint.http_session._sessions:
                    must_create = True
                    stale_ref = self._client_ref
                    if stale_ref is not None:
                        _active_clients.discard(stale_ref)
                        self._client_ref = None
                    self._client = None
            except Exception:  # pragma: no cover
                must_create = True
                self._client = None

        if must_create:
            session = aioboto3.Session(**self._kwargs)
            self._client = await session.client("s3").__aenter__()
            self._client_ref = weakref.ref(self._client, self._on_client_deleted)
            _active_clients.add(self._client_ref)

        # Make sure the current loop closes our clients on shutdown.
        try:
            running_loop = asyncio.get_running_loop()
            # The marker attribute prevents wrapping the same loop twice.
            if not hasattr(running_loop, "_panpath_s3_cleanup_registered"):
                _register_loop_cleanup(running_loop)
                running_loop._panpath_s3_cleanup_registered = True  # type: ignore
        except RuntimeError:  # pragma: no cover
            # No running loop: cleanup is left to an explicit close().
            pass

        return self._client

    def _on_client_deleted(self, ref: "weakref.ref[Any]") -> None:  # pragma: no cover
        """Weakref callback: drop a dead client reference from the registry.

        Args:
            ref: The weak reference whose referent has been garbage collected.
        """
        # discard() is a no-op when the reference was already removed.
        _active_clients.discard(ref)

    async def close(self) -> None:
        """Close the underlying client, if one exists, and unregister it."""
        client = self._client
        if client is None:
            return

        if self._client_ref is not None:
            _active_clients.discard(self._client_ref)

        await client.close()
        self._client = None

    async def exists(self, path: str) -> bool:
        """Report whether the bucket, object or directory marker exists.

        With no key, the bucket itself is probed. Otherwise the object is
        probed and, on a not-found/forbidden style error, a second probe is
        made for the ``key + "/"`` directory marker.

        Args:
            path: S3 path (s3://bucket/key).

        Returns:
            True if the probe succeeded, False otherwise.

        Raises:
            ClientError: For error codes that are not treated as "missing".
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()

        if not key:
            try:
                await client.head_bucket(Bucket=bucket)
            except ClientError:
                return False
            return True

        try:
            await client.head_object(Bucket=bucket, Key=key)
        except ClientError as err:
            code = err.response.get("Error", {}).get("Code")
            # Codes handled as "object is not there".
            if code in ("404", "NoSuchKey", "NoSuchBucket", "AccessDenied", "Forbidden"):
                # A key already ending in "/" has no further marker to try.
                if key.endswith("/"):
                    return False
                try:
                    await client.head_object(Bucket=bucket, Key=key + "/")
                except ClientError:
                    return False
                return True
            # Anything other than a plain 403 is propagated.
            if code not in ("403",):  # pragma: no cover
                raise
            return False
        return True

    async def read_bytes(self, path: str) -> bytes:
        """Download an S3 object and return its full content.

        Args:
            path: S3 path (s3://bucket/key).

        Returns:
            The object's bytes.

        Raises:
            FileNotFoundError: If S3 reports the key or bucket as missing.
            ClientError: For any other S3 error.
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        try:
            response = await client.get_object(Bucket=bucket, Key=key)
            async with response["Body"] as body:
                payload = await body.read()
        except ClientError as err:
            code = err.response.get("Error", {}).get("Code")
            if code not in ("NoSuchKey", "NoSuchBucket", "404"):
                raise
            raise FileNotFoundError(f"S3 object not found: {path}")
        return payload  # type: ignore[no-any-return]

    async def write_bytes(  # type: ignore[override]
        self,
        path: str,
        data: bytes,
    ) -> None:
        """Store ``data`` as the content of the S3 object at ``path``.

        Args:
            path: S3 path (s3://bucket/key).
            data: Bytes to upload as the object body.
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        destination = {"Bucket": bucket, "Key": key}
        await client.put_object(Body=data, **destination)

    async def delete(self, path: str) -> None:
        """Delete a single S3 object.

        Args:
            path: S3 path (s3://bucket/key).

        Raises:
            IsADirectoryError: If ``path`` is a directory according to is_dir().
            FileNotFoundError: If ``path`` does not exist according to exists().
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()

        if await self.is_dir(path):
            raise IsADirectoryError(f"Path is a directory: {path}")

        found = await self.exists(path)
        if not found:
            raise FileNotFoundError(f"S3 object not found: {path}")

        # Errors from S3 propagate to the caller unchanged.
        await client.delete_object(Bucket=bucket, Key=key)

    async def list_dir(self, path: str) -> list[str]:
        """List the immediate children of an S3 "directory".

        Args:
            path: S3 path (s3://bucket/prefix).

        Returns:
            Full paths of sub-prefixes (without trailing slash) and of the
            objects directly under the prefix; the prefix key itself is
            excluded.
        """
        bucket, prefix = self.__class__._parse_path(path)
        if prefix and not prefix.endswith("/"):
            prefix = f"{prefix}/"

        entries = []
        client = await self._get_client()
        pages = client.get_paginator("list_objects_v2").paginate(
            Bucket=bucket, Prefix=prefix, Delimiter="/"
        )
        async for page in pages:
            # "Subdirectories" are reported as common prefixes.
            entries.extend(
                f"{self.prefix[0]}://{bucket}/{sub['Prefix'].rstrip('/')}"
                for sub in page.get("CommonPrefixes", [])
            )
            # Plain objects; skip the entry equal to the listed prefix.
            entries.extend(
                f"{self.prefix[0]}://{bucket}/{item['Key']}"
                for item in page.get("Contents", [])
                if item["Key"] != prefix
            )
        return entries

    async def is_dir(self, path: str) -> bool:
        """Report whether ``path`` behaves like a directory.

        A path with no key (the bucket root) is always a directory. Otherwise
        the path is a directory when listing under ``key + "/"`` returns at
        least one entry.

        Args:
            path: S3 path (s3://bucket/key).
        """
        bucket, key = self.__class__._parse_path(path)
        if not key:
            return True

        dir_prefix = key if key.endswith("/") else key + "/"
        client = await self._get_client()
        listing = await client.list_objects_v2(Bucket=bucket, Prefix=dir_prefix, MaxKeys=1)
        return any(field in listing for field in ("Contents", "CommonPrefixes"))

    async def is_file(self, path: str) -> bool:
        """Report whether an object exists at exactly this key.

        Args:
            path: S3 path (s3://bucket/key).

        Returns:
            False for a path without a key or when the HEAD request fails
            with a ClientError; True otherwise.
        """
        bucket, key = self.__class__._parse_path(path)
        if not key:
            return False

        client = await self._get_client()
        try:
            await client.head_object(Bucket=bucket, Key=key)
        except ClientError:
            return False
        return True

    async def stat(self, path: str) -> os.stat_result:
        """Build an ``os.stat_result`` from the object's HEAD response.

        Only the size (``ContentLength``), the modification time
        (``LastModified``) and a pseudo device string are populated; every
        other field is None.

        Args:
            path: S3 path (s3://bucket/key).

        Raises:
            FileNotFoundError: If S3 answers with error code "404".
            ClientError: For other S3 errors.
            NoStatError: If the HEAD request fails with any other exception.
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        try:
            head = await client.head_object(Bucket=bucket, Key=key)
        except ClientError as err:  # pragma: no cover
            if err.response["Error"]["Code"] == "404":
                raise FileNotFoundError(f"S3 object not found: {path}")
            raise
        except Exception:  # pragma: no cover
            raise NoStatError(f"Cannot retrieve stat for: {path}")

        modified = head.get("LastModified")
        mtime = modified.timestamp() if modified else None
        # Field order: mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime
        fields = (
            None,
            None,
            f"{self.prefix[0]}://",
            None,
            None,
            None,
            head.get("ContentLength", 0),
            None,
            mtime,
            None,
        )
        return os.stat_result(fields)  # type: ignore[arg-type]

    def open(
        self,
        path: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> "S3AsyncFileHandle":
        """Create an async file handle for an S3 object.

        Args:
            path: S3 path (s3://bucket/key)
            mode: One of 'r', 'w', 'rb', 'wb', 'a', 'ab'
            encoding: Text encoding, forwarded to the handle
            **kwargs: Extra keyword arguments forwarded to S3AsyncFileHandle

        Returns:
            The S3AsyncFileHandle bound to this client.

        Raises:
            ValueError: If ``mode`` is not one of the supported modes.
        """
        supported_modes = ("r", "w", "rb", "wb", "a", "ab")
        if mode not in supported_modes:
            raise ValueError(f"Unsupported mode: {mode}")

        bucket, key = self.__class__._parse_path(path)
        handle_args = {
            "client_factory": self._get_client,
            "bucket": bucket,
            "blob": key,
            "prefix": self.prefix[0],
            "mode": mode,
            "encoding": encoding,
        }
        return S3AsyncFileHandle(**handle_args, **kwargs)

    async def mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> None:
        """Create a directory marker (an empty object whose key ends in "/").

        Args:
            path: S3 path (s3://bucket/path)
            parents: If False, the parent directory must already exist
            exist_ok: If True, an existing marker is silently accepted

        Raises:
            FileNotFoundError: If ``parents`` is False and the parent is missing.
            FileExistsError: If the marker exists and ``exist_ok`` is False.
        """
        bucket, key = self.__class__._parse_path(path)

        # Marker keys always end with a slash; runs of slashes are collapsed.
        if key and not key.endswith("/"):
            key = key + "/"
        key = re.sub(r"/+", "/", key)

        if key and not parents:
            parent_key = "/".join(key.rstrip("/").split("/")[:-1])
            if parent_key:
                parent_path = f"{self.prefix[0]}://{bucket}/{parent_key}/"
                if not await self.exists(parent_path):
                    raise FileNotFoundError(f"Parent directory does not exist: {parent_path}")

        client = await self._get_client()
        try:
            await client.head_object(Bucket=bucket, Key=key)
        except ClientError as err:
            code = err.response.get("Error", {}).get("Code")
            # 404/403 style answers mean "no marker yet"; anything else is fatal.
            if code not in ("404", "403", "NoSuchKey", "Forbidden"):  # pragma: no cover
                raise
        else:
            # The marker is already there.
            if not exist_ok:
                raise FileExistsError(f"Directory already exists: {path}")
            return

        await client.put_object(Bucket=bucket, Key=key, Body=b"")

    async def get_metadata(self, path: str) -> dict[str, Any]:
        """Return the raw HEAD response for an object.

        Args:
            path: S3 path

        Returns:
            The ``head_object`` response dictionary; user metadata, when
            present, is found under its 'Metadata' key.

        Raises:
            FileNotFoundError: If S3 answers with error code "404".
            ClientError: For any other S3 error.
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        try:
            head = await client.head_object(Bucket=bucket, Key=key)
        except ClientError as err:
            if err.response["Error"]["Code"] != "404":
                raise  # pragma: no cover
            raise FileNotFoundError(f"S3 object not found: {path}")
        return head  # type: ignore[no-any-return]

    async def set_metadata(self, path: str, metadata: dict[str, str]) -> None:
        """Replace an object's user metadata.

        The object is copied onto itself with ``MetadataDirective="REPLACE"``,
        which is how the metadata gets rewritten.

        Args:
            path: S3 path
            metadata: Metadata key-value pairs to store on the object
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        copy_request = {
            "Bucket": bucket,
            "Key": key,
            "CopySource": {"Bucket": bucket, "Key": key},
            "Metadata": metadata,
            "MetadataDirective": "REPLACE",
        }
        await client.copy_object(**copy_request)

    async def is_symlink(self, path: str) -> bool:
        """Report whether the object carries the symlink-target metadata key.

        Args:
            path: S3 path

        Returns:
            True if the metadata key is present; False if it is absent or the
            object does not exist.
        """
        try:
            head = await self.get_metadata(path)
        except FileNotFoundError:
            return False
        user_metadata = head.get("Metadata", {})
        return self.__class__.symlink_target_metaname in user_metadata

    async def readlink(self, path: str) -> str:
        """Resolve the target stored in an object's symlink metadata.

        Args:
            path: S3 path

        Returns:
            The target as stored when it starts with one of this client's
            URL schemes; otherwise the target joined onto the parent of
            ``path``.

        Raises:
            ValueError: If the object has no (non-empty) symlink target.
        """
        head = await self.get_metadata(path)
        target = head.get("Metadata", {}).get(self.__class__.symlink_target_metaname)
        if not target:
            raise ValueError(f"Not a symlink: {path!r}")

        for scheme in self.prefix:
            if target.startswith(f"{scheme}://"):
                return target  # type: ignore[no-any-return]

        # Relative target: resolve it against the link's parent directory.
        parent = path.rstrip("/").rsplit("/", 1)[0]
        return f"{parent}/{target}"

    async def symlink_to(self, path: str, target: str) -> None:
        """Create a symlink as an empty object whose metadata names the target.

        Args:
            path: S3 path for the symlink
            target: Path the symlink should point to
        """
        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()

        link_metadata = {self.__class__.symlink_target_metaname: target}
        await client.put_object(Bucket=bucket, Key=key, Body=b"", Metadata=link_metadata)

    async def glob(  # type: ignore[override]
        self,
        path: str,
        pattern: str,
    ) -> AsyncGenerator[str, None]:
        """Yield the paths of objects under ``path`` that match ``pattern``.

        Patterns containing ``**`` search every object below the prefix and
        match the part of the pattern after the last ``**/`` against the end
        of each key. Other patterns only consider objects directly under the
        prefix. Both modes page through the full listing, so results are not
        cut off at the size of a single list response.

        Args:
            path: Base S3 path
            pattern: Glob pattern (e.g., "*.txt", "**/*.py")

        Yields:
            Full paths (``<scheme>://bucket/key``) of matching objects.
        """
        from fnmatch import fnmatch

        bucket, prefix = self.__class__._parse_path(path)

        client = await self._get_client()
        paginator = client.get_paginator("list_objects_v2")

        if "**" in pattern:
            # Recursive search: list everything under the prefix and match
            # on the pattern part that follows the last "**/".
            pattern_parts = pattern.split("**/")
            file_pattern = pattern_parts[-1] if len(pattern_parts) > 1 else "*"
            match_pattern = f"*{file_pattern}"
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
        else:
            # Non-recursive: the delimiter restricts the listing to objects
            # directly under the prefix.
            prefix_with_slash = f"{prefix}/" if prefix and not prefix.endswith("/") else prefix
            match_pattern = f"{prefix_with_slash}{pattern}"
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix_with_slash, Delimiter="/")

        async for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if fnmatch(key, match_pattern):
                    yield f"{self.prefix[0]}://{bucket}/{key}"

    async def walk(  # type: ignore[override]
        self,
        path: str,
    ) -> AsyncGenerator[tuple[str, list[str], list[str]], None]:
        """Walk the "directory" tree below ``path``.

        All keys under the prefix are listed first and grouped by directory;
        the tuples are then yielded in sorted order of directory path.

        Args:
            path: Base S3 path. A trailing slash is accepted; the root entry
                is reported using ``path`` exactly as given, while nested
                directory paths are joined with a single slash.

        Yields:
            Tuples of (dirpath, dirnames, filenames)
        """
        bucket, prefix = self.__class__._parse_path(path)

        # List all objects under prefix
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        # Joining onto a path that already ends in "/" would give "dir//sub",
        # so nested paths are built from the slash-stripped form.
        base = path.rstrip("/")

        def _dir_path(segments: list[str]) -> str:
            """Return the full path of the directory made of ``segments``."""
            relative = "/".join(segments)
            return f"{base}/{relative}" if path else relative

        client = await self._get_client()
        paginator = client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

        # dirpath -> (subdirs, files)
        dirs: dict[str, tuple[set[str], set[str]]] = {}

        async for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Path of the key relative to the walked prefix
                rel_path = key[len(prefix) :] if prefix else key
                *dir_parts, filename = rel_path.split("/")

                # The root always gets an entry once any key has been seen.
                root_subdirs, root_files = dirs.setdefault(path, (set(), set()))

                if not dir_parts:
                    # Object directly under the root (empty name = marker).
                    if filename:
                        root_files.add(filename)
                    continue

                if dir_parts[0]:
                    root_subdirs.add(dir_parts[0])

                # Register each intermediate directory and link it to its child.
                for depth in range(len(dir_parts)):
                    subdirs, _ = dirs.setdefault(_dir_path(dir_parts[: depth + 1]), (set(), set()))
                    if depth + 1 < len(dir_parts):
                        subdirs.add(dir_parts[depth + 1])

                # An empty filename is a directory marker ("a/b/"), not a file.
                if filename:
                    dirs[_dir_path(dir_parts)][1].add(filename)

        for d, (subdirs, files) in sorted(dirs.items()):
            yield (d, sorted(subdirs), sorted(filter(None, files)))

    async def touch(self, path: str, exist_ok: bool = True, mode: Optional[int] = None) -> None:
        """Create an empty object at ``path`` (overwriting any existing one).

        Args:
            path: S3 path
            exist_ok: If False, raise when the path already exists
            mode: Must be left as None; present for signature compatibility

        Raises:
            ValueError: If ``mode`` is given.
            FileExistsError: If ``exist_ok`` is False and the path exists.
        """
        if mode is not None:
            raise ValueError("Mode parameter is not supported for S3")

        if not exist_ok:
            already_there = await self.exists(path)
            if already_there:
                raise FileExistsError(f"File already exists: {path}")

        bucket, key = self.__class__._parse_path(path)
        client = await self._get_client()
        await client.put_object(Bucket=bucket, Key=key, Body=b"")

    async def rename(self, source: str, target: str) -> None:
        """Move an object by copying it to ``target`` and deleting ``source``.

        Args:
            source: Source S3 path
            target: Target S3 path

        Raises:
            FileNotFoundError: If ``source`` does not exist.
        """
        source_found = await self.exists(source)
        if not source_found:
            raise FileNotFoundError(f"Source not found: {source}")

        src_bucket, src_key = self.__class__._parse_path(source)
        tgt_bucket, tgt_key = self.__class__._parse_path(target)

        client = await self._get_client()
        origin = {"Bucket": src_bucket, "Key": src_key}
        await client.copy_object(Bucket=tgt_bucket, Key=tgt_key, CopySource=origin)

        # The source is removed only after the copy call has returned.
        await client.delete_object(Bucket=src_bucket, Key=src_key)

    async def rmdir(self, path: str) -> None:
        """Remove an empty directory's marker object.

        Args:
            path: S3 path

        Raises:
            FileNotFoundError: If the path does not exist.
            OSError: If the directory still has entries.
        """
        bucket, key = self.__class__._parse_path(path)

        # The marker object is stored under the key with a trailing slash.
        marker_key = key + "/" if key and not key.endswith("/") else key

        client = await self._get_client()
        # delete_object succeeds silently for missing keys, so check first.
        if not await self.exists(path):
            raise FileNotFoundError(f"Directory not found: {path}")

        if await self.is_dir(path):
            children = await self.list_dir(path)
            if children:
                raise OSError(f"Directory not empty: {path}")

        await client.delete_object(Bucket=bucket, Key=marker_key)

    async def rmtree(
        self, path: str, ignore_errors: bool = False, onerror: Optional[Any] = None
    ) -> None:
        """Remove directory and all its contents recursively.

        Every key under the prefix is listed first, then deleted in batches.

        Args:
            path: S3 path
            ignore_errors: If True, errors are ignored
            onerror: Callable that accepts (function, path, excinfo). The
                function is the client's ``delete_objects`` when a client was
                obtained, otherwise this object's ``_get_client``.

        Raises:
            Exception: Whatever the listing/deleting raised, when neither
                ``ignore_errors`` nor ``onerror`` is set.
        """
        bucket, prefix = self.__class__._parse_path(path)

        # Ensure prefix ends with / for directory listing
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        # Bound before the try so the error handler can always refer to it,
        # even when obtaining the client is the step that failed.
        client: Optional[AioBaseClient] = None
        try:
            client = await self._get_client()
            # List all objects with this prefix
            objects_to_delete = []
            paginator = client.get_paginator("list_objects_v2")
            async for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                objects_to_delete.extend({"Key": obj["Key"]} for obj in page.get("Contents", []))

            # Delete in batches (max 1000 per request)
            for start in range(0, len(objects_to_delete), 1000):
                batch = objects_to_delete[start : start + 1000]
                await client.delete_objects(Bucket=bucket, Delete={"Objects": batch})
        except Exception:  # pragma: no cover
            if ignore_errors:
                return
            if onerror is None:
                raise

            import sys

            failed_call = client.delete_objects if client is not None else self._get_client
            onerror(failed_call, path, sys.exc_info())

    async def copy(self, source: str, target: str, follow_symlinks: bool = True) -> None:
        """Copy a single object to ``target`` with S3's server-side copy.

        Args:
            source: Source S3 path
            target: Target S3 path
            follow_symlinks: If True and ``source`` is a symlink, its target
                is copied instead of the link object

        Raises:
            FileNotFoundError: If ``source`` does not exist.
            IsADirectoryError: If the (resolved) source is a directory.
        """
        if not await self.exists(source):
            raise FileNotFoundError(f"Source not found: {source}")

        if follow_symlinks and await self.is_symlink(source):
            source = await self.readlink(source)

        source_is_dir = await self.is_dir(source)
        if source_is_dir:
            raise IsADirectoryError(f"Source is a directory: {source}")

        src_bucket, src_key = self.__class__._parse_path(source)
        tgt_bucket, tgt_key = self.__class__._parse_path(target)

        client = await self._get_client()
        origin = {"Bucket": src_bucket, "Key": src_key}
        await client.copy_object(Bucket=tgt_bucket, Key=tgt_key, CopySource=origin)

    async def copytree(self, source: str, target: str, follow_symlinks: bool = True) -> None:
        """Copy every object under ``source`` to the same relative key under ``target``.

        Args:
            source: Source S3 path
            target: Target S3 path
            follow_symlinks: If True and ``source`` is a symlink, its target
                is used as the tree to copy

        Raises:
            FileNotFoundError: If ``source`` does not exist.
            NotADirectoryError: If the (resolved) source is not a directory.
        """
        if not await self.exists(source):
            raise FileNotFoundError(f"Source not found: {source}")

        if follow_symlinks and await self.is_symlink(source):
            source = await self.readlink(source)

        source_is_dir = await self.is_dir(source)
        if not source_is_dir:
            raise NotADirectoryError(f"Source is not a directory: {source}")

        src_bucket, src_prefix = self.__class__._parse_path(source)
        tgt_bucket, tgt_prefix = self.__class__._parse_path(target)

        # Directory prefixes are normalised to end with a slash.
        if src_prefix and not src_prefix.endswith("/"):
            src_prefix = src_prefix + "/"
        if tgt_prefix and not tgt_prefix.endswith("/"):
            tgt_prefix = tgt_prefix + "/"

        client = await self._get_client()
        pages = client.get_paginator("list_objects_v2").paginate(
            Bucket=src_bucket, Prefix=src_prefix
        )
        async for page in pages:
            for item in page.get("Contents", ()):
                src_key = item["Key"]
                # Re-root the key: swap the source prefix for the target one.
                tgt_key = tgt_prefix + src_key[len(src_prefix) :]
                await client.copy_object(
                    Bucket=tgt_bucket,
                    Key=tgt_key,
                    CopySource={"Bucket": src_bucket, "Key": src_key},
                )


class S3AsyncFileHandle(AsyncFileHandle):
    """Async file handle for S3 with streaming support.

    Uses aioboto3's streaming API to avoid loading entire files into memory.
    """

    async def _create_stream(self) -> Any:
        """Open the S3 object for reading and return its response body.

        Obtains a client from the factory, issues ``get_object`` for this
        handle's bucket/key, and returns the ``"Body"`` entry of the response.
        """
        client: AioBaseClient = await self._client_factory()
        response = await client.get_object(Bucket=self._bucket, Key=self._blob)
        return response["Body"]  # type: ignore[no-any-return]

    @classmethod
    def _expception_as_filenotfound(cls, exception: Exception) -> bool:
        """Tell whether ``exception`` is an S3 "key/bucket not found" error.

        Args:
            exception: The exception to classify.

        Returns:
            True for a ClientError whose error code is "NoSuchKey",
            "NoSuchBucket" or "404"; False otherwise.
        """
        if not isinstance(exception, ClientError):
            return False
        code = exception.response.get("Error", {}).get("Code")
        return code in ("NoSuchKey", "NoSuchBucket", "404")

    async def _upload(self, data: Union[str, bytes]) -> None:
        """Write ``data`` to the S3 object, overwriting or appending.

        The first write of a handle that is not in append mode replaces the
        object. Every other write appends by read-modify-write: the existing
        object (if any) is downloaded, ``data`` is concatenated to it, and the
        result is uploaded again with a single ``put_object``.

        Args:
            data: Data to upload; ``str`` is encoded with the handle's
                encoding first.

        Raises:
            ClientError: If checking for, reading or writing the object fails
                for a reason other than the object not existing yet.
        """
        if isinstance(data, str):
            data = data.encode(self._encoding)

        client: AioBaseClient = self._client

        # Only the very first write of a non-append handle overwrites.
        overwrite = self._first_write and not self._is_append
        self._first_write = False
        if overwrite:
            await client.put_object(Bucket=self._bucket, Key=self._blob, Body=data)
            return

        # Appending: find out whether there is existing content to keep.
        try:
            await client.head_object(Bucket=self._bucket, Key=self._blob)
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") not in ("NoSuchKey", "404"):
                raise  # pragma: no cover
            object_exists = False
        else:
            object_exists = True

        if object_exists:
            response = await client.get_object(Bucket=self._bucket, Key=self._blob)
            # Close the body stream once read, as read_bytes() does.
            async with response["Body"] as stream:
                existing_data = await stream.read()
            data = existing_data + data

        await client.put_object(Bucket=self._bucket, Key=self._blob, Body=data)