Process group
A process group is a collection of processes that are related to each other. It is a convenient way to manage a set of processes.
With pipen
, not only a process can be reused, but also a group of processes can be reused. We just need to define the relationship between the processes in the group, and then we can reuse the group in other pipelines, or even run it directly as a pipeline.
Define a process group
To define a process group, we need to define a class that inherits from pipen.procgroup.ProcGroup
. The class name will be the name of the group, unless we specify a name
attribute.
from pipen.procgroup import ProcGroup
class MyGroup(ProcGroup):
...
Note that the subclasses of ProcGroup
are singleton classes. If you need to define multiple groups, you can define a base class and then inherit from it.
Add processes to a group
There are two ways to add processes to a group, using pg.add_proc
or ProcGroup.add_proc
, where pg
is a process group instance. The first method is used after the group is instantiated and it decorates a process class directly. The second method is used before the group is instantiated and it decorates a property of ProcGroup
that returns a process.
-
Using the
pg.add_proc()
decorator.from pipen import Proc, ProcGroup class MyGroup(ProcGroup): ... pg = MyGroup() @pg.add_proc class MyProc(Proc): ...
-
Using the
ProcGroup.add_proc()
decorator to decorate a property of the group class.from pipen import Proc, ProcGroup class MyGroup(ProcGroup): @ProcGroup.add_proc def my_proc(self): class MyProc(Proc): ... return MyProc
This method adds a process at runtime, so it is useful when we want to add processes to a group dynamically.
Access processes in a group
We can access the processes in a group using the pg.<proc>
attribute, where pg
is a process group instance. Note that when we use the ProcGroup.add_proc
method to add processes, the process name is the name of the property.
However, you can always use pg.procs.<proc_name>
to access the process, where the <proc_name>
is the real name of the process.
from pipen import Proc, ProcGroup
class MyGroup(ProcGroup):
@ProcGroup.add_proc
def my_proc(self):
class MyProc(Proc):
...
return MyProc
pg = MyGroup()
assert pg.my_proc.name == 'MyProc'
assert pg.procs.MyProc.name == 'MyProc'
We can use pg.starts
to get the start processes of the group, which are the processes that have no required processes. So when you add processes to a group, remember to specify .requires
for each process, unless they are start processes.
Run a process group as a pipeline
To run a process group as a pipeline, we can convert it to a pipeline using the as_pipen()
method. The method takes the same arguments as the Pipen
constructor.
from pipen import Proc, ProcGroup
class MyGroup(ProcGroup):
...
pg = MyGroup()
@pg.add_proc
class MyProc(Proc):
...
pg.as_pipen().set_data(...).run()
Integrate a process group into a pipeline
from pipen import Proc, ProcGroup
class MyGroup(ProcGroup):
@ProcGroup.add_proc
def my_proc(self):
class MyProc(Proc):
...
return MyProc
@ProcGroup.add_proc
def my_proc2(self):
class MyProc2(Proc):
requires = self.my_proc
...
return MyProc2
pg = MyGroup()
class PrepareData(Proc):
...
class PostGroup(Proc):
requires = pg.my_proc2
pg.my_proc.requires = PrepareData
pipen = Pipen().set_starts(PrepareData).set_data(...).run()