Note
Go to the end to download the full example code.
Write error-resistant workflows
Introduction
In this how-to, we will show how to implement custom error handler functions that respond to exit codes with WorkGraph
, allowing you to automatically recover from errors, or to gracefully exit.
We will walk through how to create error handlers that execute specific tasks based on the exit codes of CalcJob
calculations.
Error handling functionality requires using nodegraph programming.
If you are unfamiliar with this approach, we recommend reviewing the node graph programming section first.
We exemplify the error handling with the ArithmeticAddCalculation
from aiida-core
, but the methods we learn in this section can be applied to any CalcJob
, including also ShellJob
and PythonJob
.
Load the AiiDA environment
from aiida_workgraph import WorkGraph, Task
from aiida import load_profile, orm
from aiida.common.exceptions import NotExistent
from aiida.cmdline.utils.ascii_vis import format_call_graph
load_profile()
try:
bash_code = orm.load_code(
"bash@localhost"
) # The computer label can also be omitted here
except NotExistent:
bash_code = orm.InstalledCode(
label="bash",
computer=orm.load_computer("localhost"),
filepath_executable="/bin/bash",
default_calc_job_plugin="core.arithmetic.add",
).store()
Exit codes
In AiiDA we can create error handlers for exit codes that are defined in the CalcJob
specification.
If you are not familiar with the concept of exit codes in CalcJobs
s you might want to familiarize yourself in the aiida-core documentation
For example for the ArithmeticAddCalculation
we can the defined exit codes with
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
print(ArithmeticAddCalculation.exit_codes)
ExitCodesNamespace({'ERROR_UNSPECIFIED': ExitCode(status=1, message='The process has failed with an unspecified error.', invalidates_cache=True), 'ERROR_LEGACY_FAILURE': ExitCode(status=2, message='The process failed with legacy failure mode.', invalidates_cache=True), 'ERROR_INVALID_OUTPUT': ExitCode(status=320, message='The output file contains invalid output.', invalidates_cache=True), 'ERROR_MISSING_OUTPUT': ExitCode(status=11, message='The process did not register a required output.', invalidates_cache=True), 'ERROR_NO_RETRIEVED_FOLDER': ExitCode(status=100, message='The process did not have the required `retrieved` output.', invalidates_cache=True), 'ERROR_SCHEDULER_OUT_OF_MEMORY': ExitCode(status=110, message='The job ran out of memory.', invalidates_cache=True), 'ERROR_SCHEDULER_OUT_OF_WALLTIME': ExitCode(status=120, message='The job ran out of walltime.', invalidates_cache=True), 'ERROR_SCHEDULER_INVALID_ACCOUNT': ExitCode(status=131, message='The specified account is invalid.', invalidates_cache=True), 'ERROR_SCHEDULER_NODE_FAILURE': ExitCode(status=140, message='The node running the job failed.', invalidates_cache=True), 'STOPPED_BY_MONITOR': ExitCode(status=150, message='{message}', invalidates_cache=True), 'ERROR_READING_OUTPUT_FILE': ExitCode(status=310, message='The output file could not be read.', invalidates_cache=True), 'ERROR_NEGATIVE_NUMBER': ExitCode(status=410, message='The sum of the operands is a negative number.', invalidates_cache=False)})
We will write an error handler for the exit code 410 ERROR_NEGATIVE_NUMBER
.
print(ArithmeticAddCalculation.exit_codes.ERROR_NEGATIVE_NUMBER)
ExitCode(status=410, message='The sum of the operands is a negative number.', invalidates_cache=False)
If the computed sum of the inputs x and y is negative, the ArithmeticAddCalculation
fails with exit code 410.
We will run a calculation that exits with this error.
wg = WorkGraph("error_negative_number")
wg.add_task(ArithmeticAddCalculation, name="add", x=1, y=-6, code=bash_code)
wg.run()
print("Task finished OK?:", wg.tasks.add.process.is_finished_ok)
print("Exit code :", wg.tasks.add.process.exit_code)
print("Exit Message: :", wg.tasks.add.process.exit_message)
Task finished OK?: False
Exit code : ExitCode(status=410, message='The sum of the operands is a negative number.', invalidates_cache=False)
Exit Message: : The sum of the operands is a negative number.
We can confirm that the task fails with this exit code in the CLI.
print(format_call_graph(orm.load_node(wg.pk)))
WorkGraph<error_negative_number><923> Finished [302]
└── ArithmeticAddCalculation<926> Finished [410]
Error handling
To “register” a error handler for a WorkGraph, you simply define a function that takes the task
as its arguments, and attach it as the error_handler
of the WorkGraph
.
You can specify the tasks and their exit codes that should trigger the error handler, as well as the maximum number of retries for a task:
def handle_negative_sum(task: Task):
"""Handle the failure code 410 of the `ArithmeticAddCalculation`.
Simply make the inputs positive by taking the absolute value.
"""
# modify task inputs
task.set({"x": abs(task.inputs.x.value), "y": abs(task.inputs.y.value)})
wg = WorkGraph("handling_error_negative_number")
wg.add_task(ArithmeticAddCalculation, name="add", x=1, y=-6, code=bash_code)
# Adding error handler logic
wg.add_error_handler(
handle_negative_sum,
name="handle_negative_sum",
tasks={"add": {"exit_codes": [410], "max_retries": 5}},
)
wg.run()
print("Task finished OK?:", wg.tasks.add.process.is_finished_ok)
print("Exit code :", wg.tasks.add.process.exit_code)
print("Exit Message: :", wg.tasks.add.process.exit_message)
Task finished OK?: True
Exit code : None
Exit Message: : None
print(format_call_graph(orm.load_node(wg.pk)))
WorkGraph<handling_error_negative_number><930> Finished [0]
├── ArithmeticAddCalculation<933> Finished [410]
└── ArithmeticAddCalculation<939> Finished [0]
Parametrized error handlers
We can also pass custom parameters to the error handler.
For example, instead of simply making the inputs positive by taking the absolute value, we increment by an integer.
The integer increment
is a custom parameter of the error handler, which the user can specify when attaching the error handler to the WorkGraph.
def handle_negative_sum(task: Task, increment: int = 1):
"""Handle the failure code 410 of the `ArithmeticAddCalculation`.
Simply add an increment to the inputs.
"""
# modify task inputs
task.set(
{"x": task.inputs.x.value + increment, "y": task.inputs.y.value + increment}
)
wg = WorkGraph("handling_error_negative_number")
wg.add_task(ArithmeticAddCalculation, name="add", x=1, y=-6, code=bash_code)
# Adding error handler logic
wg.add_error_handler(
handle_negative_sum,
name="handle_negative_sum",
tasks={
"add": {
"exit_codes": [410],
"max_retries": 5, # Note that retrying 5 times results in executing 6 times
"kwargs": {"increment": 1},
}
},
)
wg.run()
print("Task finished OK?:", wg.tasks.add.process.is_finished_ok)
print("Exit code :", wg.tasks.add.process.exit_code)
print("Exit Message: :", wg.tasks.add.process.exit_message)
Task finished OK?: True
Exit code : None
Exit Message: : None
print(format_call_graph(orm.load_node(wg.pk)))
WorkGraph<handling_error_negative_number><943> Finished [0]
├── ArithmeticAddCalculation<946> Finished [410]
├── ArithmeticAddCalculation<952> Finished [410]
├── ArithmeticAddCalculation<958> Finished [410]
└── ArithmeticAddCalculation<964> Finished [0]
We can confirm that the task first fails again with a 410. Then the WorkGraph restarts the task with the new inputs, and it finishes successfully.
We can also update the arguments of the error handler.
Let us update the increment
argument to 3 and restart the WorkGraph
.
# reset workgraph to start from the beginning
wg.reset()
wg.error_handlers["handle_negative_sum"]["tasks"]["add"]["kwargs"]["increment"] = 3
wg.run()
In this case, it only needs one retry to finish successfully as adding two times 3 makes the y
parameter positive
print(format_call_graph(orm.load_node(wg.pk)))
WorkGraph<handling_error_negative_number><968> Finished [0]
├── ArithmeticAddCalculation<971> Finished [410]
└── ArithmeticAddCalculation<977> Finished [0]
Note
PythonJob
task allows the user to attach the error handler directly to the task.
Please check out the aiida-pythonjob documentation
Total running time of the script: (0 minutes 14.215 seconds)