# Retries and timeouts

Flyte lets you configure retries and timeouts for each task.
Retries help a task recover from transient failures, and timeouts stop stuck or runaway tasks from wasting resources.

## Retries

The `retries` parameter controls how many times a failed task should be retried before giving up.
A "retry" is any attempt after the initial attempt.
In other words, `retries=3` means the task may be attempted up to 4 times in total (1 initial + 3 retries).
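
The counting rule can be sketched in plain Python. This is only an illustration of the "1 initial + N retries" arithmetic, not Flyte's implementation; `run_with_retries` and `make_flaky` are hypothetical helpers invented for this sketch.

```python
def run_with_retries(fn, retries):
    """Call fn up to 1 + retries times (hypothetical helper, not part of Flyte)."""
    attempts = 0
    last_error = None
    for _ in range(1 + retries):  # 1 initial attempt + `retries` retries
        attempts += 1
        try:
            return fn(), attempts
        except Exception as exc:
            last_error = exc
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error


def make_flaky(failures_before_success):
    """Return a function that fails a fixed number of times, then succeeds."""
    state = {"calls": 0}

    def flaky():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise ValueError("transient failure")
        return "Success!"

    return flaky


# Three failures followed by a success still fit within retries=3 (4 attempts).
result, attempts = run_with_retries(make_flaky(3), retries=3)
print(result, attempts)
```

A function that fails four times in a row would exhaust all 4 attempts and raise instead.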

The `retries` parameter can be configured in either the `@env.task` decorator or using `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.

The code for the examples below can be found on [GitHub](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/retries.py).

### Retry example

First, we import the required modules and set up a task environment:

```
import random
from datetime import timedelta

import flyte

env = flyte.TaskEnvironment(name="my-env")
```

Then we configure our task to retry up to 3 times if it fails (for a total of 4 attempts). We also define the driver task `main` that calls the `retry` task:

```
@env.task(retries=3)
async def retry() -> str:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Task failed!")
    return "Success!"

@env.task
async def main() -> list[str]:
    results = []
    try:
        results.append(await retry())
    except Exception as e:
        results.append(f"Failed: {e}")
    try:
        results.append(await retry.override(retries=5)())
    except Exception as e:
        results.append(f"Failed: {e}")
    return results
```

Note that we call `retry` twice: first without any `override`, and then with an `override` to increase the retries to 5 (for a total of 6 attempts).
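
To get a feel for what the extra retries buy, the chance that every attempt fails can be worked out directly. The sketch below assumes each attempt fails independently with the 70% rate used in the `retry` task; `all_attempts_fail_probability` is a hypothetical helper, not a Flyte function.

```python
FAILURE_RATE = 0.7  # matches `random.random() < 0.7` in the `retry` task


def all_attempts_fail_probability(retries, failure_rate=FAILURE_RATE):
    """Probability that the initial attempt and every retry all fail.

    Assumes attempts are independent, which holds for this toy task
    but is not guaranteed for real workloads.
    """
    total_attempts = 1 + retries
    return failure_rate ** total_attempts


for retries in (3, 5):
    p = all_attempts_fail_probability(retries)
    print(f"retries={retries}: {1 + retries} attempts, P(all fail) = {p:.4f}")
```

Under these assumptions, the call with `retries=3` still fails outright about 24% of the time, while the overridden call with `retries=5` fails about 12% of the time.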

Finally, we configure Flyte and invoke the `main` task:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

## Timeouts

The `timeout` parameter limits how long a task can run, preventing resource waste from stuck processes.
It accepts an integer number of seconds, a `timedelta`, or a `Timeout` object that sets separate limits for execution time and queue time.

The `timeout` parameter can be configured in either the `@env.task` decorator or using `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.
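
As the examples on this page show, `timeout=60` and `timeout=timedelta(minutes=1)` describe the same limit. The sketch below makes that equivalence concrete in plain Python. `as_timedelta` is a hypothetical helper written for this illustration; it does not show how Flyte converts the value internally.

```python
from datetime import timedelta


def as_timedelta(timeout):
    """Normalize an int (seconds) or a timedelta to a timedelta.

    Hypothetical helper for illustration only; not part of the Flyte API.
    """
    if isinstance(timeout, timedelta):
        return timeout
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(timeout, int) and not isinstance(timeout, bool):
        return timedelta(seconds=timeout)
    raise TypeError(f"unsupported timeout value: {timeout!r}")


# An integer is read as seconds, so both spellings give the same duration.
print(as_timedelta(60) == as_timedelta(timedelta(minutes=1)))
```

Integers are compact for short limits, while `timedelta` tends to read better once a limit is expressed in minutes or hours.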

The code for the example below can be found on [GitHub](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py).

### Timeout example

First, we import the required modules and set up a task environment:

```
import random
from datetime import timedelta
import asyncio

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="my-env")
```

Our first task sets a timeout using seconds as an integer:

```
@env.task(timeout=60)  # 60 seconds
async def timeout_seconds() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_seconds completed"
```

We can also set a timeout using a `timedelta` object for more readable durations:

```
@env.task(timeout=timedelta(minutes=1))
async def timeout_timedelta() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_timedelta completed"
```

You can also set separate timeouts for maximum execution time and maximum queue time using the `Timeout` class:

```
@env.task(timeout=Timeout(
    max_runtime=timedelta(minutes=1),      # Max execution time per attempt
    max_queued_time=timedelta(minutes=1)   # Max time in queue before starting
))
async def timeout_advanced() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_advanced completed"
```

You can also combine retries and timeouts for resilience and resource control:

```
@env.task(
    retries=3,
    timeout=Timeout(
        max_runtime=timedelta(minutes=1),
        max_queued_time=timedelta(minutes=1)
    )
)
async def timeout_with_retry() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_with_retry completed"
```

Here we specify:
- Up to 3 retry attempts.
- Each attempt times out after 1 minute.
- The task fails if it is queued for more than 1 minute.
- Total possible runtime: 1 minute queue + (1 minute × 4 attempts), because `retries=3` allows 1 initial attempt plus 3 retries.
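
The worst case for this configuration follows from the definition of a retry: `retries=3` allows 4 attempts in total. The sketch below works through the arithmetic. It counts the queue limit once, as the list above does; whether the queue limit can apply again on later attempts is not covered here, so treat that part as an assumption. `worst_case_total` is a hypothetical helper, not a Flyte function.

```python
from datetime import timedelta


def worst_case_total(retries, max_runtime, max_queued_time):
    """Upper bound on elapsed time across all attempts.

    Assumption: the queue limit is counted once, not once per attempt.
    """
    attempts = 1 + retries  # 1 initial attempt + `retries` retries
    return max_queued_time + attempts * max_runtime


total = worst_case_total(
    retries=3,
    max_runtime=timedelta(minutes=1),
    max_queued_time=timedelta(minutes=1),
)
print(total)  # 0:05:00
```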

We define the `main` driver task, which calls all the timeout tasks concurrently and returns their outputs as a list. For each task that fails, the list contains a `Failed: ...` string in place of the task's output:

```
@env.task
async def main() -> list[str]:
    tasks = [
        timeout_seconds(),
        timeout_seconds.override(timeout=120)(),  # Override to 120 seconds
        timeout_timedelta(),
        timeout_advanced(),
        timeout_with_retry(),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {r}")
        else:
            output.append(r)
    return output
```

Note that we also demonstrate overriding the timeout for `timeout_seconds` to 120 seconds when calling it.
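
The result-collection pattern in `main` relies on standard `asyncio` behavior rather than anything specific to Flyte: with `return_exceptions=True`, `asyncio.gather` returns raised exceptions as values, in call order, instead of propagating the first one. A minimal standalone sketch, using two hypothetical coroutines `succeeds` and `fails` in place of the Flyte tasks:

```python
import asyncio


async def succeeds() -> str:
    await asyncio.sleep(0.01)
    return "ok"


async def fails() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def collect() -> list[str]:
    # Exceptions are returned in place, so one failure does not hide the other results.
    results = await asyncio.gather(
        succeeds(), fails(), succeeds(), return_exceptions=True
    )
    return [f"Failed: {r}" if isinstance(r, Exception) else r for r in results]


output = asyncio.run(collect())
print(output)  # ['ok', 'Failed: boom', 'ok']
```

Without `return_exceptions=True`, the first exception would propagate out of `gather` and the results of the other calls would not be returned.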

Finally, we configure Flyte and invoke the `main` task:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

Well-chosen retry and timeout settings help keep your Flyte workflows reliable and efficient: retries absorb transient failures, and timeouts cap the resources a stuck task can consume.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-configuration/retries-and-timeouts.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-configuration/retries-and-timeouts/
