# Abort and cancel actions

When running complex workflows, you may need to stop actions that are no longer needed.
This can happen when one branch of your workflow makes others redundant, when a task fails and its siblings should not continue, or when you need to manually intervene in a running workflow.

Flyte provides three mechanisms for stopping actions:

- **Automatic cleanup**: When a root action completes, all its in-progress descendant actions are automatically aborted.
- **Programmatic cancellation**: Cancel specific `asyncio` tasks from within your workflow code.
- **External abort**: Stop individual actions via the CLI, the UI, or the API.

For background on runs and actions, see [Runs and actions](https://www.union.ai/docs/v2/union/user-guide/task-programming/core-concepts/runs-and-actions).

## Action lifetime

The lifetime of all actions in a [run](https://www.union.ai/docs/v2/union/user-guide/task-programming/core-concepts/runs-and-actions) is tied to the lifetime of the root action (the first task that was invoked).
When the root action exits (whether it succeeds, fails, or returns early), all in-progress descendant actions are automatically aborted and no new actions can be enqueued.

This means you don't need to manually clean up child actions. Flyte handles it for you.

Consider this example where `main` exits after 10 seconds, but it has spawned a `sleep_for` action that is set to run for 30 seconds:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "seconds = 30"
# ///

import asyncio

import flyte

env = flyte.TaskEnvironment(name="action_lifetime")

@env.task
async def do_something():
    print("Doing something")
    await asyncio.sleep(5)
    print("Finished doing something")

@env.task
async def sleep_for(seconds: int):
    print(f"Sleeping for {seconds} seconds")
    try:
        await asyncio.sleep(seconds)
        await do_something()
    except asyncio.CancelledError:
        print("sleep_for was cancelled")
        return
    print(f"Finished sleeping for {seconds} seconds")

@env.task
async def main(seconds: int):
    print("Starting main")
    # Fire and forget: sleep_for is still running when main returns.
    asyncio.create_task(sleep_for(seconds))
    await asyncio.sleep(10)
    print("Main finished")

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, seconds=30)
    print(run.url)
    run.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/action_lifetime.py*

When `main` returns after 10 seconds, the `sleep_for` action (which still has 20 seconds remaining) is automatically aborted.
The `sleep_for` task receives an `asyncio.CancelledError`, giving it a chance to handle the cancellation gracefully.
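
The cancellation-handling pattern can be tried locally without a Flyte backend. The following is a minimal sketch in plain `asyncio`, assuming (as described above) that the task body sees the abort as an ordinary `asyncio.CancelledError`. The names `worker` and `events` are hypothetical and exist only for this illustration. Unlike `sleep_for` above, this sketch re-raises after cleanup, which is the usual `asyncio` recommendation when the caller should still observe the cancellation.

```python
import asyncio

events: list[str] = []  # hypothetical log used only for this illustration


async def worker(seconds: float) -> None:
    events.append("started")
    try:
        await asyncio.sleep(seconds)
        events.append("finished")
    except asyncio.CancelledError:
        # Release resources here, then re-raise so the caller sees the cancellation.
        events.append("cancelled")
        raise
    finally:
        # Runs on both normal completion and cancellation.
        events.append("cleanup")


async def main() -> None:
    task = asyncio.create_task(worker(30))
    await asyncio.sleep(0.1)  # give the worker time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # the worker was cancelled, as intended


asyncio.run(main())
print(events)  # ['started', 'cancelled', 'cleanup']
```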

## Canceling actions programmatically

As a workflow author, you can cancel specific in-progress actions by canceling their corresponding `asyncio` tasks.
This is useful in scenarios like hyperparameter optimization (HPO), where one action converges to the desired result and the remaining actions can be stopped to save compute.

To cancel actions programmatically:

1. Launch actions using `asyncio.create_task()` and retain references to the returned task objects.
2. When the desired condition is met, call `.cancel()` on the tasks you want to stop.

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 30, f = 10.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("cancel")

@env.task
async def sleepers(f: float, n: int):
    await asyncio.sleep(f)

@env.task
async def failing_task(f: float):
    raise ValueError("I will fail!")

@env.task
async def main(n: int, f: float):
    # Keep the task handles so the actions can be canceled later.
    sleeping_tasks = []
    for i in range(n):
        sleeping_tasks.append(asyncio.create_task(sleepers(f, i)))

    await asyncio.sleep(f)
    try:
        await failing_task(f)
        await asyncio.gather(*sleeping_tasks)
    except flyte.errors.RuntimeUserError as e:
        # The name of the exception type raised inside failing_task is available as e.code.
        if e.code == "ValueError":
            print(f"Received ValueError, canceling {len(sleeping_tasks)} sleeping tasks")
            for t in sleeping_tasks:
                t.cancel()
        return

if __name__ == "__main__":
    flyte.init_from_config()
    print(flyte.run(main, 30, 10.0))
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/cancel_tasks.py*

In this code:

* The `main` task launches `n` `sleepers` actions (30 in this run) in parallel using `asyncio.create_task()` and keeps a reference to each task.
* After sleeping for `f` seconds, it calls `failing_task`, which raises a `ValueError`.
* The error is caught as a `flyte.errors.RuntimeUserError`, because Flyte wraps user-raised exceptions. The original exception type is available as `e.code`.
* On catching the error, `main` cancels all sleeping tasks by calling `.cancel()` on each one, freeing their compute resources.

This pattern lets you react to runtime conditions and stop unnecessary work. For more on handling errors within workflows, see [Error handling](https://www.union.ai/docs/v2/union/user-guide/task-programming/abort-tasks/error-handling).
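
The HPO scenario mentioned earlier follows the same two steps: keep the task references, then cancel the ones you no longer need. The sketch below uses plain `asyncio` so that it runs without a Flyte backend. `trial` is a hypothetical stand-in for a Flyte task, and the delays are invented for illustration. It assumes that tasks created with `asyncio.create_task()` around Flyte task calls can be waited on and canceled the same way.

```python
import asyncio


async def trial(trial_id: int, delay: float) -> int:
    # Hypothetical stand-in for a Flyte task that trains one configuration.
    await asyncio.sleep(delay)
    return trial_id


async def main() -> tuple[int, int]:
    # Trial 2 converges quickly; the others would run much longer.
    delays = {0: 5.0, 1: 5.0, 2: 0.05, 3: 5.0}
    tasks = [asyncio.create_task(trial(i, d)) for i, d in delays.items()]

    # Block until the first trial finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = done.pop().result()

    # Cancel the remaining trials and wait for the cancellations to be processed.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    cancelled = sum(t.cancelled() for t in pending)
    return winner, cancelled


winner, cancelled = asyncio.run(main())
print(f"trial {winner} won; cancelled {cancelled} others")
```

`asyncio.wait()` with `return_when=asyncio.FIRST_COMPLETED` returns as soon as any task finishes, which avoids polling each task in a loop.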

## External abort

Sometimes you need to stop an action manually, outside the workflow code itself. You can abort individual actions using the CLI, the UI, or the API.

When an action is externally aborted, the parent action that awaits it receives a [`flyte.errors.ActionAbortedError`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.errors/actionabortederror). You can catch this error to handle the abort gracefully.

### Aborting via the CLI

To abort a specific action:

```bash
flyte abort <run-name> <action-name>
```

Use `--project` and `--domain` to target a specific [project-domain pair](https://www.union.ai/docs/v2/union/user-guide/task-programming/projects-and-domains).
For all available options, see the [CLI reference](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-cli#flyte-abort).

### Handling external aborts

When you call `asyncio.gather()` with `return_exceptions=True`, the `ActionAbortedError` of an externally aborted action is returned in the results list instead of being raised. This lets you inspect the results and handle aborts on a per-action basis:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 10, sleep_for = 30.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("external_abort")

@env.task
async def long_sleeper(sleep_for: float):
    await asyncio.sleep(sleep_for)

@env.task
async def main(n: int, sleep_for: float) -> str:
    coros = [long_sleeper(sleep_for) for _ in range(n)]
    # return_exceptions=True collects errors as results instead of raising the first one.
    results = await asyncio.gather(*coros, return_exceptions=True)
    for i, r in enumerate(results):
        if isinstance(r, flyte.errors.ActionAbortedError):
            print(f"Action [{i}] was externally aborted")
    return "Hello World!"

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, 10, 30.0)
    print(run.url)
    run.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/external_abort.py*

In this code:

* The `main` task launches 10 `long_sleeper` actions in parallel.
* If any action is externally aborted (via the CLI, the UI, or the API) while running, `asyncio.gather` captures the `ActionAbortedError` as a result instead of propagating it.
* The `main` task iterates over the results and logs which actions were aborted.
* Because the abort is handled, `main` can continue executing and return its result normally.

Without `return_exceptions=True`, an external abort would raise `ActionAbortedError` directly, which you can handle with a standard `try...except` block.
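
The `try...except` variant can be sketched as follows. To keep the example runnable without a Flyte backend, it defines a local `ActionAbortedError` class as a hypothetical stand-in for `flyte.errors.ActionAbortedError`, and `child` simulates an abort by raising it. In a real workflow you would catch the Flyte error class and await actual task calls.

```python
import asyncio


class ActionAbortedError(Exception):
    """Hypothetical local stand-in for flyte.errors.ActionAbortedError."""


async def child(i: int) -> str:
    # Simulate an external abort of action 1; the others finish normally.
    await asyncio.sleep(0.01)
    if i == 1:
        raise ActionAbortedError(f"action {i} aborted")
    return f"result-{i}"


async def main() -> str:
    try:
        # Without return_exceptions=True, the first exception propagates.
        results = await asyncio.gather(*(child(i) for i in range(3)))
    except ActionAbortedError as e:
        return f"handled: {e}"
    return ",".join(results)


outcome = asyncio.run(main())
print(outcome)  # handled: action 1 aborted
```

Note that when `asyncio.gather()` propagates an exception this way, it does not cancel the other awaitables. If they should stop too, cancel them explicitly.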

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/abort-tasks.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/abort-tasks/
