The ZnFlow package provides a basic structure for building computational
graphs based on functions or classes. It is designed as a lightweight
abstraction layer to
- learn graph computing.
- build your own packages on top of it.
pip install znflow

With ZnFlow you can connect functions to each other by using the @nodify
decorator. Inside the znflow.DiGraph the decorator will return a
FunctionFuture object that can be used to connect the function to other nodes.
The FunctionFuture object will also be used to retrieve the result of the
function. Outside the znflow.DiGraph the function behaves as a normal
function.
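To make the inside/outside behaviour concrete, here is a minimal standard-library sketch of the idea. It is not ZnFlow's implementation: MiniGraph, MiniFuture, and mini_nodify are hypothetical names made up for this illustration, and the sketch only handles positional arguments and runs nodes in creation order.

```python
import functools

_ACTIVE = []  # stack of graphs that are currently being built


class MiniFuture:
    """Hypothetical placeholder for a function call that has not run yet."""

    def __init__(self, func, args):
        self.func = func
        self.args = args
        self.result = None


class MiniGraph:
    """Hypothetical graph that records futures created inside its with-block."""

    def __init__(self):
        self.futures = []

    def __enter__(self):
        _ACTIVE.append(self)
        return self

    def __exit__(self, *exc_info):
        _ACTIVE.pop()

    def run(self):
        # Futures are stored in creation order, so every dependency has
        # already been computed by the time it is needed.
        for future in self.futures:
            args = [a.result if isinstance(a, MiniFuture) else a for a in future.args]
            future.result = future.func(*args)


def mini_nodify(func):
    """Return a placeholder inside a MiniGraph, call the function otherwise."""

    @functools.wraps(func)
    def wrapper(*args):
        if not _ACTIVE:
            return func(*args)  # outside a graph: plain function call
        future = MiniFuture(func, args)
        _ACTIVE[-1].futures.append(future)
        return future

    return wrapper


@mini_nodify
def compute_mean(x, y):
    return (x + y) / 2


direct = compute_mean(2, 8)  # computed immediately

with MiniGraph() as graph:
    n1 = compute_mean(2, 8)  # MiniFuture, nothing computed yet
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)  # futures can be passed as arguments

graph.run()
print(direct, n3.result)
```

The equivalent ZnFlow code is: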
import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

print(compute_mean(2, 8))
# >>> 5.0

with znflow.DiGraph() as graph:
    mean = compute_mean(2, 8)

graph.run()
print(mean.result)
# >>> 5.0

with znflow.DiGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)

graph.run()
print(n3.result)
# >>> 7.5

It is also possible to connect classes. They can be connected either directly or
via class attributes. This is possible by returning znflow.Connections inside
the znflow.DiGraph context manager. Outside the znflow.DiGraph the class
behaves as a normal class.
In the following example we use a dataclass, but it works with all Python
classes that inherit from znflow.Node.
import znflow
import dataclasses

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float
    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3.results)
# >>> 7.5

ZnFlow comes with support for Dask to run your graph:
- in parallel.
- through e.g. SLURM (see https://jobqueue.dask.org/en/latest/api.html).
- with a nice GUI to track progress.
All you need to do is install ZnFlow with Dask support: pip install znflow[dask].
We can then extend the example from above so that n1 and n2 run in parallel.
You can investigate the graph on the Dask dashboard (typically
http://127.0.0.1:8787/graph) or via the client object in Jupyter.
import znflow
import dataclasses
from dask.distributed import Client

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float
    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

client = Client()
deployment = znflow.deployment.DaskDeployment(client=client)

with znflow.DiGraph(deployment=deployment) as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3)
# >>> ComputeMean(x=5.0, y=10.0, results=7.5)

ZnFlow supports some special features for working with lists. In the following
example we want to combine two lists.
import znflow

@znflow.nodify
def arange(size: int) -> list:
    return list(range(size))

print(arange(2) + arange(3))
# >>> [0, 1, 0, 1, 2]

with znflow.DiGraph() as graph:
    lst = arange(2) + arange(3)

graph.run()
print(lst.result)
# >>> [0, 1, 0, 1, 2]

This functionality is restricted to lists. There are some further features that
allow combining nested lists (data: list[list]): you can either use
data: list = znflow.combine(data), which accepts an optional attribute=None
argument for use with classes, or simply use
data: list = sum(data, []).
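
As a plain-Python illustration of the sum(data, []) idiom mentioned above, the following sketch flattens a nested list outside of any graph. The variable names and sample values are made up for this example.

```python
# Nested data, e.g. the outputs of several list-producing functions.
data = [[0, 1], [0, 1, 2], []]

# sum() starts from the empty list and concatenates each inner list in turn.
flat = sum(data, [])

print(flat)
# >>> [0, 1, 0, 1, 2]
```

Because each step builds a new list, this idiom is convenient for a handful of short lists but scales poorly for very large inputs.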
Inside the with znflow.DiGraph() context manager, accessing class attributes
yields znflow.Connector objects. Sometimes, it may be required to obtain the
actual attribute value instead of a znflow.Connector object. It is not
recommended to run class methods inside the with znflow.DiGraph() context
manager since it should be exclusively used for building the graph and not for
actual computation.
In the case of properties or other descriptor-based attributes, it might be
necessary to access the actual attribute value. This can be achieved using the
znflow.get_attribute method, which supports all features from getattr and
can be imported as such:
from znflow import get_attribute as getattr

Here's an example of how to use znflow.get_attribute:
import znflow

class POW2(znflow.Node):
    """Compute the square of x."""

    x_factor: float = 0.5
    results: float = None
    _x: float = None

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        # using "self._x = value * self.x_factor" inside "znflow.DiGraph()" would
        # run "value * Connector(self, "x_factor")", which is not possible
        # (TypeError); therefore we use znflow.get_attribute.
        self._x = value * znflow.get_attribute(self, "x_factor")

    def run(self):
        self.results = self.x**2

with znflow.DiGraph() as graph:
    n1 = POW2()
    n1.x = 4.0

graph.run()
assert n1.results == 4.0

Alternatively, you can use the znflow.disable_graph decorator / context manager
to disable the graph for a specific block of code or the znflow.Property as a
drop-in replacement for property.
It is possible to create groups of znflow.nodify functions or znflow.Node
instances independently of the graph structure. To create a group, use
with graph.group(<name>). To access the group members, use
graph.get_group(<name>), which returns a znflow.Group.
import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

graph = znflow.DiGraph()

with graph.group("grp1"):
    n1 = compute_mean(2, 4)

assert n1.uuid in graph.get_group("grp1")

ZnFlow includes tests to ensure compatibility with:
- "Plain classes"
- dataclasses
- ZnInit
- attrs
- pydantic (experimental)