# Error handling

One of the key features of Flyte 2 is the ability to recover from user-level errors in a workflow execution.
This includes out-of-memory errors and other exceptions.

In a distributed system with heterogeneous compute, certain types of errors are expected and even, in a sense, acceptable.
Flyte 2 recognizes this and allows you to handle them gracefully as part of your workflow logic.

This ability is a direct result of the fact that workflows are now written in regular Python,
giving you all the power and flexibility of Python error handling.
Let's look at an example:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = ""
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment(name="fail", resources=flyte.Resources(cpu=1, memory="250Mi"))

@env.task
async def oomer(x: int):
    large_list = [0] * 100000000
    print(len(large_list))

@env.task
async def always_succeeds() -> int:
    await asyncio.sleep(1)
    return 42

@env.task
async def main() -> int:
    try:
        # First attempt runs with the environment's default 250Mi of memory.
        await oomer(2)
    except flyte.errors.OOMError as e:
        print(f"Failed with OOM, retrying with more resources: {e}, of type {type(e)}, {e.code}")
        try:
            # Retry the same task with its memory raised to 1Gi.
            await oomer.override(resources=flyte.Resources(cpu=1, memory="1Gi"))(5)
        except flyte.errors.OOMError as e:
            print(f"Failed with OOM again, giving up: {e}, of type {type(e)}, {e.code}")
            raise
    finally:
        # Runs whether or not the attempts above succeeded.
        await always_succeeds()

    return await always_succeeds()

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/error-handling/error_handling.py*

In this code, we do the following:

* Import the necessary modules
* Set up the task environment. Note that we define our task environment with a resource allocation of 1 CPU and 250 MiB of memory.
* Define two tasks: one that will intentionally cause an out-of-memory (OOM) error, and another that will always succeed.
* Define the main task (the top-level workflow task) that handles the failure recovery logic.

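The outer `try...except` block first runs the `oomer` task under the environment's default 250 MiB memory limit. The task allocates a list of 100 million elements, which needs roughly 800 MB on a 64-bit CPython build, so this attempt is expected to run out of memory.
The `except` clause catches the resulting [`flyte.errors.OOMError`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.errors/oomerror) and runs `oomer` again, this time using `override` to request 1 GiB of memory. If the retry also fails with an OOM error, the error is re-raised and `main` fails. The `finally` clause runs `always_succeeds` whether or not the attempts succeeded.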
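
The order in which these branches run can be traced locally without a cluster. The following is a minimal sketch in plain `asyncio` with no Flyte imports: `SimulatedOOMError`, `fake_oomer`, and the `limit_mib` parameter are hypothetical stand-ins for `flyte.errors.OOMError`, the `oomer` task, and the `override(resources=...)` call, and the 800 MiB requirement is an illustrative figure. It shows only the control flow, not how Flyte detects or reports an OOM.

```python
import asyncio


class SimulatedOOMError(Exception):
    """Hypothetical stand-in for flyte.errors.OOMError."""


async def fake_oomer(limit_mib: int, needed_mib: int = 800) -> None:
    # Stand-in for the oomer task: fails when the limit is below what it needs.
    if limit_mib < needed_mib:
        raise SimulatedOOMError(f"needed {needed_mib} MiB, limit {limit_mib} MiB")


async def simulated_main(trace: list[str]) -> int:
    try:
        await fake_oomer(250)
        trace.append("first attempt ok")
    except SimulatedOOMError:
        trace.append("first attempt OOM")
        # Retry with a larger limit, mirroring the override call in the real task.
        await fake_oomer(1024)
        trace.append("retry ok")
    finally:
        # Runs whether the attempts succeeded or raised.
        trace.append("cleanup")
    return 42


trace: list[str] = []
result = asyncio.run(simulated_main(trace))
print(trace, result)
```

The trace records the failed first attempt, then the retry, then the cleanup step, which is the same sequence the `main` task follows when the first `oomer` call hits its memory limit and the second one fits.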

This type of dynamic error handling allows you to gracefully recover from user-level errors in your workflows.
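
Because the recovery logic is ordinary Python, the nested `try` blocks can also be generalized into a loop over several memory sizes. The sketch below is one possible shape for such a helper, not part of the Flyte SDK: `run_with_escalation`, `fake_task`, and `SimulatedOOMError` are hypothetical names, and the exception is a local stand-in so the example runs without a cluster.

```python
import asyncio
from collections.abc import Awaitable, Callable


class SimulatedOOMError(Exception):
    """Hypothetical stand-in for flyte.errors.OOMError."""


async def run_with_escalation(
    attempt: Callable[[str], Awaitable[int]],
    memory_steps: list[str],
) -> int:
    """Call attempt(memory) for each step until one does not raise an OOM error."""
    last_error: SimulatedOOMError | None = None
    for memory in memory_steps:
        try:
            return await attempt(memory)
        except SimulatedOOMError as e:
            last_error = e  # remember the failure and move to the next step
    if last_error is None:
        raise ValueError("memory_steps must not be empty")
    # Every step ran out of memory: surface the last failure to the caller.
    raise last_error


async def fake_task(memory: str) -> int:
    # Hypothetical workload that only fits in 1Gi or more.
    if memory in ("250Mi", "500Mi"):
        raise SimulatedOOMError(f"out of memory at {memory}")
    return 42


result = asyncio.run(run_with_escalation(fake_task, ["250Mi", "500Mi", "1Gi"]))
print(result)
```

In a real workflow, `attempt` could wrap a call such as `oomer.override(resources=flyte.Resources(cpu=1, memory=memory))(x)` from the example above, with `flyte.errors.OOMError` in place of the stand-in exception.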

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/error-handling.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/error-handling/
