# Cloud support
Since `v0.16.0`, `pipen` supports the cloud natively. There are two kinds of cloud support:

- Run the pipeline locally (or via schedulers like `sge`, `slurm`, etc.) and save the files to the cloud.
- Run the pipeline on the cloud.
## Run the pipeline locally and save the files to the cloud
To run the pipeline locally and save the files to the cloud, you need to install `pipen` with cloud support:
```bash
pip install xqute[cloudsh]

# To support a specific cloud service provider
pip install cloudpathlib[s3]
pip install cloudpathlib[gs]
pip install cloudpathlib[azure]
```
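Once installed, you can quickly verify that cloud paths are accessible (a minimal sketch; `gs://mybucket` is a placeholder, and your cloud credentials must already be configured):

```python
from cloudpathlib import CloudPath

# write, read back, and remove a small test file on the bucket
p = CloudPath("gs://mybucket/pipen-test.txt")
p.write_text("hello")
print(p.read_text())  # -> hello
p.unlink()
```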
Then you can directly assign a cloud path as the pipeline working directory:
```python
from pipen import Pipen, Proc


class P1(Proc):
    """Echo the input to the output file"""
    input = "in:var"
    input_data = ["Hello World"]
    output = "outfile:file:out.txt"
    # Note that out.outfile is on the cloud but the script is executed locally,
    # so we use cloudsh to save the output to the cloud
    script = "echo {{in.in}} | cloudsh sink {{out.outfile}}"


class MyPipeline(Pipen):
    starts = P1
    workdir = "gs://mybucket/mypipeline/workdir"
    output = "gs://mybucket/mypipeline/output"


if __name__ == "__main__":
    MyPipeline().run()
```
As shown in the following figure, the pipeline runs locally, but the meta information is read from and saved to the cloud (the workdir). No local files are generated.

For the output files: if a process is not an export process, its output files are saved to the workdir; if it is an export process, its output files are saved to the output directory (the export dir).
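After the run, you can inspect the exported results directly from the cloud with `cloudpathlib` (a minimal sketch; the exact layout under the export dir is an assumption here, so adjust the path to what your run actually produces):

```python
from cloudpathlib import CloudPath

# list everything the pipeline exported (bucket/paths are placeholders)
export_dir = CloudPath("gs://mybucket/mypipeline/output")
for path in export_dir.rglob("*"):
    print(path)

# read P1's output directly from the cloud
# (assumed layout: <export dir>/<proc name>/<output file>)
print(CloudPath("gs://mybucket/mypipeline/output/P1/out.txt").read_text())
```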
## Run the pipeline on the cloud
Currently, `pipen` only supports running pipelines on the cloud via Google Batch jobs.

To run the pipeline on the cloud, you need to install `pipen` with cloud support:

```bash
pip install xqute[gs]
```

This is used to communicate with files on Google Cloud Storage. No `cloudsh` is needed, since operations on the cloud files happen on the cloud itself (the cloud paths are mounted to the VM). You also need the Google Cloud SDK installed and configured; it is used to communicate with Google Batch (submit jobs, query job status, etc.).
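To confirm your credentials are in place before submitting jobs, you can check that application default credentials resolve (a minimal sketch using the `google-auth` package, which the Google Cloud client libraries rely on):

```python
import google.auth

# Resolves application default credentials, e.g. those set up via
# `gcloud auth application-default login`; raises DefaultCredentialsError otherwise.
credentials, project_id = google.auth.default()
print(f"Authenticated for project: {project_id}")
```

With the credentials in place, the pipeline definition is almost identical to the local example: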
```python
from pipen import Pipen, Proc


class P1(Proc):
    """Echo the input to the output file"""
    input = "in:var"
    input_data = ["Hello World"]
    output = "outfile:file:out.txt"
    # out.outfile is on the cloud, but the workdir/export dir are mounted
    # to the VM, so the script can write to it directly; no cloudsh is needed
    script = "echo {{in.in}} > {{out.outfile}}"


class MyPipeline(Pipen):
    starts = P1
    workdir = "gs://mybucket/mypipeline/workdir"
    output = "gs://mybucket/mypipeline/output"
    scheduler = "gbatch"


if __name__ == "__main__":
    MyPipeline().run()
```
The only difference is that we need to set `scheduler` to `gbatch` (Google Batch jobs).
As shown in the following figure, the pipeline runs on the cloud platform, and the workdir and export dir are mounted to the VM, so the process script can directly access the cloud files; no `cloudsh` or `gcloud` tools are needed.