"""Base client classes for sync and async cloud storage operations."""

from abc import ABC, abstractmethod
import asyncio
import re
import time
import warnings
from typing import (
    Any,
    AsyncGenerator,
    Callable,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
    Awaitable,
)


class Client(ABC):
    """Base class for cloud storage clients."""

    prefix: Tuple[str, ...]
    symlink_target_metaname: str = "symlink_target"

    @abstractmethod
    def open(
        self,
        path: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> Union["SyncFileHandle", "AsyncFileHandle"]:
        """Open file and return sync/async file handle.

        Args:
            path: Cloud storage path
            mode: File mode ('r', 'w', 'rb', 'wb', 'a', 'ab')
            encoding: Text encoding (for text modes)
            **kwargs: Additional arguments for specific implementations

        Returns:
            SyncFileHandle/AsyncFileHandle instance
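
        Example:
            A minimal sketch; ``S3Client`` stands in for a hypothetical
            concrete sync client (async clients use ``async with``)::

                client = S3Client()
                with client.open("s3://bucket/notes.txt", "w") as f:
                    f.write("hello world")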
        """

    @classmethod
    def _parse_path(cls, path: str) -> tuple[str, str]:
        """Parse cloud storage path into bucket/container and blob/object key.

        Args:
            path: Full cloud storage path

        Returns:
            Tuple of (bucket/container, blob/object key)
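
        Example:
            Assuming a subclass with ``prefix = ("s3",)``, the scheme is
            stripped and repeated slashes are collapsed::

                MyClient._parse_path("s3://my-bucket/dir//file.txt")
                # -> ("my-bucket", "dir/file.txt")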
        """
        for prefix in cls.prefix:
            if path.startswith(f"{prefix}://"):
                path = path[len(f"{prefix}://") :]
                break

        path = re.sub(r"/+", "/", path)  # Normalize slashes
        parts = path.split("/", 1)
        bucket = parts[0].lstrip("/")
        blob = parts[1] if len(parts) > 1 else ""
        return bucket, blob


class SyncClient(Client, ABC):
    """Base class for synchronous cloud storage clients."""

    @abstractmethod
    def exists(self, path: str) -> bool:
        """Check if path exists."""

    @abstractmethod
    def read_bytes(self, path: str) -> bytes:
        """Read file as bytes."""

    @abstractmethod
    def write_bytes(self, path: str, data: bytes) -> None:
        """Write bytes to file."""

    @abstractmethod
    def delete(self, path: str) -> None:
        """Delete file."""

    @abstractmethod
    def list_dir(self, path: str) -> Iterator[str]:
        """List directory contents."""

    @abstractmethod
    def is_dir(self, path: str) -> bool:
        """Check if path is a directory."""

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """Check if path is a file."""

    @abstractmethod
    def stat(self, path: str) -> Any:
        """Get file stats."""

    @abstractmethod
    def mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> None:
        """Create a directory marker (empty blob with trailing slash)."""

    @abstractmethod
    def glob(self, path: str, pattern: str) -> Iterator[str]:
        """Find all paths matching pattern."""

    @abstractmethod
    def walk(self, path: str) -> Iterator[tuple[str, list[str], list[str]]]:
        """Walk directory tree."""

    @abstractmethod
    def touch(self, path: str, exist_ok: bool = True) -> None:
        """Create empty file or update metadata."""

    @abstractmethod
    def rename(self, src: str, dst: str) -> None:
        """Rename/move file."""

    @abstractmethod
    def rmdir(self, path: str) -> None:
        """Remove directory marker."""

    @abstractmethod
    def symlink_to(self, path: str, target: str) -> None:
        """Create symlink by storing target in metadata."""

    @abstractmethod
    def get_metadata(self, path: str) -> dict[str, str]:
        """Get object metadata."""

    @abstractmethod
    def set_metadata(self, path: str, metadata: dict[str, str]) -> None:
        """Set object metadata."""

    @abstractmethod
    def rmtree(self, path: str, ignore_errors: bool = False, onerror: Optional[Any] = None) -> None:
        """Remove directory and all its contents recursively."""

    @abstractmethod
    def copy(self, src: str, dst: str, follow_symlinks: bool = True) -> None:
        """Copy file from src to dst."""

    @abstractmethod
    def copytree(self, src: str, dst: str, follow_symlinks: bool = True) -> None:
        """Copy directory tree from src to dst recursively."""

    def read_text(self, path: str, encoding: str = "utf-8") -> str:
        """Read file as text."""
        data = self.read_bytes(path)
        return data.decode(encoding)

    def write_text(self, path: str, data: str, encoding: str = "utf-8") -> None:
        """Write text to file."""
        self.write_bytes(path, data.encode(encoding))

    def is_symlink(self, path: str) -> bool:
        """Check if path is a symlink (has symlink metadata).

        Args:
            path: Cloud path

        Returns:
            True if path is a symlink
        """
        try:
            metadata = self.get_metadata(path)
            meta_dict: Any = metadata.get("metadata", {})
            if isinstance(meta_dict, dict):
                return self.__class__.symlink_target_metaname in meta_dict
            return False  # pragma: no cover
        except Exception:
            return False

    def readlink(self, path: str) -> str:
        """Read symlink target from metadata.

        Args:
            path: Cloud path

        Returns:
            Symlink target path
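
        Example:
            A relative target is resolved against the link's parent
            directory (illustrative, assuming ``prefix = ("s3",)``)::

                # metadata: {"symlink_target": "other.txt"}
                client.readlink("s3://bucket/dir/link.txt")
                # -> "s3://bucket/dir/other.txt"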
        """
        metadata = self.get_metadata(path)
        meta_dict: Any = metadata.get("metadata", {})
        if not isinstance(meta_dict, dict):  # pragma: no cover
            raise ValueError(f"Invalid metadata format for: {path}")
        target: Any = meta_dict.get(self.__class__.symlink_target_metaname, None)
        if not target or not isinstance(target, str):
            raise ValueError(f"Not a symlink: {path!r}")

        if any(target.startswith(f"{prefix}://") for prefix in self.__class__.prefix):
            return str(target)

        path = path.rstrip("/").rsplit("/", 1)[0]  # pragma: no cover
        return f"{path}/{target}"  # pragma: no cover


class AsyncClient(Client, ABC):
    """Base class for asynchronous cloud storage clients."""

    @abstractmethod
    async def close(self) -> None:
        """Close any open connections/resources."""

    @abstractmethod
    async def exists(self, path: str) -> bool:
        """Check if path exists."""

    @abstractmethod
    async def read_bytes(self, path: str) -> bytes:
        """Read file as bytes."""

    @abstractmethod
    async def write_bytes(self, path: str, data: bytes) -> int:
        """Write bytes to file."""

    @abstractmethod
    async def delete(self, path: str) -> None:
        """Delete file."""

    @abstractmethod
    async def list_dir(self, path: str) -> list[str]:
        """List directory contents."""

    @abstractmethod
    async def is_dir(self, path: str) -> bool:
        """Check if path is a directory."""

    @abstractmethod
    async def is_file(self, path: str) -> bool:
        """Check if path is a file."""

    @abstractmethod
    async def stat(self, path: str) -> Any:
        """Get file stats."""

    @abstractmethod
    async def mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> None:
        """Create a directory marker (empty blob with trailing slash)."""

    @abstractmethod
    async def glob(self, path: str, pattern: str) -> AsyncGenerator[str, None]:
        """Find all paths matching pattern."""

    @abstractmethod
    async def walk(
        self,
        path: str,
    ) -> AsyncGenerator[tuple[str, list[str], list[str]], None]:
        """Walk directory tree."""

    @abstractmethod
    async def touch(self, path: str, exist_ok: bool = True) -> None:
        """Create empty file or update metadata."""

    @abstractmethod
    async def rename(self, src: str, dst: str) -> None:
        """Rename/move file."""

    @abstractmethod
    async def rmdir(self, path: str) -> None:
        """Remove directory marker."""

    @abstractmethod
    async def symlink_to(self, path: str, target: str) -> None:
        """Create symlink by storing target in metadata."""

    @abstractmethod
    async def get_metadata(self, path: str) -> dict[str, str]:
        """Get object metadata."""

    @abstractmethod
    async def set_metadata(self, path: str, metadata: dict[str, str]) -> None:
        """Set object metadata."""

    @abstractmethod
    async def rmtree(
        self, path: str, ignore_errors: bool = False, onerror: Optional[Any] = None
    ) -> None:
        """Remove directory and all its contents recursively."""

    @abstractmethod
    async def copy(self, src: str, dst: str, follow_symlinks: bool = True) -> None:
        """Copy file from src to dst."""

    @abstractmethod
    async def copytree(self, src: str, dst: str, follow_symlinks: bool = True) -> None:
        """Copy directory tree from src to dst recursively."""

    async def __aenter__(self) -> "AsyncClient":
        """Enter async context manager."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit async context manager."""
        await self.close()
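
    # Usage sketch (illustrative; ``SomeAsyncClient`` stands in for a
    # hypothetical concrete subclass):
    #
    #     async with SomeAsyncClient() as client:
    #         data = await client.read_bytes("s3://bucket/key.bin")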

    async def read_text(self, path: str, encoding: str = "utf-8") -> str:
        """Read file as text."""
        data = await self.read_bytes(path)
        return data.decode(encoding)

    async def write_text(self, path: str, data: str, encoding: str = "utf-8") -> int:
        """Write text to file."""
        return await self.write_bytes(path, data.encode(encoding))

    async def is_symlink(self, path: str) -> bool:
        """Check if path is a symlink (has symlink metadata).

        Args:
            path: Cloud path

        Returns:
            True if path is a symlink
        """
        try:
            metadata = await self.get_metadata(path)
            meta_dict: Any = metadata.get("metadata", {})
            if isinstance(meta_dict, dict):
                return self.__class__.symlink_target_metaname in meta_dict
            return False  # pragma: no cover
        except Exception:
            return False

    async def readlink(self, path: str) -> str:
        """Read symlink target from metadata.

        Args:
            path: Cloud path

        Returns:
            Symlink target path
        """
        metadata = await self.get_metadata(path)
        meta_dict: Any = metadata.get("metadata", {})
        if not isinstance(meta_dict, dict):  # pragma: no cover
            raise ValueError(f"Invalid metadata format for: {path}")
        target: Any = meta_dict.get(self.__class__.symlink_target_metaname, None)
        if not target or not isinstance(target, str):
            raise ValueError(f"Not a symlink: {path}")

        if any(target.startswith(f"{prefix}://") for prefix in self.__class__.prefix):
            return str(target)

        path = path.rstrip("/").rsplit("/", 1)[0]  # pragma: no cover
        return f"{path}/{target}"  # pragma: no cover


class AsyncFileHandle(ABC):
    """Base class for async file handles.

    This abstract base class defines the interface for async file operations
    on cloud storage. Each cloud provider implements its own version using
    the provider's specific streaming capabilities.
    """

    def __init__(
        self,
        client_factory: Callable[[], Awaitable[Any]],
        bucket: str,
        blob: str,
        prefix: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        chunk_size: int = 4096,
        upload_warning_threshold: int = 100,
        upload_interval: float = 1.0,
    ):
        """Initialize async file handle.

        Args:
            client_factory: Async client factory for cloud operations
            bucket: Cloud storage bucket name or container
            blob: Cloud storage blob name or object key
            prefix: Cloud storage path prefix
            mode: File mode ('r', 'w', 'rb', 'wb', etc.)
            encoding: Text encoding (for text modes)
            chunk_size: Size of chunks to read
            upload_warning_threshold: Number of chunk uploads before warning (default: 100)
                -1 to disable warning
            upload_interval: Minimum interval (in seconds) between uploads to avoid
                rate limits (default: 1.0)
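
        Example:
            Handles are normally created via ``Client.open``; the tuning
            parameters can be forwarded as keyword arguments (illustrative,
            assuming the concrete ``open`` passes them through)::

                f = client.open("s3://bucket/big.bin", "wb",
                                chunk_size=1 << 20, upload_interval=0.5)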
        """
        self._client_factory = client_factory
        self._client: Optional[AsyncClient] = None
        self._bucket = bucket
        self._blob = blob
        self._prefix = prefix
        self._mode = mode
        self._encoding = encoding or "utf-8"
        self._chunk_size = chunk_size
        self._closed = False
        self._upload_warning_threshold = upload_warning_threshold
        self._upload_count = 0
        self._first_write = True  # Track if this is the first write (for 'w' mode clearing)
        self._upload_interval = upload_interval
        self._last_upload_time: Optional[float] = None

        # For write modes
        self._write_buffer: Union[bytearray, List[str]] = bytearray() if "b" in mode else []

        # Parse mode
        self._is_read = "r" in mode
        self._is_write = "w" in mode or "a" in mode
        self._is_binary = "b" in mode
        self._is_append = "a" in mode

        self._stream: Any = None
        self._read_buffer: Union[bytes, str] = b"" if self._is_binary else ""
        self._read_pos = 0
        self._eof = False

    @classmethod
    @abstractmethod
    def _exception_as_filenotfound(cls, exception: Exception) -> bool:
        """Check if exception indicates 'file not found'."""

    @abstractmethod
    async def _create_stream(self) -> Any:
        """Create and return the underlying async stream for reading."""

    @abstractmethod
    async def _upload(self, data: Union[bytes, str]) -> None:
        """Upload data to cloud storage (used internally)."""

    async def _stream_read(self, size: int = -1) -> Union[str, bytes]:
        """Read from stream (used internally)."""
        if self._stream is None:  # pragma: no cover
            raise ValueError("Stream not initialized")
        chunk = await self._stream.read(size)
        if self._is_binary:
            return chunk  # type: ignore
        else:
            return chunk.decode(self._encoding)  # type: ignore

    async def flush(self) -> None:
        """Flush write buffer to cloud storage.

        After open, all flushes append to existing content using provider-native
        append operations. The difference between 'w' and 'a' modes is that 'w'
        clears existing content on open, while 'a' preserves it.
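
        Example:
            Illustrative, with a hypothetical concrete async handle::

                async with client.open("s3://bucket/log.txt", "a") as f:
                    await f.write("one more line")
                    await f.flush()  # appended to the existing blob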
        """
        if self._closed:  # pragma: no cover
            raise ValueError("I/O operation on closed file")

        if not self._is_write:  # pragma: no cover
            return

        if not self._write_buffer and not self._first_write:
            return

        if self._is_binary:
            data: Union[bytes, str] = bytes(self._write_buffer)  # type: ignore
        else:
            data = "".join(self._write_buffer)  # type: ignore

        # Rate limiting: wait if needed to respect upload_interval
        if self._upload_interval > 0 and self._last_upload_time is not None:
            elapsed = time.time() - self._last_upload_time
            if elapsed < self._upload_interval:
                await asyncio.sleep(self._upload_interval - elapsed)

        await self._upload(data)
        self._last_upload_time = time.time()
        self._write_buffer = bytearray() if self._is_binary else []

        # Track upload count and warn if threshold exceeded
        self._upload_count += 1
        if self._upload_count == self._upload_warning_threshold:
            warnings.warn(
                f"File handle has flushed {self._upload_count} times. "
                "Consider using larger chunk_size or buffering writes to reduce "
                "cloud API calls. Set upload_warning_threshold=-1 to suppress "
                "this warning.",
                ResourceWarning,
                stacklevel=2,
            )

    async def reset_stream(self) -> None:
        """Reset the underlying stream to the beginning."""
        self._stream = await self._create_stream()
        self._read_buffer = b"" if self._is_binary else ""
        self._read_pos = 0
        self._eof = False

    async def __aenter__(self) -> "AsyncFileHandle":
        """Enter async context manager."""
        self._client = await self._client_factory()

        if self._is_read:
            try:
                self._stream = await self._create_stream()
            except Exception as e:
                if self.__class__._exception_as_filenotfound(e):
                    raise FileNotFoundError(
                        f"File not found: {self._prefix}://{self._bucket}/{self._blob}"
                    ) from None
                else:  # pragma: no cover
                    raise
        elif self._is_write and not self._is_append:
            # 'w' mode: clear existing content - do nothing here, will create on first write
            # The difference is that subsequent flushes will append
            pass
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit async context manager."""
        await self.close()
        self._client = None

    async def read(self, size: int = -1) -> Union[str, bytes]:
        """Read and return up to size bytes/characters.

        Args:
            size: Number of bytes/chars to read (-1 for all)

        Returns:
            Data read from file
        """
        if not self._is_read:
            raise ValueError("File not opened for reading")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        # First, consume any buffered data
        if self._read_buffer:
            if size == -1:  # pragma: no cover
                # Return all buffered data plus rest of stream
                buffered = self._read_buffer
                self._read_buffer = b"" if self._is_binary else ""
                rest = await self._stream_read(-1)
                self._read_pos += len(rest)
                self._eof = True
                result: Union[str, bytes] = buffered + rest  # type: ignore
                return result
            else:
                # Return from buffer first
                if len(self._read_buffer) >= size:  # pragma: no cover
                    result_buf: Union[str, bytes] = self._read_buffer[:size]
                    self._read_buffer = self._read_buffer[size:]
                    return result_buf
                else:  # pragma: no cover
                    # Not enough in buffer, need to read more
                    buffered = self._read_buffer
                    self._read_buffer = b"" if self._is_binary else ""
                    remaining = size - len(buffered)
                    result_chunk = await self._stream_read(remaining)
                    if not result_chunk:
                        self._eof = True
                        return buffered
                    self._read_pos += len(result_chunk)
                    combined: Union[str, bytes] = buffered + result_chunk  # type: ignore
                    return combined

        # No buffered data, read from stream
        if size == -1:
            result_stream = await self._stream_read(-1)
            self._read_pos += len(result_stream)
            self._eof = True
            return result_stream
        else:
            result_stream = await self._stream_read(size)
            if not result_stream:  # pragma: no cover
                self._eof = True
                return result_stream

            self._read_pos += len(result_stream)
            return result_stream

    async def readline(self, size: int = -1) -> Union[str, bytes]:
        """Read and return one line from the file."""
        if not self._is_read:
            raise ValueError("File not opened for reading")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        newline: Union[bytes, str] = b"\n" if self._is_binary else "\n"
        # Fill buffer until we find a newline or reach EOF
        while not self._eof:
            if self._is_binary:  # pragma: no cover
                bytes_buffer: bytes = self._read_buffer  # type: ignore
                bytes_newline: bytes = newline  # type: ignore
                if bytes_newline in bytes_buffer:
                    break
            else:
                str_buffer_check: str = self._read_buffer  # type: ignore
                str_newline: str = newline  # type: ignore
                if str_newline in str_buffer_check:
                    break

            chunk = await self._stream_read(self._chunk_size)
            if not chunk:
                self._eof = True
                break
            self._read_pos += len(chunk)
            buffer_tmp: Union[bytes, str] = self._read_buffer + chunk  # type: ignore
            self._read_buffer = buffer_tmp

        try:
            end = self._read_buffer.index(newline) + 1  # type: ignore
        except ValueError:
            end = len(self._read_buffer)

        if size != -1 and end > size:
            end = size

        result_line: Union[str, bytes] = self._read_buffer[:end]
        self._read_buffer = self._read_buffer[end:]
        return result_line

    async def readlines(self) -> List[Union[str, bytes]]:
        """Read and return all lines from the file."""
        lines = []
        while True:
            line = await self.readline()
            if not line:
                break
            lines.append(line)
        return lines

    async def write(self, data: Union[str, bytes]) -> int:
        """Write data to the file."""
        if not self._is_write:
            raise ValueError("File not opened for writing")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        if self._is_binary:
            if isinstance(data, str):
                data = data.encode(self._encoding)
            self._write_buffer.extend(data)  # type: ignore
        else:
            if isinstance(data, bytes):
                data = data.decode(self._encoding)
            self._write_buffer.append(data)  # type: ignore

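        # Note: in text mode the write buffer is a list of fragments, so this
        # compares fragment count (not character count) against chunk_size.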
        if len(self._write_buffer) >= self._chunk_size:
            await self.flush()

        return len(data)

    async def writelines(self, lines: List[Union[str, bytes]]) -> None:
        """Write a list of lines to the file."""
        for line in lines:
            await self.write(line)

    async def close(self) -> None:
        """Close the file and flush write buffer to cloud storage."""
        if self._closed:
            return

        if self._is_write and self._client:
            await self.flush()

        self._closed = True

    def __aiter__(self) -> "AsyncFileHandle":
        """Support async iteration over lines."""
        if not self._is_read:
            raise ValueError("File not opened for reading")
        return self

    async def __anext__(self) -> Union[str, bytes]:
        """Get next line in async iteration."""
        line = await self.readline()
        if not line:
            raise StopAsyncIteration
        return line
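
    # Iteration sketch (illustrative):
    #
    #     async with client.open("s3://bucket/data.csv") as f:
    #         async for line in f:
    #             process(line)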

    @property
    def closed(self) -> bool:
        """Check if file is closed."""
        return self._closed

    async def tell(self) -> int:
        """Return current stream position.

        Returns:
            Current position in the file
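
        Example:
            The reported position counts bytes handed to the caller, not
            bytes buffered internally (illustrative; fresh binary handle)::

                line = await f.readline()
                pos = await f.tell()  # == len(line)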
        """
        if not self._is_read:
            raise ValueError("tell() not supported in write mode")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        # Calculate buffer size in bytes
        if self._is_binary:
            buffer_byte_size = len(self._read_buffer)
        else:
            # In text mode, encode the buffer to get its byte size
            str_buffer: str = self._read_buffer  # type: ignore
            buffer_byte_size = len(str_buffer.encode(self._encoding))

        return self._read_pos - buffer_byte_size

    async def seek(self, offset: int, whence: int = 0) -> int:
        """Change stream position (forward seeking only).

        Args:
            offset: Position offset
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New absolute position

        Raises:
            OSError: If backward seeking is attempted
            ValueError: If called in write mode or on closed file

        Note:
            - Only forward seeking is supported due to streaming limitations
            - SEEK_END (whence=2) is not supported as blob size may be unknown
            - Backward seeking requires re-opening the stream
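
        Example:
            Only forward seeks (and a full rewind to 0) succeed; any other
            backward seek raises ``OSError`` (illustrative)::

                await f.seek(128)   # forward from the start: OK
                await f.seek(0)     # rewind to 0: resets the stream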
        """
        if not self._is_read:
            raise ValueError("seek() not supported in write mode")
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if whence == 2:
            raise OSError("SEEK_END not supported for streaming reads")

        # Calculate target position
        current_pos = await self.tell()
        if whence == 0:
            target_pos = offset
        elif whence == 1:
            target_pos = current_pos + offset
        else:
            raise ValueError(f"Invalid whence value: {whence}")

        if target_pos == 0:
            await self.reset_stream()
            return 0

        # Check for backward seeking
        if target_pos < current_pos:
            raise OSError("Backward seeking not supported for streaming reads")

        # Forward seek: read and discard data
        bytes_to_skip = target_pos - current_pos
        while bytes_to_skip > 0 and not self._eof:
            chunk_size = min(bytes_to_skip, 8192)
            chunk = await self.read(chunk_size)
            if not chunk:  # pragma: no cover
                break
            if self._is_binary:
                bytes_chunk: bytes = chunk  # type: ignore
                bytes_to_skip -= len(bytes_chunk)
            else:  # pragma: no cover
                str_chunk: str = chunk  # type: ignore
                bytes_to_skip -= len(str_chunk.encode(self._encoding))

        return await self.tell()


class SyncFileHandle(ABC):
    """Base class for sync file handles.

    This abstract base class defines the interface for sync file operations
    on cloud storage. Each cloud provider implements its own version using
    the provider's specific streaming capabilities.
    """

    def __init__(
        self,
        client: Any,
        bucket: str,
        blob: str,
        prefix: str,
        mode: str = "r",
        encoding: Optional[str] = None,
        chunk_size: int = 4096,
        upload_warning_threshold: int = 100,
        upload_interval: float = 1.0,
    ):
        """Initialize sync file handle.

        Args:
            client: Sync client for cloud operations
            bucket: Cloud storage bucket name or container
            blob: Cloud storage blob name or object key
            prefix: Cloud storage path prefix
            mode: File mode ('r', 'w', 'rb', 'wb', etc.)
            encoding: Text encoding (for text modes)
            chunk_size: Size of chunks to read
            upload_warning_threshold: Number of chunk uploads before warning (default: 100)
            upload_interval: Minimum interval (in seconds) between uploads to avoid
                rate limits (default: 1.0)
        """
        self._client = client
        self._bucket = bucket
        self._blob = blob
        self._prefix = prefix
        self._mode = mode
        self._encoding = encoding or "utf-8"
        self._chunk_size = chunk_size
        self._closed = False
        self._upload_warning_threshold = upload_warning_threshold
        self._upload_count = 0
        self._first_write = True  # Track if this is the first write (for 'w' mode clearing)
        self._upload_interval = upload_interval
        self._last_upload_time: Optional[float] = None

        # For write modes
        self._write_buffer: Union[bytearray, List[str]] = bytearray() if "b" in mode else []

        # Parse mode
        self._is_read = "r" in mode
        self._is_write = "w" in mode or "a" in mode
        self._is_binary = "b" in mode
        self._is_append = "a" in mode

        self._stream: Any = None
        self._read_buffer: Union[bytes, str] = b"" if self._is_binary else ""
        self._read_pos = 0
        self._eof = False

    @classmethod
    @abstractmethod
    def _exception_as_filenotfound(cls, exception: Exception) -> bool:
        """Check if exception indicates 'file not found'."""

    @abstractmethod
    def _create_stream(self) -> Any:
        """Create and return the underlying async stream for reading."""

    @abstractmethod
    def _upload(self, data: Union[bytes, str]) -> None:
        """Upload data to cloud storage (used internally)."""

    def flush(self) -> None:
        """Flush write buffer to cloud storage.

        After open, all flushes append to existing content using provider-native
        append operations. The difference between 'w' and 'a' modes is that 'w'
        clears existing content on open, while 'a' preserves it.
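
        Example:
            Illustrative, with a hypothetical concrete sync handle::

                with client.open("s3://bucket/log.txt", "w") as f:
                    f.write("replaces any previous content")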
        """
        if self._closed:  # pragma: no cover
            raise ValueError("I/O operation on closed file")

        if not self._is_write:  # pragma: no cover
            return

        if not self._write_buffer and not self._first_write:
            return

        if self._is_binary:
            data = bytes(self._write_buffer)  # type: ignore
        else:
            data = "".join(self._write_buffer)  # type: ignore

        # Rate limiting: wait if needed to respect upload_interval
        if self._upload_interval > 0 and self._last_upload_time is not None:
            elapsed = time.time() - self._last_upload_time
            if elapsed < self._upload_interval:
                time.sleep(self._upload_interval - elapsed)

        self._upload(data)
        self._last_upload_time = time.time()
        self._write_buffer = bytearray() if self._is_binary else []

        # Track upload count and warn if threshold exceeded
        self._upload_count += 1
        if self._upload_count == self._upload_warning_threshold:
            warnings.warn(
                f"File handle has flushed {self._upload_count} times. Consider using larger "
                f"chunk_size or buffering writes to reduce cloud API calls. "
                f"Set upload_warning_threshold=-1 to suppress this warning.",
                ResourceWarning,
                stacklevel=2,
            )

    def _stream_read(self, size: int = -1) -> Union[str, bytes]:
        """Read from stream (used internally)."""
        # Python 3.9 compatibility: http.client.HTTPResponse.read() doesn't accept -1
        # but google.cloud.storage.fileio.BlobReader doesn't accept None
        # Check the stream type to determine which to use
        if self._stream is None:  # pragma: no cover
            raise ValueError("Stream not initialized")
        if size == -1:
            # Check if this is a boto3/botocore stream (wraps HTTPResponse)
            # These don't accept -1 in Python 3.9
            stream_module = getattr(self._stream.__class__, "__module__", "")
            if "botocore" in stream_module or "urllib3" in stream_module:
                size = None  # type: ignore

        chunk = self._stream.read(size)
        if self._is_binary:
            return chunk  # type: ignore
        else:
            return chunk.decode(self._encoding)  # type: ignore

    def reset_stream(self) -> None:
        """Reset the underlying stream to the beginning."""
        self._stream = self._create_stream()
        self._read_buffer = b"" if self._is_binary else ""
        self._read_pos = 0
        self._eof = False

    def __enter__(self) -> "SyncFileHandle":
        """Enter context manager."""
        if self._is_read:
            try:
                self._stream = self._create_stream()
            except Exception as e:
                if self.__class__._exception_as_filenotfound(e):
                    raise FileNotFoundError(
                        f"File not found: {self._prefix}://{self._bucket}/{self._blob}"
                    ) from None
                else:  # pragma: no cover
                    raise
        elif self._is_write and not self._is_append:
            # 'w' mode: clear existing content - do nothing here, will create on
            # first write
            # The difference is that subsequent flushes will append
            pass
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager."""
        self.close()
        self._client = None

    def read(self, size: int = -1) -> Union[str, bytes]:
        """Read and return up to size bytes/characters.

        Args:
            size: Number of bytes/chars to read (-1 for all)

        Returns:
            Data read from file
        """
        if not self._is_read:
            raise ValueError("File not opened for reading")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        # First, consume any data buffered by readline()
        if self._read_buffer:
            buffered: Union[str, bytes] = self._read_buffer
            if size != -1 and len(buffered) >= size:
                self._read_buffer = buffered[size:]
                return buffered[:size]
            self._read_buffer = b"" if self._is_binary else ""
            rest = self._stream_read(-1 if size == -1 else size - len(buffered))
            self._read_pos += len(rest)
            if size == -1 or not rest:
                self._eof = True
            return buffered + rest  # type: ignore

        # No buffered data, read from stream
        if size == -1:
            result = self._stream_read(-1)
            self._read_pos += len(result)
            self._eof = True
            return result
        else:
            result = self._stream_read(size)
            if not result:  # pragma: no cover
                self._eof = True
                return result

            self._read_pos += len(result)
            return result

    def readline(self, size: int = -1) -> Union[str, bytes]:
        """Read and return one line from the file."""
        if not self._is_read:
            raise ValueError("File not opened for reading")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        newline: Union[bytes, str] = b"\n" if self._is_binary else "\n"
        # Fill buffer until we find a newline or reach EOF
        while not self._eof:
            if self._is_binary:  # pragma: no cover
                bytes_buffer_sync: bytes = self._read_buffer  # type: ignore
                bytes_newline_sync: bytes = newline  # type: ignore
                if bytes_newline_sync in bytes_buffer_sync:
                    break
            else:
                str_buffer_check_sync: str = self._read_buffer  # type: ignore
                str_newline_sync: str = newline  # type: ignore
                if str_newline_sync in str_buffer_check_sync:  # pragma: no cover
                    break

            chunk = self._stream_read(self._chunk_size)
            if not chunk:  # pragma: no cover
                self._eof = True
                break
            self._read_pos += len(chunk)
            buffer_tmp: Union[bytes, str] = self._read_buffer + chunk  # type: ignore
            self._read_buffer = buffer_tmp

        try:
            end = self._read_buffer.index(newline) + 1  # type: ignore
        except ValueError:
            end = len(self._read_buffer)

        if size != -1 and end > size:
            end = size

        result_line: Union[str, bytes] = self._read_buffer[:end]
        self._read_buffer = self._read_buffer[end:]
        return result_line

    def readlines(self) -> List[Union[str, bytes]]:
        """Read and return all lines from the file."""
        lines = []
        while True:
            line = self.readline()
            if not line:
                break
            lines.append(line)
        return lines

    def write(self, data: Union[str, bytes]) -> int:
        """Write data to the file."""
        if not self._is_write:
            raise ValueError("File not opened for writing")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        if self._is_binary:
            if isinstance(data, str):
                data = data.encode(self._encoding)
            self._write_buffer.extend(data)  # type: ignore
        else:
            if isinstance(data, bytes):
                data = data.decode(self._encoding)
            self._write_buffer.append(data)  # type: ignore

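        # Note: in text mode the write buffer is a list of fragments, so this
        # compares fragment count (not character count) against chunk_size.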
        if len(self._write_buffer) >= self._chunk_size:
            self.flush()

        return len(data)

    def writelines(self, lines: List[Union[str, bytes]]) -> None:
        """Write a list of lines to the file."""
        for line in lines:
            self.write(line)

    def close(self) -> None:
        """Close the file and flush write buffer to cloud storage."""
        if self._closed:
            return

        if self._is_write and self._client:
            self.flush()

        self._closed = True

    def __iter__(self) -> "SyncFileHandle":
        """Support iteration over lines."""
        if not self._is_read:
            raise ValueError("File not opened for reading")
        return self

    def __next__(self) -> Union[str, bytes]:
        """Get next line in iteration."""
        line = self.readline()
        if not line:
            raise StopIteration
        return line
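
    # Iteration sketch (illustrative):
    #
    #     with client.open("s3://bucket/data.csv") as f:
    #         for line in f:
    #             process(line)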

    @property
    def closed(self) -> bool:
        """Check if file is closed."""
        return self._closed

    def tell(self) -> int:
        """Return current stream position.

        Returns:
            Current position in the file
        """
        if not self._is_read:
            raise ValueError("tell() not supported in write mode")
        if self._closed:
            raise ValueError("I/O operation on closed file")

        # Calculate buffer size in bytes
        if self._is_binary:
            buffer_byte_size = len(self._read_buffer)
        else:
            # In text mode, encode the buffer to get its byte size
            str_buffer_sync: str = self._read_buffer  # type: ignore
            buffer_byte_size = len(str_buffer_sync.encode(self._encoding))

        return self._read_pos - buffer_byte_size

    def seek(self, offset: int, whence: int = 0) -> int:
        """Change stream position (forward seeking only).

        Args:
            offset: Position offset
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New absolute position

        Raises:
            OSError: If backward seeking is attempted
            ValueError: If called in write mode or on closed file

        Note:
            - Only forward seeking is supported due to streaming limitations
            - SEEK_END (whence=2) is not supported as blob size may be unknown
            - Backward seeking requires re-opening the stream
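
        Example:
            Forward seeks read and discard data; ``seek(0)`` resets the
            stream, and any other backward seek raises ``OSError``
            (illustrative)::

                f.seek(4096, 1)   # skip ahead from the current position
                f.seek(0)         # rewind to the start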
        """
        if not self._is_read:
            raise ValueError("seek() not supported in write mode")
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if whence == 2:
            raise OSError("SEEK_END not supported for streaming reads")

        # Calculate target position
        current_pos = self.tell()
        if whence == 0:
            target_pos = offset
        elif whence == 1:
            target_pos = current_pos + offset
        else:
            raise ValueError(f"Invalid whence value: {whence}")

        if target_pos == 0:
            self.reset_stream()
            return 0

        # Check for backward seeking
        if target_pos < current_pos:
            raise OSError("Backward seeking not supported for streaming reads")

        # Forward seek: read and discard data
        bytes_to_skip = target_pos - current_pos
        while bytes_to_skip > 0 and not self._eof:
            chunk_size = min(bytes_to_skip, 8192)
            chunk = self.read(chunk_size)
            if not chunk:  # pragma: no cover
                break
            if self._is_binary:
                bytes_chunk_sync: bytes = chunk  # type: ignore
                bytes_to_skip -= len(bytes_chunk_sync)
            else:  # pragma: no cover
                str_chunk_sync: str = chunk  # type: ignore
                bytes_to_skip -= len(str_chunk_sync.encode(self._encoding))

        return self.tell()