"""Async Google Cloud Storage client implementation."""

from __future__ import annotations

import asyncio
import datetime
import os
import sys
import weakref
from typing import TYPE_CHECKING, Any, AsyncGenerator, Optional, Set, Union

from panpath.clients import AsyncClient, AsyncFileHandle
from panpath.exceptions import MissingDependencyError, NoStatError

if TYPE_CHECKING:
    from gcloud.aio.storage import Storage

try:
    # Monkey-patch SCOPES before importing Storage
    # Must patch the actual storage module, not the package __init__
    import gcloud.aio.storage.storage as _storage_module

    _storage_module.SCOPES = [
        # We need full control to update metadata
        "https://www.googleapis.com/auth/devstorage.full_control",
    ]

    from gcloud.aio.storage import Storage

    HAS_GCLOUD_AIO = True
except ImportError:
    HAS_GCLOUD_AIO = False


# Track all active storage instances for cleanup
_active_clients: Set[weakref.ref] = set()  # type: ignore[type-arg]


async def _async_cleanup_all_clients() -> None:
    """Async cleanup of all active storage instances."""
    # Create a copy of the set to avoid modification during iteration
    clients_to_clean = list(_active_clients)

    for client_ref in clients_to_clean:
        storage = client_ref()
        if storage is None:  # pragma: no cover
            continue

        try:
            await storage.close()
        except Exception:  # pragma: no cover
            # Ignore errors during cleanup
            pass

    _active_clients.clear()


def _register_loop_cleanup(loop: asyncio.AbstractEventLoop) -> None:
    """Register cleanup to run before loop closes."""
    # Get the original shutdown_asyncgens method
    original_shutdown = loop.shutdown_asyncgens

    async def shutdown_with_cleanup():  # type: ignore[no-untyped-def]
        """Shutdown that includes storage cleanup."""
        # Clean up storages first
        await _async_cleanup_all_clients()
        # Then run original shutdown
        await original_shutdown()

    # Replace with our version
    loop.shutdown_asyncgens = shutdown_with_cleanup  # type: ignore[method-assign]
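
# Note: because loop.shutdown_asyncgens is replaced above, asyncio.run()
# (which awaits shutdown_asyncgens() right before closing the loop) also runs
# the storage cleanup. A rough equivalent for a manually managed loop
# ("main" is a hypothetical coroutine):
#
#     loop = asyncio.new_event_loop()
#     try:
#         loop.run_until_complete(main())
#     finally:
#         loop.run_until_complete(loop.shutdown_asyncgens())  # triggers cleanup
#         loop.close()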


class AsyncGSClient(AsyncClient):
    """Asynchronous Google Cloud Storage client implementation."""

    prefix = ("gs",)
    symlink_target_metaname = "gcsfuse_symlink_target"

    def __init__(self, **kwargs: Any):
        """Initialize async GCS client.

        Args:
            **kwargs: Additional arguments passed through to gcloud.aio.storage.Storage
        """
        if not HAS_GCLOUD_AIO:
            raise MissingDependencyError(
                backend="async Google Cloud Storage",
                package="gcloud-aio-storage",
                extra="async-gs",
            )
        self._client: Optional[Storage] = None
        self._kwargs = kwargs
        self._client_ref: Optional[weakref.ref] = None  # type: ignore[type-arg]

    async def _get_client(self) -> Storage:
        """Get or create shared storage client for the AsyncGSClient."""
        # Check if storage needs to be recreated (closed or never created)
        needs_recreation = False
        if self._client is None:
            needs_recreation = True
        else:
            # Check if the underlying aiohttp session is closed
            try:
                if self._client.session.session.closed:
                    needs_recreation = True
                    # Clean up the old storage reference
                    if self._client_ref is not None:
                        _active_clients.discard(self._client_ref)
                        self._client_ref = None
                    self._client = None
            except (AttributeError, RuntimeError):  # pragma: no cover
                # If we can't check the session state, recreate to be safe
                needs_recreation = True
                self._client = None

        if needs_recreation:
            self._client = Storage(**self._kwargs)
            # Track this storage instance for cleanup
            self._client_ref = weakref.ref(self._client, self._on_client_deleted)
            _active_clients.add(self._client_ref)

            # Register cleanup with the current event loop
            try:
                loop = asyncio.get_running_loop()
                # Check if we've already patched this loop
                if not hasattr(loop, "_panpath_gs_cleanup_registered"):
                    _register_loop_cleanup(loop)
                    loop._panpath_gs_cleanup_registered = True  # type: ignore
            except RuntimeError:  # pragma: no cover
                # No running loop, cleanup will be handled by explicit close
                pass

        return self._client  # type: ignore[return-value]

    def _on_client_deleted(self, ref: "weakref.ref[Any]") -> None:  # pragma: no cover
        """Called when storage is garbage collected."""
        _active_clients.discard(ref)

    async def close(self) -> None:
        """Close the storage client and cleanup resources."""
        if self._client is not None:
            # Remove from active storages
            if self._client_ref is not None:
                _active_clients.discard(self._client_ref)
                self._client_ref = None
            # Close the storage
            await self._client.close()
            self._client = None

    async def exists(self, path: str) -> bool:
        """Check if GCS blob exists."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)
        if not blob_name:
            # check if the bucket exists
            try:
                await storage.get_bucket_metadata(bucket_name)
                return True
            except Exception:  # pragma: no cover
                return False

        try:
            await storage.download_metadata(bucket_name, blob_name)
            return True
        except Exception:
            if blob_name.endswith("/"):
                return False
            try:
                await storage.download_metadata(bucket_name, f"{blob_name}/")
                return True
            except Exception:
                return False

    async def read_bytes(self, path: str) -> bytes:
        """Read GCS blob as bytes."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)

        try:
            data = await storage.download(bucket_name, blob_name)
            return data
        except Exception as e:
            raise FileNotFoundError(f"GCS blob not found: {path}") from e

    async def write_bytes(  # type: ignore[override]
        self,
        path: str,
        data: bytes,
    ) -> None:
        """Write bytes to GCS blob."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)
        await storage.upload(bucket_name, blob_name, data)

    async def delete(self, path: str) -> None:
        """Delete GCS blob."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)

        if await self.is_dir(path):
            raise IsADirectoryError(f"Path is a directory: {path}")

        try:
            await storage.delete(bucket_name, blob_name)
        except Exception as e:
            raise FileNotFoundError(f"GCS blob not found: {path}") from e

    async def list_dir(self, path: str) -> list[str]:
        """List GCS blobs with prefix."""
        storage = await self._get_client()
        bucket_name, prefix = self.__class__._parse_path(path)

        if prefix and not prefix.endswith("/"):
            prefix += "/"

        results = []
        try:
            blobs = await storage.list_objects(
                bucket_name, params={"prefix": prefix, "delimiter": "/"}
            )

            # Add prefixes (directories)
            for prefix_item in blobs.get("prefixes", []):
                results.append(f"{self.prefix[0]}://{bucket_name}/{prefix_item.rstrip('/')}")

            # Add items (files)
            for item in blobs.get("items", []):
                name = item["name"]
                if name != prefix:
                    results.append(f"{self.prefix[0]}://{bucket_name}/{name}")

        except Exception:  # pragma: no cover
            pass

        return results

    async def is_dir(self, path: str) -> bool:
        """Check if GCS path is a directory."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)
        if not blob_name and await self.exists(path):
            return True

        blob_name = blob_name if blob_name.endswith("/") else blob_name + "/"
        try:
            # First check if directory marker exists
            await storage.download_metadata(bucket_name, blob_name)
            return True
        except Exception:
            # If no marker, check if any objects exist with this prefix
            try:
                response = await storage.list_objects(
                    bucket_name,
                    params={"prefix": blob_name, "maxResults": 1},  # type: ignore[dict-item]
                )
                return len(response.get("items", [])) > 0
            except Exception:  # pragma: no cover
                return False

    async def is_file(self, path: str) -> bool:
        """Check if GCS path is a file."""
        return not await self.is_dir(path) and await self.exists(path)

    async def stat(self, path: str) -> os.stat_result:
        """Get GCS blob metadata."""
        storage = await self._get_client()
        bucket_name, blob_name = self.__class__._parse_path(path)

        metadata = None
        try:
            metadata = await storage.download_metadata(bucket_name, blob_name)
        except Exception as e:
            if blob_name.endswith("/"):  # pragma: no cover
                raise NoStatError(str(path)) from e
            try:
                metadata = await storage.download_metadata(bucket_name, f"{blob_name}/")
            except Exception:
                raise NoStatError(str(path)) from e

        if not metadata:  # pragma: no cover
            raise NoStatError(str(path))

        ctime = (
            None
            if "timeCreated" not in metadata
            else datetime.datetime.fromisoformat(metadata["timeCreated"].replace("Z", "+00:00"))
        )
        mtime = (
            None
            if "updated" not in metadata
            else datetime.datetime.fromisoformat(metadata["updated"].replace("Z", "+00:00"))
        )
        return os.stat_result(
            (  # type: ignore[arg-type]
                None,  # mode
                None,  # ino
                f"{self.prefix[0]}://",  # dev
                None,  # nlink
                None,  # uid
                None,  # gid
                int(metadata.get("size", 0)),  # size
                mtime,  # atime (GCS does not track access time; reuse mtime)
                mtime,  # mtime
                ctime,  # ctime
            )
        )

    def open(
        self,
        path: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> "GSAsyncFileHandle":
        """Open GCS blob and return async file handle with streaming support.

        Args:
            path: GCS path (gs://bucket/blob)
            mode: File mode ('r', 'w', 'rb', 'wb', 'a', 'ab')
            encoding: Text encoding (for text modes)
            **kwargs: Additional arguments (chunk_size, upload_warning_threshold,
                upload_interval supported)

        Returns:
            GSAsyncFileHandle with streaming support
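
        Example (a sketch; assumes "client" is an AsyncGSClient instance and
        that the base AsyncFileHandle exposes async context management and
        read()/write() coroutines):

            async with client.open("gs://my-bucket/notes.txt", "w") as fh:
                await fh.write("hello")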
        """
        if mode not in ("r", "rb", "w", "wb", "a", "ab"):
            raise ValueError(f"Unsupported mode '{mode}'. Use 'r', 'rb', 'w', 'wb', 'a', or 'ab'.")

        bucket_name, blob_name = self.__class__._parse_path(path)
        return GSAsyncFileHandle(
            client_factory=self._get_client,
            bucket=bucket_name,
            blob=blob_name,
            prefix=self.prefix[0],
            mode=mode,
            encoding=encoding,
            **kwargs,
        )

    async def mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> None:
        """Create a directory marker (empty blob with trailing slash).

        Args:
            path: GCS path (gs://bucket/path)
            parents: If True, create parent directories as needed
            exist_ok: If True, don't raise error if directory already exists
        """
        bucket_name, blob_name = self.__class__._parse_path(path)

        # Ensure path ends with / for directory marker
        if blob_name and not blob_name.endswith("/"):
            blob_name += "/"

        # Check if it already exists
        if await self.exists(f"{self.prefix[0]}://{bucket_name}/{blob_name}"):
            if not exist_ok:
                raise FileExistsError(f"Directory already exists: {path}")
            return

        # check parents
        if blob_name:  # not bucket root
            parent_path = "/".join(blob_name.rstrip("/").split("/")[:-1])
            if parent_path:
                parent_exists = await self.exists(
                    f"{self.prefix[0]}://{bucket_name}/{parent_path}/"
                )
                if not parent_exists:
                    if not parents:
                        raise FileNotFoundError(f"Parent directory does not exist: {path}")
                    # Create parent directories recursively
                    await self.mkdir(
                        f"{self.prefix[0]}://{bucket_name}/{parent_path}/",
                        parents=True,
                        exist_ok=True,
                    )

        # Create empty directory marker
        storage = await self._get_client()
        await storage.upload(bucket_name, blob_name, b"")

    async def get_metadata(self, path: str) -> dict[str, str]:
        """Get blob metadata.

        Args:
            path: GCS path

        Returns:
            Dictionary of metadata key-value pairs
        """
        bucket_name, blob_name = self.__class__._parse_path(path)
        storage = await self._get_client()

        # Get object metadata
        return await storage.download_metadata(bucket_name, blob_name)

    async def set_metadata(self, path: str, metadata: dict[str, str]) -> None:
        """Set blob metadata.

        Args:
            path: GCS path
            metadata: Dictionary of metadata key-value pairs
        """
        bucket_name, blob_name = self.__class__._parse_path(path)
        storage = await self._get_client()

        # Update metadata using patch
        metadata = {"metadata": metadata}  # type: ignore[dict-item]
        await storage.patch_metadata(bucket_name, blob_name, metadata=metadata)

    async def readlink(self, path: str) -> str:
        """Read symlink target from metadata.

        Args:
            path: GCS path

        Returns:
            Symlink target path
        """
        metadata = await self.get_metadata(path)
        target = metadata.get("metadata", {}).get(  # type: ignore[union-attr, call-overload]
            self.__class__.symlink_target_metaname,
            None,
        )
        if not target:
            raise ValueError(f"Not a symlink: {path!r}")

        if any(target.startswith(f"{prefix}://") for prefix in self.__class__.prefix):
            return target  # type: ignore[no-any-return]

        # relative path - construct full path
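        # e.g. a target of "sibling.txt" for path "gs://bucket/dir/link"
        # resolves to "gs://bucket/dir/sibling.txt"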
        path = path.rstrip("/").rsplit("/", 1)[0]
        return f"{path}/{target}"

    async def symlink_to(self, path: str, target: str) -> None:
        """Create symlink by storing target in metadata.

        Args:
            path: GCS path for the symlink
            target: Target path the symlink should point to
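
        Example (a sketch; "client" is an AsyncGSClient instance and the
        bucket names are hypothetical):

            await client.symlink_to("gs://my-bucket/latest", "gs://my-bucket/v2/")
            target = await client.readlink("gs://my-bucket/latest")  # "gs://my-bucket/v2/"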
        """
        bucket_name, blob_name = self.__class__._parse_path(path)
        storage = await self._get_client()

        # Create empty blob first
        await storage.upload(bucket_name, blob_name, b"")

        # Then set the symlink metadata
        await self.set_metadata(path, {self.__class__.symlink_target_metaname: target})

    async def glob(  # type: ignore[override]
        self,
        path: str,
        pattern: str,
    ) -> AsyncGenerator[str, None]:
        """Glob for files matching pattern.

        Args:
            path: Base GCS path
            pattern: Glob pattern (e.g., "*.txt", "**/*.py")

        Yields:
            Matching GCS paths as strings
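
        Example (a sketch; "client" is an AsyncGSClient instance):

            async for match in client.glob("gs://my-bucket/logs", "*.txt"):
                print(match)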
        """
        from fnmatch import fnmatch

        bucket_name, prefix = self.__class__._parse_path(path)
        storage = await self._get_client()

        # Handle recursive patterns
        if "**" in pattern:
            # Recursive search - list all blobs under prefix
            blob_prefix = prefix if prefix else None
            response = await storage.list_objects(
                bucket_name, params={"prefix": blob_prefix} if blob_prefix else {}
            )
            items = response.get("items", [])

            # Extract the pattern part after **
            pattern_parts = pattern.split("**/")
            if len(pattern_parts) > 1:
                file_pattern = pattern_parts[-1]
            else:
                file_pattern = "*"

            for item in items:
                blob_name = item["name"]
                if fnmatch(blob_name, f"*{file_pattern}"):
                    yield f"{self.prefix[0]}://{bucket_name}/{blob_name}"
        else:
            # Non-recursive - list blobs with delimiter
            blob_prefix = f"{prefix}/" if prefix and not prefix.endswith("/") else prefix
            response = await storage.list_objects(
                bucket_name,
                params=(
                    {"prefix": blob_prefix, "delimiter": "/"} if blob_prefix else {"delimiter": "/"}
                ),
            )
            items = response.get("items", [])

            for item in items:
                blob_name = item["name"]
                if fnmatch(blob_name, f"{blob_prefix}{pattern}"):
                    yield f"{self.prefix[0]}://{bucket_name}/{blob_name}"

    async def walk(  # type: ignore[override]
        self,
        path: str,
    ) -> AsyncGenerator[tuple[str, list[str], list[str]], None]:
        """Walk directory tree.

        Args:
            path: Base GCS path

        Yields:
            Tuples of (dirpath, dirnames, filenames)
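
        Example (a sketch; "client" is an AsyncGSClient instance):

            async for dirpath, dirnames, filenames in client.walk("gs://my-bucket/data"):
                for name in filenames:
                    print(f"{dirpath}/{name}")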
        """

        bucket_name, blob_prefix = self.__class__._parse_path(path)
        storage = await self._get_client()

        # List all blobs under prefix
        prefix = blob_prefix if blob_prefix else ""
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        response = await storage.list_objects(
            bucket_name, params={"prefix": prefix} if prefix else {}
        )
        items = response.get("items", [])

        # Organize into directory structure
        dirs: dict[str, tuple[set[str], set[str]]] = {}  # dirpath -> (subdirs, files)

        for item in items:
            blob_name = item["name"]
            # Get relative path from prefix
            rel_path = blob_name[len(prefix) :] if prefix else blob_name

            # Split into directory and filename
            parts = rel_path.split("/")
            if len(parts) == 1:
                # File in root
                if path not in dirs:
                    dirs[path] = (set(), set())
                if parts[0]:  # Skip empty strings
                    dirs[path][1].add(parts[0])
            else:
                # File in subdirectory
                # First, ensure root directory exists and add the first subdir to it
                if path not in dirs:  # pragma: no cover
                    dirs[path] = (set(), set())
                if parts[0]:  # Add first-level subdirectory to root
                    dirs[path][0].add(parts[0])

                # Process all intermediate directories
                for i in range(len(parts) - 1):
                    dir_path = (
                        f"{path}/" + "/".join(parts[: i + 1]) if path else "/".join(parts[: i + 1])
                    )
                    if dir_path not in dirs:
                        dirs[dir_path] = (set(), set())

                    # Add subdirectory if not last part
                    if i < len(parts) - 2:
                        dirs[dir_path][0].add(parts[i + 1])

                # Add file to its parent directory
                parent_dir = f"{path}/" + "/".join(parts[:-1]) if path else "/".join(parts[:-1])
                if parent_dir not in dirs:  # pragma: no cover
                    dirs[parent_dir] = (set(), set())
                if parts[-1]:  # Skip empty strings
                    dirs[parent_dir][1].add(parts[-1])

        # Yield each directory tuple
        for d, (subdirs, files) in sorted(dirs.items()):
            yield (d, sorted(subdirs), sorted(filter(None, files)))

    async def touch(  # type: ignore[no-untyped-def, override]
        self,
        path: str,
        mode=None,
        exist_ok: bool = True,
    ) -> None:
        """Create empty file.

        Args:
            path: GCS path
            mode: Mode setting (not supported for GCS, will raise ValueError if provided)
            exist_ok: If False, raise error if file exists
        """
        if mode is not None:
            raise ValueError("Mode setting is not supported for Google Cloud Storage.")

        if not exist_ok and await self.exists(path):
            raise FileExistsError(f"File already exists: {path}")

        bucket_name, blob_name = self.__class__._parse_path(path)
        storage = await self._get_client()
        await storage.upload(bucket_name, blob_name, b"")

    async def rename(self, source: str, target: str) -> None:
        """Rename/move file.

        Args:
            source: Source GCS path
            target: Target GCS path
        """
        if not await self.exists(source):
            raise FileNotFoundError(f"Source not found: {source}")

        # Copy to new location
        src_bucket_name, src_blob_name = self.__class__._parse_path(source)
        tgt_bucket_name, tgt_blob_name = self.__class__._parse_path(target)

        storage = await self._get_client()

        # Copy blob (read then write)
        data = await storage.download(src_bucket_name, src_blob_name)
        await storage.upload(tgt_bucket_name, tgt_blob_name, data)

        # Delete source
        await storage.delete(src_bucket_name, src_blob_name)

    async def rmdir(self, path: str) -> None:
        """Remove directory marker.

        Args:
            path: GCS path
        """
        bucket_name, blob_name = self.__class__._parse_path(path)

        # Ensure path ends with / for directory marker
        if blob_name and not blob_name.endswith("/"):
            blob_name += "/"

        # Check if it is empty
        if await self.is_dir(path) and await self.list_dir(path):
            raise OSError(f"Directory not empty: {path}")

        storage = await self._get_client()

        try:
            await storage.delete(bucket_name, blob_name)
        except Exception as e:
            raise FileNotFoundError(f"Directory not found: {path}") from e

    async def rmtree(
        self, path: str, ignore_errors: bool = False, onerror: Optional[Any] = None
    ) -> None:
        """Remove directory and all its contents recursively.

        Args:
            path: GCS path
            ignore_errors: If True, errors are ignored
            onerror: Callable that accepts (function, path, excinfo)
        """
        if not await self.exists(path):
            if ignore_errors:
                return
            else:
                raise FileNotFoundError(f"Path not found: {path}")

        if not await self.is_dir(path):
            if ignore_errors:
                return
            else:
                raise NotADirectoryError(f"Path is not a directory: {path}")

        bucket_name, prefix = self.__class__._parse_path(path)

        # Ensure prefix ends with / for directory listing
        if prefix and not prefix.endswith("/"):
            prefix += "/"

        try:
            storage = await self._get_client()

            # List all blobs with this prefix
            blobs = await storage.list_objects(bucket_name, params={"prefix": prefix})
            blob_names = [item["name"] for item in blobs.get("items", [])]

            # Delete all blobs
            for blob_name in blob_names:
                await storage.delete(bucket_name, blob_name)
        except Exception:  # pragma: no cover
            if ignore_errors:
                return
            if onerror is not None:
                onerror(storage.delete, path, sys.exc_info())
            else:
                raise

    async def copy(self, source: str, target: str, follow_symlinks: bool = True) -> None:
        """Copy file to target.

        Args:
            source: Source GCS path
            target: Target GCS path
            follow_symlinks: If False, symlinks are copied as symlinks (not dereferenced)
        """
        if not await self.exists(source):
            raise FileNotFoundError(f"Source not found: {source}")

        if follow_symlinks and await self.is_symlink(source):
            source = await self.readlink(source)

        if await self.is_dir(source):
            raise IsADirectoryError(f"Source is a directory: {source}")

        src_bucket_name, src_blob_name = self.__class__._parse_path(source)
        tgt_bucket_name, tgt_blob_name = self.__class__._parse_path(target)

        storage = await self._get_client()

        # Read from source
        data = await storage.download(src_bucket_name, src_blob_name)

        # Write to target
        await storage.upload(tgt_bucket_name, tgt_blob_name, data)

    async def copytree(self, source: str, target: str, follow_symlinks: bool = True) -> None:
        """Copy directory tree to target recursively.

        Args:
            source: Source GCS path
            target: Target GCS path
            follow_symlinks: If False, symlinks are copied as symlinks (not dereferenced)
        """
        if not await self.exists(source):
            raise FileNotFoundError(f"Source not found: {source}")

        if follow_symlinks and await self.is_symlink(source):
            source = await self.readlink(source)

        if not await self.is_dir(source):
            raise NotADirectoryError(f"Source is not a directory: {source}")

        src_bucket_name, src_prefix = self.__class__._parse_path(source)
        tgt_bucket_name, tgt_prefix = self.__class__._parse_path(target)

        # Ensure prefixes end with / for directory operations
        if src_prefix and not src_prefix.endswith("/"):
            src_prefix += "/"
        if tgt_prefix and not tgt_prefix.endswith("/"):
            tgt_prefix += "/"

        storage = await self._get_client()

        # List all blobs with source prefix
        blobs = await storage.list_objects(src_bucket_name, params={"prefix": src_prefix})

        for item in blobs.get("items", []):
            src_blob_name = item["name"]
            # Calculate relative path and target blob name
            rel_path = src_blob_name[len(src_prefix) :]
            tgt_blob_name = tgt_prefix + rel_path

            # Copy blob (read and write)
            data = await storage.download(src_bucket_name, src_blob_name)
            await storage.upload(tgt_bucket_name, tgt_blob_name, data)


class GSAsyncFileHandle(AsyncFileHandle):
    """Async file handle for GCS with chunked streaming support.

    Uses range requests for reading to avoid loading entire blobs.
    """

    async def _create_stream(self):  # type: ignore[no-untyped-def]
        """Create a GSAsyncReadStream for this file handle."""
        return await self._client.download_stream(  # type: ignore[union-attr]
            self._bucket,
            self._blob,
        )

    @classmethod
    def _expception_as_filenotfound(cls, exception: Exception) -> bool:
        """Check if exception indicates blob does not exist."""
        return True

    async def _upload(self, data: Union[str, bytes]) -> None:
        """Upload data to GCS blob using append semantics.

        Appending is implemented with the GCS compose API.
        In 'w' mode the first write overwrites the blob; subsequent writes append.
        In 'a' mode every write appends.

        Args:
            data: Data to upload
                (will be appended to existing content after first write)
        """
        if isinstance(data, str):
            data = data.encode(self._encoding)

        storage: Storage = self._client  # type: ignore[assignment]

        # For 'w' mode on first write, overwrite existing content
        if self._first_write and not self._is_append:
            self._first_write = False
            # Simple overwrite
            await storage.upload(self._bucket, self._blob, data)
            return

        self._first_write = False

        # For subsequent writes or append mode, use compose to append
        # Check if the original blob exists
        try:
            await storage.download_metadata(self._bucket, self._blob)
            blob_exists = True
        except Exception:
            blob_exists = False

        if not blob_exists:
            # If blob doesn't exist, just upload the new data
            await storage.upload(self._bucket, self._blob, data)
        else:
            # Upload new data to a temporary blob
            # Use upload count to ensure unique temp blob names across multiple flushes
            temp_blob = f"{self._blob}.tmp.{os.getpid()}.{self._upload_count}"
            await storage.upload(
                self._bucket,
                temp_blob,
                data,
            )

            try:
                # Use compose API to concatenate original + new data
                await storage.compose(
                    bucket=self._bucket,
                    object_name=self._blob,
                    source_object_names=[self._blob, temp_blob],
                )
            except Exception as e:  # pragma: no cover
                raise IOError(f"Failed to append data to GCS blob: {self._blob}") from e
            finally:
                # Clean up the temporary blob
                await storage.delete(self._bucket, temp_blob)