Note
Go to the end to download the full example code.
Run tasks in parallel
Introduction
Once you have developed a correct and functioning workflow, the next step is often to scale it up for large datasets. This typically involves applying the same workflow to many independent data points. In this how-to, we show how to run the workflow in parallel for each data point to improve performance and scalability.
Setting up the AiiDA environment
from aiida import load_profile
load_profile()
from aiida_workgraph.utils import generate_node_graph
from aiida_workgraph import WorkGraph, task, Map
Perfectly parallelizable problem
A perfectly parallelizable problem can be broken down into smaller, independent subproblems that require no shared resources.
For example, consider an addition operation x + y applied element-wise to two lists [x₁, ..., xₙ] and [y₁, ..., yₙ]. Each individual addition can be performed independently of the others.
WorkGraph automatically parallelizes task execution when there are no data dependencies between tasks (for more details on this concept, refer to the WorkGraph Engine). We will take advantage of this concept and show three different ways to parallelize the add operation over the lists with WorkGraph.
Note
In practice, a simple element-wise addition like this would typically be parallelized at a lower level, such as using NumPy vectorization or multithreading. However, we use it here for illustrative purposes. The concepts demonstrated in this guide can be applied to any workflow that is perfectly parallelizable.
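For reference, the serial version of this element-wise addition in plain Python is a single loop (illustrative only; this uses no WorkGraph machinery):

```python
# Serial reference implementation of the element-wise addition.
# Each iteration is independent, which is what makes the problem
# perfectly parallelizable.
x_list = [0, 1, 2, 3]
y_list = [0, 1, 2, 3]
sums = [x + y for x, y in zip(x_list, y_list)]
print(sums)  # [0, 2, 4, 6]
```

The parallel variants below compute exactly these values, but as independent tasks.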
Conventional for-loop
@task
def add(x, y):
    return x + y
len_list = 4
x_list = list(range(len_list))
y_list = list(range(len_list))
sums = []
wg = WorkGraph("parallel_for_loop")
for i in range(len_list):
    add_task = wg.add_task(add, x=x_list[i], y=y_list[i])
    sums.append(add_task.outputs.result)
wg.run()
print("Result:", sums)
# (1+1) + (2+2) + (3+3) = 12
assert sum(sum_socket.value for sum_socket in sums) == 12
Result: [SocketAny(name='result', value=uuid: 8354d2c5-703b-4ad9-8fcd-2a4fb634fc78 (pk: 697) value: 0), SocketAny(name='result', value=uuid: a0de8205-0612-4dd2-8f89-b20bd06b4b5a (pk: 701) value: 2), SocketAny(name='result', value=uuid: 4d00fd2f-205d-4c96-aab1-b09c8c45c017 (pk: 705) value: 4), SocketAny(name='result', value=uuid: c0192f32-2130-4662-b737-d7f3ab724aee (pk: 709) value: 6)]
Workflow view
wg.to_html()
Provenance graph
generate_node_graph(wg.pk)
Graph builder
We now implement the same workflow using the graph builder, performing the for loop inside the graph builder task.
Note
We pack the two lists into a dictionary before passing the data to a task, because aiida-core supports dynamically sized data structures only through dictionaries. Lists are supported to some extent, but their usage is limited to primitive types.
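To make this packing concrete, here is how two plain lists translate into the dictionary layout used below (illustrative plain Python, independent of the WorkGraph API):

```python
# Pack two equal-length lists into a dictionary of entries, the layout
# aiida-core can track as a dynamically sized data structure.
x_list = [0, 1, 2, 3]
y_list = [0, 1, 2, 3]
data = {
    f"list_{i}": {"x": x, "y": y}
    for i, (x, y) in enumerate(zip(x_list, y_list))
}
print(data["list_2"])  # {'x': 2, 'y': 2}
```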
@task.graph()
def parallel_add_workflow(data):
    result = {}
    for i, item in enumerate(data.values()):
        outputs = add(x=item["x"], y=item["y"])
        result[f"sum_{i}"] = outputs.result
    return {"result": result}
len_list = 4
data = {f"list_{i}": {"x": i, "y": i} for i in range(len_list)}
wg = WorkGraph("parallel_graph_task")
wg.add_task(parallel_add_workflow, data=data)
wg.outputs.result = wg.tasks.parallel_add_workflow.outputs.result
wg.run()
print("Result:", wg.outputs.result.value)
# (1+1) + (2+2) + (3+3) = 12
assert sum(wg.outputs.result.value.values()) == 12
Result: AttributeDict({'sum_0': <Int: uuid: ab125247-6d8f-4eb3-86e0-6c14b6163f40 (pk: 723) value: 0>, 'sum_1': <Int: uuid: 376103a2-993d-4f07-b90e-5888b2a09e4d (pk: 727) value: 2>, 'sum_2': <Int: uuid: eca69c87-4d5a-4e34-9e36-52609d0e026a (pk: 731) value: 4>, 'sum_3': <Int: uuid: 48647d74-361e-4a0e-8a47-c208737ce296 (pk: 735) value: 6>})
Workflow view
wg.to_html()
Provenance graph
generate_node_graph(wg.pk)
Map context manager
The Map context works similarly to Python’s built-in map. By accessing the item member of the Map context, we can pass each individual element (e.g. a dictionary entry) to tasks. This creates a new task behind the scenes for each element.
Note
To perform the addition, we must extract the x and y values in separate tasks. This is because tasks cannot be nested within other tasks, one of the aiida-core concepts that makes it possible to strictly track created data.
@task
def get_value(data, key):
    return data[key]
len_list = 4
data = {f"data_{i}": {"x": i, "y": i} for i in range(len_list)}
with WorkGraph("parallel_map") as wg:
    with Map(data) as map_:
        wg.outputs.result = add(
            x=get_value(map_.item, "x").result, y=get_value(map_.item, "y").result
        ).result
wg.run()
print("Result:", wg.outputs.result.value)
# (1+1) + (2+2) + (3+3) = 12
assert sum(wg.outputs.result.value.values()) == 12
Result: AttributeDict({'data_0': <Int: uuid: 39c391a1-61c6-4199-8681-1e19bf4ba14f (pk: 752) value: 0>, 'data_1': <Int: uuid: 445d6f83-03ab-4f4e-ad51-9c8d3f7f9dbc (pk: 766) value: 2>, 'data_2': <Int: uuid: 61cc87a1-2bfa-4b6a-9768-f19d3384e9dd (pk: 780) value: 4>, 'data_3': <Int: uuid: 2b714eba-6444-4e9e-8569-0771c006184e (pk: 794) value: 6>})
Workflow view
wg.to_html()
Provenance graph
generate_node_graph(wg.pk)
Gather results
We now extend the workflow by adding a task that sums the intermediate results. This step is commonly known as a gather, aggregate, or reduce operation. It is often used to automatically analyze or summarize the output of the parallel computations.
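In plain Python, this gather/reduce step amounts to nothing more than summing the per-element results (illustrative sketch using the values from the runs above):

```python
# Gather/reduce step: combine the independent per-element results
# into a single summary value.
partial_results = {"sum_0": 0, "sum_1": 2, "sum_2": 4, "sum_3": 6}
total = sum(partial_results.values())
print(total)  # 12
```

The aggregate_sum task below performs exactly this reduction, but on the dictionary of output sockets produced by the parallel tasks.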
Graph builder
We extend the graph builder workflow only by the aggregate_sum task.
@task
def aggregate_sum(data):
    return sum(data.values())

@task.graph()
def parallel_add_workflow(data):
    result = {}
    for i, item in enumerate(data.values()):
        outputs = add(x=item["x"], y=item["y"])
        result[f"sum_{i}"] = outputs.result
    return {"result": result}
len_list = 4
data = {f"list_{i}": {"x": i, "y": i} for i in range(len_list)}
wg = WorkGraph("parallel_graph_task")
wg.add_task(parallel_add_workflow, data=data)
wg.add_task(aggregate_sum, data=wg.tasks.parallel_add_workflow.outputs.result)
wg.outputs.result = wg.tasks.aggregate_sum.outputs.result
wg.run()
print("Result:", wg.outputs.result.value)
assert wg.outputs.result == 12
Result: uuid: 686cfbe3-c8ab-4b6d-8a6e-adcbcc2f4d08 (pk: 824) value: 12
Map context
Similarly, for the Map context approach, we only need to extend the workflow by the aggregate_sum task.
@task
def aggregate_sum(data):
    return sum(data.values())

@task
def get_value(data, key):
    return data[key]
len_list = 4
data = {f"data_{i}": {"x": i, "y": i} for i in range(len_list)}
with WorkGraph("parallel_map") as wg:
    with Map(data) as map_:
        added_numbers = add(
            x=get_value(map_.item, "x").result, y=get_value(map_.item, "y").result
        ).result
    wg.outputs.result = aggregate_sum(added_numbers).result
wg.run()
print("Result:", wg.outputs.result.value)
assert wg.outputs.result == 12
Result: uuid: dcc459be-18a7-4d40-ad73-17c6125255d6 (pk: 888) value: 12
Total running time of the script: (0 minutes 27.746 seconds)