WorkGraph
A WorkGraph is a collection of tasks and the links that connect them.
Create workgraph
First, create an empty workgraph:
from aiida_workgraph import WorkGraph, task
wg = WorkGraph(name="my_first_workgraph")
Define and use tasks
# Define a task:
@task()
def add(x, y):
    return x + y
# Add tasks to the workgraph
add1 = wg.add_task(add, name="add1")
add2 = wg.add_task(add, name="add2")
Add a link between tasks:
wg.add_link(add1.outputs.result, add2.inputs.x)
# Visualize the graph
wg.to_html()
Execute the workgraph
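With the graph defined, you can now execute it. Task inputs are supplied at run time as a dictionary keyed by task name; inputs that are already linked (here, add2's x) are filled in from the upstream task.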
from aiida import load_profile
load_profile()
wg.run(inputs={"add1": {"x": 1, "y": 2}, "add2": {"y": 3}})
Graph-level inputs and outputs
As workflows grow, managing inputs for many tasks can become cumbersome. WorkGraph allows you to define graph-level inputs and outputs to create a cleaner, more user-friendly interface for your complex logic.
This lets you:
Reuse a single input across multiple tasks.
Hide internal complexity and only expose essential inputs.
Collect and rename key results as named workflow outputs.
wg = WorkGraph("graph_inputs_outputs")
# Define graph-level input
wg.inputs.x = 2
# Add tasks using the graph-level input
wg.add_task(add, "add1", x=wg.inputs.x, y=3)
wg.add_task(add, "add2", x=wg.inputs.x, y=wg.tasks.add1.outputs.result)
# Define graph-level outputs to expose selected task results
wg.outputs.sum1 = wg.tasks.add1.outputs.result
wg.outputs.sum2 = wg.tasks.add2.outputs.result
# Run the WorkGraph
wg.run()
# Verify the final output
assert wg.outputs.sum2.value == 2 + (2 + 3)
# Visualize the graph with inputs and outputs
wg.to_html()
Context variables
Context variables (ctx) are used to store and pass intermediate data within a workflow that isn’t directly an input or output of a task. This is especially useful for workflows with conditional logic (if/else) or loops, where you need to manage state between steps.
wg = WorkGraph(name="context_example")
# Set context variables directly on the WorkGraph; the dotted key "data.y" is accessed below as wg.ctx.data.y
wg.ctx = {"x": 2, "data.y": 3}
wg.add_task(add, "add1", x=wg.ctx.x, y=wg.ctx.data.y)
# Assign the result of a task to a context variable
wg.ctx.sum = wg.tasks.add1.outputs.result
# Use the context variable in another task
wg.add_task(add, "add2", x=wg.ctx.x, y=wg.ctx.sum)
Context variables can be nested, allowing you to organize complex data structures. For example, you can store multiple results in a structured way:
wg.ctx.data = {
    "sum1": wg.tasks.add1.outputs.result,
    "sum2": wg.tasks.add2.outputs.result,
}
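To make the nesting concrete, the following is a minimal plain-Python sketch of how a dotted key such as "data.y" can be expanded into a nested structure, which is the shape implied by accessing wg.ctx.data.y above. The helper expand_dotted_keys is hypothetical and written only for illustration; it is not part of aiida-workgraph and does not claim to reproduce its internal implementation.

```python
def expand_dotted_keys(flat):
    """Expand {"a.b": 1} into {"a": {"b": 1}} (illustrative helper, not library code)."""
    nested = {}
    for dotted_key, value in flat.items():
        node = nested
        # Every part except the last names a nested namespace.
        *parents, leaf = dotted_key.split(".")
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


ctx = expand_dotted_keys({"x": 2, "data.y": 3})
print(ctx)  # {'x': 2, 'data': {'y': 3}}
```

Under this reading, "x" stays at the top level while "y" lives under the "data" namespace, mirroring the attribute paths used in the example.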
WorkGraph engine
The WorkGraph engine operates on a dataflow programming model. Once submitted, the engine continuously monitors the tasks in the graph. A task is executed only when all of its inputs are available. This means:
Tasks with no upstream dependencies are executed first.
A task starts only after all upstream tasks linked to its inputs have finished.
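The two rules above can be sketched with a small, self-contained scheduler in plain Python. This is only an illustration of the dataflow idea under simplifying assumptions (synchronous execution, one output per task); the function run_dataflow and its data layout are hypothetical and are not the WorkGraph engine's actual implementation.

```python
from collections import deque


def run_dataflow(tasks, links):
    """Run tasks once all of their linked inputs are available.

    tasks: dict of task name -> (function, dict of fixed keyword inputs)
    links: list of (source_task, target_task, target_input_name)
    """
    # Count how many linked inputs each task is still waiting for.
    pending = {name: 0 for name in tasks}
    downstream = {name: [] for name in tasks}
    for source, target, input_name in links:
        pending[target] += 1
        downstream[source].append((target, input_name))

    # Tasks with no linked inputs are ready immediately.
    ready = deque(name for name, count in pending.items() if count == 0)
    kwargs = {name: dict(fixed) for name, (_, fixed) in tasks.items()}
    results, order = {}, []

    while ready:
        name = ready.popleft()
        func, _ = tasks[name]
        results[name] = func(**kwargs[name])
        order.append(name)
        # Pass the result downstream; a task becomes ready
        # when its last missing input arrives.
        for target, input_name in downstream[name]:
            kwargs[target][input_name] = results[name]
            pending[target] -= 1
            if pending[target] == 0:
                ready.append(target)
    return results, order


def add(x, y):
    return x + y


# Same shape as the first example: add1's result feeds add2's x.
tasks = {
    "add1": (add, {"x": 1, "y": 2}),
    "add2": (add, {"y": 3}),
}
links = [("add1", "add2", "x")]

results, order = run_dataflow(tasks, links)
print(order)    # ['add1', 'add2']
print(results)  # {'add1': 3, 'add2': 6}
```

Here add1 runs first because nothing is linked into it, and add2 starts only once add1 has produced the value for its x input.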