# Source code of the xqute.path module.

"""Provides the SpecPath and MountedPath classes.

It is used to represent paths of jobs and it is useful when a job is running in a
remote system (a VM, a container, etc.), where we need to mount the paths into
the remote system (MountedPath).

But in the system where this framework is running, we need to use the paths
(specified directly) that are used in the framework, where we also need to carry
the information of the mounted path (SpecPath).

The module provides two main abstract base classes:
- `MountedPath`: Represents a path as it appears in the remote execution environment.
- `SpecPath`: Represents a path as it appears in the local environment where the
framework runs.

Both classes have implementations for local paths and various cloud storage paths,
including:
- Google Cloud Storage
- Azure Blob Storage
- Amazon S3

These classes maintain the relationship between the local and remote path
representations, allowing transparent path operations while preserving both path
contexts.
"""

from __future__ import annotations

from typing import Any

import os
from pathlib import Path
from panpath import PanPath, LocalPath, CloudPath, GSPath, AzurePath, S3Path

from .defaults import DEFAULT_CLOUD_FSPATH

__all__ = ["SpecPath", "MountedPath"]


class MountedPath(PanPath):
    """A router class to instantiate the correct path based on the path type
    for the mounted path.

    Instantiating ``MountedPath`` itself dispatches to a concrete subclass
    chosen from the type of the input path. The resulting object represents a
    path as it exists in a remote execution environment (e.g., container, VM)
    while keeping a reference to the corresponding path in the local
    environment.

    Attributes:
        _spec: The corresponding path in the local environment. When no spec
            is given, this is the mounted path object itself.

    Examples:
        >>> # Create a mounted path with corresponding spec path
        >>> mounted_path = MountedPath(
        ...     "/container/data/file.txt", spec="/local/data/file.txt"
        ... )
        >>> str(mounted_path)
        '/container/data/file.txt'
        >>> str(mounted_path.spec)
        '/local/data/file.txt'

        >>> # Create a GCS mounted path
        >>> gs_path = MountedPath("gs://bucket/file.txt", spec="/local/file.txt")
        >>> type(gs_path)
        <class 'xqute.path.MountedGSPath'>

        >>> # Serialize and deserialize a mounted path
        >>> import pickle
        >>> mounted_path = MountedPath("/container/data/file.txt",
        ...                            spec="/local/data/file.txt")
        >>> serialized = pickle.dumps(mounted_path)
        >>> restored = pickle.loads(serialized)
        >>> str(restored) == str(mounted_path)
        True
        >>> str(restored.spec) == str(mounted_path.spec)
        True
    """

    def __new__(  # type: ignore
        cls,
        path: str | Path,
        spec: str | Path | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> MountedPath:
        """Factory method to create the appropriate MountedPath subclass instance.

        Args:
            path: The path string or object representing the mounted path location.
            spec: The path string or object representing the corresponding spec path.
                If None, the mounted path itself will be used as the spec path.
            *args: Additional positional arguments passed to the path constructor.
            **kwargs: Additional keyword arguments passed to the path constructor.

        Returns:
            An instance of the appropriate MountedPath subclass based on the path type:
            - MountedGSPath for Google Cloud Storage paths
            - MountedAzurePath for Azure Blob Storage paths
            - MountedS3Path for Amazon S3 paths
            - MountedLocalPath for local filesystem paths
        """
        if cls is MountedPath:
            path = PanPath(path)  # type: ignore
            if isinstance(path, GSPath):
                mounted_class = MountedGSPath
            elif isinstance(path, AzurePath):
                mounted_class = MountedAzurePath  # type: ignore
            elif isinstance(path, S3Path):
                mounted_class = MountedS3Path  # type: ignore
            else:
                mounted_class = MountedLocalPath  # type: ignore

            # NOTE(review): `spec` is the second positional parameter, so any
            # extra positional `args` forwarded here shift by one in the
            # subclass call. Pass `spec` by keyword; confirm no caller relies
            # on extra positional segments.
            obj = mounted_class(path, *args, **kwargs)
        else:
            # Ensure the underlying Path initialization receives the path so
            # internal parts like `_parts` are populated on older Python versions.
            obj = super().__new__(cls, path, *args, **kwargs)  # type: ignore

        # Always attach the spec, including when a concrete subclass is
        # constructed directly; otherwise `spec`, `is_mounted()`, `__repr__`,
        # `__eq__` and `__hash__` would fail on a missing `_spec` attribute.
        obj._spec = PanPath(spec) if spec is not None else obj
        return obj

    async def get_fspath(self) -> str:
        """Get the filesystem path of this mounted path.

        This base implementation only delegates to ``__fspath__()``.

        Returns:
            str: The result of ``self.__fspath__()``.
        """
        return self.__fspath__()

    @property
    def spec(self) -> SpecPath:
        """Get the corresponding spec path in the local environment.

        Returns:
            SpecPath: A spec path built from ``_spec`` with this object as its
            mounted path.
        """
        return SpecPath(self._spec, mounted=self)  # type: ignore

    def is_mounted(self) -> bool:
        """Check if this path is actually mounted (different from spec path).

        Returns:
            bool: True if the mounted path is different from the spec path, False
            otherwise.
        """
        # Direct string comparison instead of using equality operator
        return str(self._spec) != str(self)

    def __repr__(self):
        """Generate a string representation of the MountedPath.

        Returns:
            str: A string showing the class name, path, and spec path (if different).
        """
        # Only show the spec when it differs from the mounted path
        if self.is_mounted():
            return f"{type(self).__name__}('{self}', spec='{self._spec}')"
        else:
            return f"{type(self).__name__}('{self}')"

    def __eq__(self, other: Any) -> bool:
        """Check equality with another path object.

        Two MountedPath objects are equal if they have the same path string
        and the same spec path string. Against any other ``Path`` object only
        the path strings are compared.

        Args:
            other: Another object to compare with.

        Returns:
            bool: True if the paths are equal, False otherwise.
        """
        if not isinstance(other, Path):
            return False

        if isinstance(other, MountedPath):
            return str(self) == str(other) and str(self.spec) == str(other.spec)

        return str(self) == str(other)

    def __hash__(self) -> int:
        """Generate a hash for the MountedPath.

        Returns:
            int: A hash value based on the path string and spec path string.
        """
        return hash((str(self), str(self.spec)))

    def __reduce__(self):
        """Support for pickling and serialization.

        Returns a tuple of (callable, args, state) so that the
        underlying path is reconstructed from its string, and the
        spec relationship is restored via state.
        """
        return (type(self), (str(self),), {"_spec": str(self._spec)})

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore internal state after unpickling.

        Args:
            state: The state dictionary produced by ``__reduce__``; its
                ``"_spec"`` entry holds the spec path string.
        """
        spec_str = state.get("_spec")
        self._spec = PanPath(spec_str) if spec_str is not None else self

    def with_name(self, name):
        """Return a new path with the name changed.

        Args:
            name: The new name for the path.

        Returns:
            MountedPath: A new mounted path with the name changed in both
                the mounted path and spec path.
        """
        new_path = LocalPath.with_name(self, name)
        new_spec = PanPath(str(self._spec)).with_name(name)

        return MountedPath(new_path, spec=new_spec)

    def with_suffix(self, suffix):
        """Return a new path with the suffix changed.

        Args:
            suffix: The new suffix for the path.

        Returns:
            MountedPath: A new mounted path with the suffix changed in both
                the mounted path and spec path.
        """
        new_path = LocalPath.with_suffix(self, suffix)
        new_spec = PanPath(str(self._spec)).with_suffix(suffix)

        return MountedPath(new_path, spec=new_spec)

    def with_stem(self, stem):
        """Return a new path with the stem changed.

        The stem is the filename without the suffix. Provided for parity
        with ``SpecPath.with_stem`` so that the spec path keeps its own
        suffix while its stem is updated.

        Args:
            stem: The new stem for the path.

        Returns:
            MountedPath: A new mounted path with the stem changed in both
                the mounted path and spec path.
        """
        new_path = LocalPath.with_stem(self, stem)
        new_spec = PanPath(str(self._spec)).with_stem(stem)

        return MountedPath(new_path, spec=new_spec)

    def joinpath(self, *pathsegments) -> MountedPath:
        """Join path components to this path.

        Args:
            *pathsegments: The path segments to append to this path.

        Returns:
            MountedPath: A new mounted path with the segments appended to both
                the mounted path and spec path.
        """
        new_path = LocalPath.joinpath(self, *pathsegments)
        new_spec = PanPath(str(self._spec)).joinpath(*pathsegments)

        return MountedPath(new_path, spec=new_spec)

    def __truediv__(self, key):
        """Implement the / operator for paths.

        Args:
            key: The path segment to append to this path.

        Returns:
            MountedPath: A new mounted path with the segment appended.
        """
        # Delegate to joinpath() so the spec path is carried along (per the
        # original note, the operator was not implemented with
        # .with_segments()).
        return self.joinpath(key)

    @property
    def parent(self):
        """Get the parent directory of this path.

        Returns:
            MountedPath: A new mounted path representing the parent directory
                of both the mounted path and spec path.
        """
        new_path = LocalPath.parent.fget(self)
        new_spec = PanPath(str(self._spec)).parent

        return MountedPath(new_path, spec=new_spec)


class MountedLocalPath(MountedPath, LocalPath):  # type: ignore
    """A class to represent a mounted local path

    This class represents a path in a local filesystem as it appears in a remote
    execution environment, while maintaining a reference to its corresponding
    path in the framework's environment.

    Instances are normally obtained through the ``MountedPath`` factory, which
    selects this class for paths that are not GCS, Azure or S3 paths and
    attaches the spec path.

    Attributes:
        _spec: The corresponding path in the local environment.

    Examples:
        >>> mounted_path = MountedPath("/container/data/file.txt",
        ...                            spec="/local/data/file.txt")
        >>> isinstance(mounted_path, MountedLocalPath)
        True
        >>> str(mounted_path)
        '/container/data/file.txt'
        >>> str(mounted_path.spec)
        '/local/data/file.txt'
        >>> mounted_path.name
        'file.txt'
    """


class MountedCloudPath(MountedPath, CloudPath):
    """A class to represent a mounted cloud path

    This class represents a cloud storage path as it appears in a remote
    execution environment, while maintaining a reference to its corresponding
    path in the framework's environment.

    Attributes:
        _spec: The corresponding path in the local environment.

    Examples:
        >>> mounted_path = MountedPath("gs://bucket/file.txt",
        ...    spec="gs://local-bucket/file.txt")
        >>> str(mounted_path)
        'gs://bucket/file.txt'
        >>> str(mounted_path.spec)
        'gs://local-bucket/file.txt'
    """

    def __fspath__(self) -> str:
        """Return the filesystem path representation.

        The path is built under the directory named by the
        ``XQUTE_CLOUD_FSPATH`` environment variable (falling back to
        ``DEFAULT_CLOUD_FSPATH``), followed by the first path part with any
        colons removed, then the remaining parts.

        Returns:
            str: The filesystem path as a string.
        """
        cloud_fspath = os.getenv("XQUTE_CLOUD_FSPATH", DEFAULT_CLOUD_FSPATH)
        parts = [
            cloud_fspath,
            # Drop the ":" from the first part so it can be a directory name
            self.parts[0].replace(":", ""),
            *self.parts[1:],
        ]
        return os.path.join(*parts)

    async def get_fspath(self) -> str:
        """Get the corresponding local filesystem path and copy from cloud.

        The parent directory of the target is created if needed, then this
        path is copied to the target with ``a_copytree`` when ``a_is_dir()``
        reports a directory and with ``a_copy`` otherwise.

        Returns:
            str: The local filesystem path the content was copied to.
        """
        p = PanPath(self.__fspath__())
        await p.parent.a_mkdir(parents=True, exist_ok=True)

        if await self.a_is_dir():
            await self.a_copytree(p)
        else:
            await self.a_copy(p)

        return str(p)


class MountedGSPath(MountedCloudPath, GSPath):
    """A class to represent a mounted Google Cloud Storage path

    This class represents a Google Cloud Storage path as it appears in a remote
    execution environment, while maintaining a reference to its corresponding
    path in the framework's environment.

    Examples:
        >>> mounted_path = MountedPath("gs://bucket/file.txt",
        ...                          spec="gs://local-bucket/file.txt")
        >>> isinstance(mounted_path, MountedGSPath)
        True
    """


class MountedAzurePath(MountedCloudPath, AzurePath):
    """A class to represent a mounted Azure Blob Storage path

    This class represents an Azure Blob Storage path as it appears in a remote
    execution environment, while maintaining a reference to its corresponding
    path in the framework's environment.

    Examples:
        >>> mounted_path = MountedPath("az://container/blob",
        ...                          spec="az://local-container/blob")
        >>> isinstance(mounted_path, MountedAzurePath)
        True
    """


class MountedS3Path(MountedCloudPath, S3Path):
    """A class to represent a mounted Amazon S3 path

    This class represents an Amazon S3 path as it appears in a remote
    execution environment, while maintaining a reference to its corresponding
    path in the framework's environment.

    Examples:
        >>> mounted_path = MountedPath("s3://bucket/key",
        ...                          spec="s3://local-bucket/key")
        >>> isinstance(mounted_path, MountedS3Path)
        True
    """


class SpecPath(PanPath):
    """A router class to instantiate the correct path based on the path type
    for the spec path.

    Instantiating ``SpecPath`` itself dispatches to a concrete subclass chosen
    from the type of the input path. The resulting object represents a path in
    the local environment where the framework runs, while keeping a reference
    to the corresponding path in the remote execution environment.

    Attributes:
        _mounted: The corresponding path in the remote execution environment.
            When no mounted path is given, this is the spec path object itself.

    Examples:
        >>> # Create a spec path with corresponding mounted path
        >>> spec_path = SpecPath(
        ...     "/local/data/file.txt", mounted="/container/data/file.txt"
        ... )
        >>> str(spec_path)
        '/local/data/file.txt'
        >>> str(spec_path.mounted)
        '/container/data/file.txt'

        >>> # Create a GCS spec path
        >>> gs_path = SpecPath(
        ...     "gs://bucket/file.txt", mounted="gs://container-bucket/file.txt"
        ... )
        >>> type(gs_path)
        <class 'xqute.path.SpecGSPath'>
    """

    def __new__(  # type: ignore
        cls,
        path: str | Path,
        *args: Any,
        mounted: str | Path | None = None,
        **kwargs: Any,
    ) -> SpecLocalPath | SpecCloudPath:
        """Factory method to create the appropriate SpecPath subclass instance.

        Args:
            path: The path string or object representing the spec path.
            mounted: The path string or object representing the corresponding mounted
                path. If None, the spec path itself will be used as the mounted path.
            *args: Additional positional arguments passed to the path constructor.
            **kwargs: Additional keyword arguments passed to the path constructor.

        Returns:
            An instance of the appropriate SpecPath subclass based on the path type:
            - SpecGSPath for Google Cloud Storage paths
            - SpecAzurePath for Azure Blob Storage paths
            - SpecS3Path for Amazon S3 paths
            - SpecLocalPath for local filesystem paths
        """
        if cls is SpecPath:
            path = PanPath(path)  # type: ignore
            if isinstance(path, GSPath):
                spec_class = SpecGSPath
            elif isinstance(path, AzurePath):
                spec_class = SpecAzurePath  # type: ignore
            elif isinstance(path, S3Path):
                spec_class = SpecS3Path  # type: ignore
            else:
                spec_class = SpecLocalPath

            obj = spec_class(path, *args, **kwargs)  # type: ignore
        else:
            # Ensure Path internals are initialized with the provided path
            obj = super().__new__(cls, path, *args, **kwargs)  # type: ignore

        # Always attach the mounted path, including when a concrete subclass
        # is constructed directly; otherwise `mounted`, `__repr__`, `__eq__`
        # and `__hash__` would fail on a missing `_mounted` attribute and a
        # `mounted=` argument would be silently dropped.
        obj._mounted = PanPath(mounted) if mounted is not None else obj
        return obj

    async def get_fspath(self) -> str:
        """Get the filesystem path of this spec path.

        This base implementation only delegates to ``__fspath__()``.

        Returns:
            str: The result of ``self.__fspath__()``.
        """
        return self.__fspath__()

    @property
    def mounted(self) -> MountedPath:
        """Get the corresponding mounted path in the remote environment.

        Returns:
            MountedPath: A mounted path built from ``_mounted`` with this
            object as its spec path.
        """
        # `_mounted` is attached in __new__; it is the spec path itself when
        # no separate mounted path was given.
        return MountedPath(self._mounted, spec=self)  # type: ignore

    def __repr__(self) -> str:
        """Generate a string representation of the SpecPath.

        Returns:
            str: A string showing the class name, path, and mounted path (if different).
        """
        if self.mounted.is_mounted():
            return f"{type(self).__name__}('{self}', mounted='{self._mounted}')"
        else:
            return f"{type(self).__name__}('{self}')"

    def __eq__(self, other: Any) -> bool:
        """Check equality with another path object.

        Two SpecPath objects are equal if they have the same path string
        and the same mounted path string. Against any other ``Path`` object
        only the path strings are compared.

        Args:
            other: Another object to compare with.

        Returns:
            bool: True if the paths are equal, False otherwise.
        """
        if not isinstance(other, Path):
            return False

        if isinstance(other, SpecPath):
            return str(self) == str(other) and str(self.mounted) == str(other.mounted)

        return str(self) == str(other)

    def __hash__(self) -> int:
        """Generate a hash for the SpecPath.

        Returns:
            int: A hash value based on the path string and mounted path string.
        """
        return hash((str(self), str(self.mounted)))

    def with_name(self, name) -> SpecPath:
        """Return a new path with the name changed.

        Args:
            name: The new name for the path.

        Returns:
            SpecPath: A new spec path with the name changed in both
                the spec path and mounted path.
        """
        new_path = LocalPath.with_name(self, name)
        new_mounted = PanPath(str(self._mounted)).with_name(name)

        return SpecPath(new_path, mounted=new_mounted)

    def with_suffix(self, suffix) -> SpecPath:
        """Return a new path with the suffix changed.

        Args:
            suffix: The new suffix for the path.

        Returns:
            SpecPath: A new spec path with the suffix changed in both
                the spec path and mounted path.
        """
        new_path = LocalPath.with_suffix(self, suffix)
        new_mounted = PanPath(str(self._mounted)).with_suffix(suffix)

        return SpecPath(new_path, mounted=new_mounted)

    def with_stem(self, stem) -> SpecPath:
        """Return a new path with the stem changed.

        The stem is the filename without the suffix.

        Args:
            stem: The new stem for the path.

        Returns:
            SpecPath: A new spec path with the stem changed in both
                the spec path and mounted path.
        """
        new_path = LocalPath.with_stem(self, stem)
        new_mounted = PanPath(str(self._mounted)).with_stem(stem)

        return SpecPath(new_path, mounted=new_mounted)

    def joinpath(self, *pathsegments) -> SpecPath:
        """Join path components to this path.

        Args:
            *pathsegments: The path segments to append to this path.

        Returns:
            SpecPath: A new spec path with the segments appended to both
                the spec path and mounted path.
        """
        new_path = LocalPath.joinpath(self, *pathsegments)
        new_mounted = PanPath(str(self._mounted)).joinpath(*pathsegments)

        return SpecPath(new_path, mounted=new_mounted)

    def __truediv__(self, key):
        """Implement the / operator for paths.

        Args:
            key: The path segment to append to this path.

        Returns:
            SpecPath: A new spec path with the segment appended.
        """
        # Delegate to joinpath() so the mounted path is carried along (per
        # the original note, the operator was not implemented with
        # .with_segments()).
        return self.joinpath(key)

    @property
    def parent(self) -> SpecPath:
        """Get the parent directory of this path.

        Returns:
            SpecPath: A new spec path representing the parent directory
                of both the spec path and mounted path.
        """
        new_path = LocalPath.parent.fget(self)
        new_mounted = PanPath(str(self._mounted)).parent

        return SpecPath(new_path, mounted=new_mounted)


class SpecLocalPath(SpecPath, LocalPath):  # type: ignore
    """A class to represent a spec local path

    This class represents a path in the local filesystem as it appears in the
    framework's environment, while maintaining a reference to its corresponding
    path in the remote execution environment.

    Instances are normally obtained through the ``SpecPath`` factory, which
    selects this class for paths that are not GCS, Azure or S3 paths and
    attaches the mounted path.

    Attributes:
        _mounted: The corresponding path in the remote execution environment.

    Examples:
        >>> spec_path = SpecPath("/local/data/file.txt",
        ...                      mounted="/container/data/file.txt")
        >>> isinstance(spec_path, SpecLocalPath)
        True
        >>> str(spec_path)
        '/local/data/file.txt'
        >>> str(spec_path.mounted)
        '/container/data/file.txt'
        >>> spec_path.name
        'file.txt'
    """


class SpecCloudPath(SpecPath, CloudPath):
    """A class to represent a spec cloud path

    This class represents a cloud storage path as it appears in the local
    environment where the framework runs, while maintaining a reference to its
    corresponding path in the remote execution environment.

    Attributes:
        _mounted: The corresponding path in the remote execution environment.

    Examples:
        >>> spec_path = SpecPath("gs://bucket/file.txt",
        ...                    mounted="gs://container-bucket/file.txt")
        >>> str(spec_path)
        'gs://bucket/file.txt'
        >>> str(spec_path.mounted)
        'gs://container-bucket/file.txt'
    """

    def __fspath__(self) -> str:
        """Return the filesystem path representation.

        The path is built under the directory named by the
        ``XQUTE_CLOUD_FSPATH`` environment variable (falling back to
        ``DEFAULT_CLOUD_FSPATH``), followed by the first path part with any
        colons removed, then the remaining parts.

        Returns:
            str: The filesystem path as a string.
        """
        cloud_fspath = os.getenv("XQUTE_CLOUD_FSPATH", DEFAULT_CLOUD_FSPATH)
        parts = [
            cloud_fspath,
            # Drop the ":" from the first part so it can be a directory name
            self.parts[0].replace(":", ""),
            *self.parts[1:],
        ]
        return os.path.join(*parts)

    async def get_fspath(self) -> str:
        """Get the corresponding local filesystem path and copy from cloud.

        The parent directory of the target is created if needed, then this
        path is copied to the target with ``a_copytree`` when ``a_is_dir()``
        reports a directory and with ``a_copy`` otherwise.

        Returns:
            str: The local filesystem path the content was copied to.
        """
        p = PanPath(self.__fspath__())
        await p.parent.a_mkdir(parents=True, exist_ok=True)

        if await self.a_is_dir():
            await self.a_copytree(p)
        else:
            await self.a_copy(p)

        return str(p)


class SpecGSPath(SpecCloudPath, GSPath):
    """A class to represent a spec Google Cloud Storage path

    This class represents a Google Cloud Storage path as it appears in the
    local environment where the framework runs, while maintaining a reference
    to its corresponding path in the remote execution environment.

    Examples:
        >>> spec_path = SpecPath("gs://bucket/file.txt",
        ...                    mounted="gs://container-bucket/file.txt")
        >>> isinstance(spec_path, SpecGSPath)
        True
    """


class SpecAzurePath(SpecCloudPath, AzurePath):
    """A class to represent a spec Azure Blob Storage path

    This class represents an Azure Blob Storage path as it appears in the
    local environment where the framework runs, while maintaining a reference
    to its corresponding path in the remote execution environment.

    Examples:
        >>> spec_path = SpecPath("az://container/blob",
        ...                    mounted="az://remote-container/blob")
        >>> isinstance(spec_path, SpecAzurePath)
        True
    """


class SpecS3Path(SpecCloudPath, S3Path):
    """A class to represent a spec Amazon S3 path

    This class represents an Amazon S3 path as it appears in the
    local environment where the framework runs, while maintaining a reference
    to its corresponding path in the remote execution environment.

    Examples:
        >>> spec_path = SpecPath("s3://bucket/key",
        ...                    mounted="s3://remote-bucket/key")
        >>> isinstance(spec_path, SpecS3Path)
        True
    """