# Configure tasks
> This bundle contains all pages in the Configure tasks section.
> Source: https://www.union.ai/docs/v2/union/user-guide/task-configuration/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration ===

# Configure tasks

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

As we saw in [**Quickstart**](https://www.union.ai/docs/v2/union/user-guide/quickstart/page.md), you can run any Python function as a task in Flyte just by decorating it with `@env.task`.

This allows you to run your Python code in a distributed manner, with each function running in its own container.
Flyte manages the spinning up of the containers, the execution of the code, and the passing of data between the tasks.

The simplest possible case is a `TaskEnvironment` with only a `name` parameter, and an `env.task` decorator, with no parameters:

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task
async def my_task(name: str) -> str:
    return f"Hello {name}!"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/task_config.py*

> [!NOTE]
> Notice how the `TaskEnvironment` is assigned to the variable `env` and then that variable is
> used in the `@env.task`. This is what connects the `TaskEnvironment` to the task definition.
>
> In the following we will often use `@env.task` generically to refer to the decorator,
> but it is important to remember that it is actually a decorator attached to a specific
> `TaskEnvironment` object, and the `env` part can be any variable name you like.

This will run your task in the default container environment with default settings.

But, of course, one of the key advantages of Flyte is the ability to control the software environment, hardware environment, and other execution parameters for each task, right in your Python code.
In this section we will explore the various configuration options available for tasks in Flyte.

## Task configuration levels

Task configuration is done at three levels. From most general to most specific, they are:

* The `TaskEnvironment` level: setting parameters when defining the `TaskEnvironment` object.
* The `@env.task` decorator level: setting parameters in the `@env.task` decorator when defining a task function.
* The task invocation level: using the `task.override()` method when invoking task execution.

Each level has its own set of parameters, and some parameters are shared across levels.
For shared parameters, the more specific level will override the more general one.

### Example

Here is an example of how these levels work together, showing each level with all available parameters:

```python
# Level 1: TaskEnvironment - Base configuration
env_2 = flyte.TaskEnvironment(
    name="data_processing_env",
    image=flyte.Image.from_debian_base(),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    env_vars={"MY_VAR": "value"},
    # secrets=flyte.Secret(key="openapi_key", as_env_var="MY_API_KEY"),
    cache="disable",
    # pod_template=my_pod_template,
    # reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300),
    depends_on=[another_env],
    description="Data processing task environment",
    # plugin_config=my_plugin_config
)

# Level 2: Decorator - Override some environment settings
@env_2.task(
    short_name="process",
    # secrets=flyte.Secret(key="openapi_key", as_env_var="MY_API_KEY_2"),
    cache="auto",
    # pod_template=my_pod_template,
    report=True,
    max_inline_io_bytes=100 * 1024,
    retries=3,
    timeout=60,
    docs="This task processes data and generates a report."
)
async def process_data(data_path: str) -> str:
    return f"Processed {data_path}"

@env_2.task
async def invoke_process_data() -> str:
    result = await process_data.override(
        resources=flyte.Resources(cpu=4, memory="2Gi"),
        env_vars={"MY_VAR": "new_value"},
        # secrets=flyte.Secret(key="openapi_key", as_env_var="MY_API_KEY_3"),
        cache="auto",
        max_inline_io_bytes=100 * 1024,
        retries=3,
        timeout=60
    )("input.csv")
    return result
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/task_config.py*

### Parameter interaction

Here is an overview of all task configuration parameters available at each level and how they interact:

| Parameter               | `TaskEnvironment`  | `@env.task` decorator      | `override` on task invocation |
|-------------------------|--------------------|----------------------------|-------------------------------|
| **name**                | ✅ Yes (required)  | ❌ No                      | ❌ No                         |
| **short_name**          | ❌ No              | ✅ Yes                     | ✅ Yes                        |
| **image**               | ✅ Yes             | ❌ No                      | ❌ No                         |
| **resources**           | ✅ Yes             | ❌ No                      | ✅ Yes (if not `reusable`)    |
| **env_vars**            | ✅ Yes             | ❌ No                      | ✅ Yes (if not `reusable`)    |
| **secrets**             | ✅ Yes             | ❌ No                      | ✅ Yes (if not `reusable`)    |
| **cache**               | ✅ Yes             | ✅ Yes                     | ✅ Yes                        |
| **pod_template**        | ✅ Yes             | ✅ Yes                     | ✅ Yes                        |
| **reusable**            | ✅ Yes             | ❌ No                      | ✅ Yes                        |
| **depends_on**          | ✅ Yes             | ❌ No                      | ❌ No                         |
| **description**         | ✅ Yes             | ❌ No                      | ❌ No                         |
| **plugin_config**       | ✅ Yes             | ❌ No                      | ❌ No                         |
| **report**              | ❌ No              | ✅ Yes                     | ❌ No                         |
| **max_inline_io_bytes** | ❌ No              | ✅ Yes                     | ✅ Yes                        |
| **retries**             | ❌ No              | ✅ Yes                     | ✅ Yes                        |
| **timeout**             | ❌ No              | ✅ Yes                     | ✅ Yes                        |
| **triggers**            | ❌ No              | ✅ Yes                     | ❌ No                         |
| **links**               | ❌ No              | ✅ Yes                     | ✅ Yes                        |
| **interruptible**       | ✅ Yes             | ✅ Yes                     | ✅ Yes                        |
| **queue**               | ✅ Yes             | ✅ Yes                     | ✅ Yes                        |
| **docs**                | ❌ No              | ✅ Yes                     | ❌ No                         |

## Task configuration parameters

Each parameter is documented in detail on its dedicated page or in the API reference. For full type signatures and constraints, see the [`TaskEnvironment` API reference](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/taskenvironment/page.md).

| Parameter | Details |
|-----------|---------|
| **name**, **short_name**, **description**, **docs** | **Configure tasks > Additional task settings** |
| **image** | **Configure tasks > Container images** &bull; [`Image` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/image/page.md) |
| **resources** | **Configure tasks > Resources** &bull; [`Resources` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/resources/page.md) |
| **env_vars** | **Configure tasks > Additional task settings** |
| **secrets** | **Configure tasks > Secrets** &bull; [`Secret` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/secret/page.md) |
| **cache** | **Configure tasks > Caching** &bull; [`Cache` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/cache/page.md) |
| **pod_template** | **Configure tasks > Pod templates** &bull; [`PodTemplate` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/podtemplate/page.md) |
| **reusable** | **Configure tasks > Reusable containers** &bull; [`ReusePolicy` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/reusepolicy/page.md) |
| **depends_on** | **Configure tasks > Multiple environments** |
| **plugin_config** | **Configure tasks > Task plugins** |
| **report** | **Configure tasks > Additional task settings** |
| **max_inline_io_bytes** | **Configure tasks > Additional task settings** |
| **retries**, **timeout** | **Configure tasks > Retries and timeouts** &bull; [`RetryStrategy`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/retrystrategy/page.md), [`Timeout`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/timeout/page.md) API refs |
| **triggers** | **Configure tasks > Triggers** &bull; [`Trigger` API ref](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/trigger/page.md) |
| **links** | **Configure tasks > Additional task settings** |
| **interruptible**, **queue** | **Configure tasks > Interruptible tasks and queues** |

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/container-images ===

# Container images

The `image` parameter of the [`TaskEnvironment`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/taskenvironment) is used to specify a container image.
Every task defined using that `TaskEnvironment` will run in a container based on that image.

If a `TaskEnvironment` does not specify an `image`, it will use the default Flyte image ([`ghcr.io/flyteorg/flyte:py{python-version}-v{flyte_version}`](https://github.com/orgs/flyteorg/packages/container/package/flyte)).
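The default tag scheme quoted above can be sketched in plain Python. This is purely illustrative (the SDK computes the tag internally, and `default_flyte_image` is a hypothetical helper name, not a Flyte API):

```python
import sys

def default_flyte_image(flyte_version: str) -> str:
    # Mirrors the documented tag scheme:
    #   ghcr.io/flyteorg/flyte:py{python-version}-v{flyte_version}
    py = f"{sys.version_info.major}.{sys.version_info.minor}"
    return f"ghcr.io/flyteorg/flyte:py{py}-v{flyte_version}"

print(default_flyte_image("2.0.0"))
```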

## Specifying your own image directly

You can directly reference an image by URL in the `image` parameter, like this:

```python
env = flyte.TaskEnvironment(
    name="my_task_env",
    image="docker.io/myorg/myimage:mytag"
)
```

This works well if you have a pre-built image available in a public registry like Docker Hub or in a private registry that your Union/Flyte instance can access.

## Specifying your own image with the `flyte.Image` object

You can also construct an image programmatically using the `flyte.Image` object.

The `flyte.Image` object provides a fluent interface for building container images: start with a `from_*` base constructor, then customize with `with_*` methods. Each method returns a new immutable `Image`.

For a complete list of all available methods and their parameters, see the [`Image` API reference](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/image).

Here are some examples of the most common patterns for building images with `flyte.Image`.

## Example: Defining a custom image with `Image.from_debian_base`

The `Image.from_debian_base()` constructor provides the default Flyte image as the base.
This image is itself based on the official Python Docker image (specifically `python:{version}-slim-bookworm`), with the Flyte SDK pre-installed.
Starting from there, you can layer additional features onto your image.
For example:

```python
import flyte
import numpy as np

# Define the task environment
env = flyte.TaskEnvironment(
    name="my_env",
    image=(
        flyte.Image.from_debian_base(
            name="my-image",
            python_version=(3, 13)
            # registry="registry.example.com/my-org" # Only needed for local builds
        )
        .with_apt_packages("libopenblas-dev")
        .with_pip_packages("numpy")
        .with_env_vars({"OMP_NUM_THREADS": "4"})
    )
)

@env.task
def main(x_list: list[int]) -> float:
    arr = np.array(x_list)
    return float(np.mean(arr))

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, x_list=list(range(10)))
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/container-images/from_debian_base.py*

> [!NOTE]
> The `registry` parameter is only needed if you are building the image locally. It is not required when using the Union backend `ImageBuilder`.
> See **Configure tasks > Container images > Image building** for more details.

> [!NOTE]
> Images built with `Image.from_debian_base()` do not include CA certificates by default, which can cause TLS
> validation errors and block access to HTTPS-based storage such as Amazon S3. Libraries like Polars (e.g., `polars.scan_parquet()`) are particularly affected.
> **Solution:** Add `"ca-certificates"` using `.with_apt_packages()` in your image definition.
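
Following the note above, a minimal sketch of an image definition that adds CA certificates, using the same `from_debian_base` pattern shown earlier (`polars` is just an example dependency that needs HTTPS access to object storage):

```python
import flyte

image = (
    flyte.Image.from_debian_base(name="my-image")
    # ca-certificates enables TLS validation for HTTPS object stores such as Amazon S3
    .with_apt_packages("ca-certificates")
    .with_pip_packages("polars")
)
```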

## Example: Defining an image based on uv script metadata

Another common technique for defining an image is to use [`uv` inline script metadata](https://docs.astral.sh/uv/guides/scripts/#declaring-script-dependencies) to specify your dependencies right in your Python file and then use the `flyte.Image.from_uv_script()` method to create a `flyte.Image` object.
The `from_uv_script` method starts with the default Flyte image and adds the dependencies specified in the `uv` metadata.
For example:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "numpy"
# ]
# main = "main"
# params = "x_list=[1,2,3,4,5,6,7,8,9,10]"
# ///

import flyte
import numpy as np

env = flyte.TaskEnvironment(
    name="my_env",
    image=flyte.Image.from_uv_script(
            __file__,
            name="my-image"
            # registry="registry.example.com/my-org" # Only needed for local builds
        )
)

@env.task
def main(x_list: list[int]) -> float:
    arr = np.array(x_list)
    return float(np.mean(arr))

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, x_list=list(range(10)))
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/container-images/from_uv_script.py*

The advantage of this approach is that the dependencies used when running a script locally and when running it on the Flyte/Union backend are always the same (as long as you use `uv` to run your scripts locally).
This means you can develop and test your scripts in a consistent environment, reducing the chances of encountering issues when deploying to the backend.

The example above uses `flyte.init_from_config()`, which connects to your configured Union/Flyte backend for remote runs.
To run the script locally instead, replace `flyte.init_from_config()` with `flyte.init()`.

> [!NOTE]
> When using `uv` metadata in this way, be sure to include the `flyte` package in your `uv` script dependencies.
> This will ensure that `flyte` is installed when running the script locally using `uv run`.
> When running on the Flyte/Union backend, the `flyte` package from the uv script dependencies will overwrite the one included automatically from the default Flyte image.

## Image building

There are two ways that the image can be built:

* If you are running a Flyte OSS instance then the image will be built locally on your machine and pushed to the container registry you specified in the `Image` definition.
* If you are running a Union instance, the image can be built locally, as with Flyte OSS, or using the Union `ImageBuilder`, which runs remotely on Union's infrastructure.

### Configuring the `builder`

[Earlier](https://www.union.ai/docs/v2/union/user-guide/connecting-to-a-cluster), we discussed the `image.builder` property in the `config.yaml`.

For Flyte OSS instances, this property must be set to `local`.

For Union instances, this property can be set to `remote` to use the Union `ImageBuilder`, or `local` to build the image locally on your machine.

### Local image building

When `image.builder` in the `config.yaml` is set to `local`, `flyte.run()` does the following:

* Builds the Docker image using your local Docker installation, applying the layers (apt packages, pip packages, and so on) specified in your `Image` definition.
* Pushes the image to the container registry you specified.
* Deploys your code to the backend.
* Kicks off the execution of your workflow.
* Before the task that uses your custom image is executed, the backend pulls the image from the registry to set up the container.

> [!NOTE]
> In the examples above, we used the placeholder `registry="registry.example.com/my-org"`.
>
> Be sure to change `registry.example.com/my-org` to the URL of your actual container registry.

You must ensure that:

* Docker is running on your local machine.
* You have successfully run `docker login` to that registry from your local machine (for example, for the GitHub container registry: `echo $GITHUB_TOKEN | docker login ghcr.io -u USERNAME --password-stdin`).
* Your Union/Flyte installation has read access to that registry.

> [!NOTE]
> If you are using the GitHub container registry (`ghcr.io`)
> note that images pushed there are private by default.
> You may need to go to the image URI, click **Package Settings**, and change the visibility to public in order to access the image.
>
> Other registries (such as Docker Hub) require that you pre-create the image repository before pushing the image.
> In that case you can set it to public when you create it.
>
> Public images are on the public internet and should only be used for testing purposes.
> Do not place proprietary code in public images.

### Remote `ImageBuilder`

`ImageBuilder` is a service provided by Union that builds container images on Union's infrastructure and provides an internal container registry for storing the built images.

When `image.builder` in the `config.yaml` is set to `remote` (and you are running Union.ai), `flyte.run()` does the following:

* Builds the Docker image on your Union instance with `ImageBuilder`.
* Pushes the image to a registry:
  * If you did not specify a `registry` in the `Image` definition, it pushes to the internal registry in your Union instance.
  * If you did specify a `registry`, it pushes to that registry. Be sure to also set the `registry_secret` parameter in the `Image` definition to enable `ImageBuilder` to authenticate to that registry (see **Configure tasks > Container images > Image building > Remote `ImageBuilder` > ImageBuilder with external registries**).
* Deploys your code to the backend.
* Kicks off the execution of your workflow.
* Before the task that uses your custom image is executed, the backend pulls the image from the registry to set up the container.

No Docker installation or other local configuration is required on your part.

> [!NOTE]
> The Flyte SDK checks whether the image builder is enabled for your cluster by verifying that the `image_build` task is deployed in the `system` project within the `production` domain.
> If you are using custom roles and policies, ensure that users are granted the `view_flyte_inventory` action for the `production/system` project-domain pair.
> See the [V1 user management documentation](/docs/v1/union/user-guide/administration/user-management) for more details on creating and assigning custom roles and policies (V2 user management currently works identically to V1).

#### ImageBuilder with external registries

If you want to push the images built by `ImageBuilder` to an external registry, set the `registry` parameter in the `Image` object.
You will also need to set the `registry_secret` parameter to provide the secret needed to push and pull images to the private registry.
For example:

```python
# Add registry credentials so the Union remote builder can pull the base image
# and push the resulting image to your private registry.
image=flyte.Image.from_debian_base(
    name="my-image",
    base_image="registry.example.com/my-org/my-private-image:latest",
    registry="registry.example.com/my-org",
    registry_secret="my-secret"
)

# Reference the same secret in the TaskEnvironment so Flyte can pull the image at runtime.
env = flyte.TaskEnvironment(
    name="my_task_env",
    image=image,
    secrets="my-secret"
)
```

The value of the `registry_secret` parameter must be the name of a Flyte secret of type `image_pull` that contains the credentials needed to access the private registry. It must match the name specified in the `secrets` parameter of the `TaskEnvironment` so that Flyte can use it to pull the image at runtime.

To create an `image_pull` secret for the remote builder and the task environment, run the following command:

```bash
flyte create secret --type image_pull my-secret --from-file ~/.docker/config.json
```

The format of this secret matches the standard Kubernetes [image pull secret](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/#log-in-to-docker-hub), and should look like this:

```json
{
  "auths": {
    "registry.example.com": {
      "auth": "base64-encoded-auth"
    }
  }
}
```

> [!NOTE]
> The `auth` field contains the base64-encoded credentials for your registry (username and password or token).

### Install private PyPI packages

To install Python packages from a private PyPI index (for example, from GitHub), you can mount a secret to the image layer.
This allows your build to authenticate securely during dependency installation.
For example:

```python
from flyte import Image, Secret

private_package = "git+https://$GITHUB_PAT@github.com/pingsutw/flytex.git@2e20a2acebfc3877d84af643fdd768edea41d533"
image = (
    Image.from_debian_base()
    .with_apt_packages("git")
    .with_pip_packages(private_package, pre=True, secret_mounts=Secret("GITHUB_PAT"))
)
```

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/resources ===

# Resources

Task resources specify the computational limits and requests (CPU, memory, GPU, storage) that will be allocated to each task's container during execution.

To specify resource requirements for your task, instantiate a `Resources` object with the desired parameters and assign it to either
the `resources` parameter of the `TaskEnvironment` or the `resources` parameter of the `override` function (for invocation overrides).

Every task defined using that `TaskEnvironment` will run with the specified resources.
If a specific task has its own `resources` defined in the decorator, it will override the environment's resources for that task only.

If neither `TaskEnvironment` nor the task decorator specifies `resources`, the default resource allocation will be used.

## Resources data class

For the full class definition, parameter types, and accepted formats, see the [`Resources` API reference](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/resources).

The main parameters are:

- **`cpu`**: CPU allocation — number, string (`"500m"`), or `(request, limit)` tuple.
- **`memory`**: Memory with Kubernetes units — `"4Gi"`, or `(request, limit)` tuple.
- **`gpu`**: GPU allocation — `"A100:2"`, integer count, or `GPU()`/`TPU()`/`Device()` for advanced config.
- **`disk`**: Ephemeral storage — `"10Gi"`.
- **`shm`**: Shared memory — `"1Gi"` or `"auto"`.

## Examples

### Usage in TaskEnvironment

Here's a complete example of defining a TaskEnvironment with resource specifications for a machine learning training workload:

```python
import flyte

# Define a TaskEnvironment for ML training tasks
env = flyte.TaskEnvironment(
    name="ml-training",
    resources=flyte.Resources(
        cpu=("2", "4"),        # Request 2 cores, allow up to 4 cores for scaling
        memory=("2Gi", "12Gi"), # Request 2 GiB, allow up to 12 GiB for large datasets
        disk="50Gi",           # 50 GiB ephemeral storage for checkpoints
        shm="8Gi"              # 8 GiB shared memory for efficient data loading
    )
)

# Use the environment for tasks
@env.task
async def train_model(dataset_path: str) -> str:
    # This task will run with flexible resource allocation
    return "model trained"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/resources/resources.py*

### Usage in a task-specific override

```python
# Demonstrate resource override at task invocation level
@env.task
async def heavy_training_task() -> str:
    return "heavy model trained with overridden resources"

@env.task
async def main():
    # Task using environment-level resources
    result = await train_model("data.csv")
    print(result)

    # Task with overridden resources at invocation time
    result = await heavy_training_task.override(
        resources=flyte.Resources(
            cpu="4",
            memory="24Gi",
            disk="100Gi",
            shm="16Gi"
        )
    )()
    print(result)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/resources/resources.py*

## Resource types

### CPU resources

CPU can be specified in several formats:

```python
# String formats (Kubernetes-style)
flyte.Resources(cpu="500m")        # 500 milliCPU (0.5 cores)
flyte.Resources(cpu="2")           # 2 CPU cores
flyte.Resources(cpu="1.5")         # 1.5 CPU cores

# Numeric formats
flyte.Resources(cpu=1)             # 1 CPU core
flyte.Resources(cpu=0.5)           # 0.5 CPU cores

# Request and limit ranges
flyte.Resources(cpu=("1", "2"))    # Request 1 core, limit to 2 cores
flyte.Resources(cpu=(1, 4))        # Request 1 core, limit to 4 cores
```

### Memory resources

Memory specifications follow Kubernetes conventions:

```python
# Standard memory units
flyte.Resources(memory="512Mi")    # 512 MiB
flyte.Resources(memory="1Gi")      # 1 GiB
flyte.Resources(memory="2Gi")      # 2 GiB
flyte.Resources(memory="500M")     # 500 MB (decimal)
flyte.Resources(memory="1G")       # 1 GB (decimal)

# Request and limit ranges
flyte.Resources(memory=("1Gi", "4Gi"))  # Request 1 GiB, limit to 4 GiB
```
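
Note that binary suffixes (`Ki`, `Mi`, `Gi`) and decimal suffixes (`K`, `M`, `G`) denote different quantities; Flyte passes these strings through to Kubernetes, which interprets them accordingly. A quick illustrative helper (not part of Flyte) showing how the units differ:

```python
_UNITS = {
    "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4,  # binary
    "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4,      # decimal
}

def to_bytes(quantity: str) -> int:
    # Illustrative parser for Kubernetes-style quantities, e.g. "512Mi" or "500M"
    for suffix in sorted(_UNITS, key=len, reverse=True):
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * _UNITS[suffix])
    return int(quantity)  # plain bytes

print(to_bytes("512Mi"))  # 536870912
print(to_bytes("500M"))   # 500000000
```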

### GPU resources

Flyte supports various GPU types and configurations:

#### Simple GPU allocation

```python
# Basic GPU count
flyte.Resources(gpu=1)             # 1 GPU (any available type)
flyte.Resources(gpu=4)             # 4 GPUs

# Specific GPU types with quantity
flyte.Resources(gpu="T4:1")        # 1 NVIDIA T4 GPU
flyte.Resources(gpu="A100:2")      # 2 NVIDIA A100 GPUs
flyte.Resources(gpu="H100:8")      # 8 NVIDIA H100 GPUs
```

#### Advanced GPU configuration

You can also use the `GPU` helper class for more detailed configurations:

```python
# Using the GPU helper function
gpu_config = flyte.GPU(device="A100", quantity=2)
flyte.Resources(gpu=gpu_config)

# GPU with memory partitioning (A100 only)
partitioned_gpu = flyte.GPU(
    device="A100",
    quantity=1,
    partition="1g.5gb"  # 1/7th of A100 with 5GB memory
)
flyte.Resources(gpu=partitioned_gpu)

# A100 80GB with partitioning
large_partition = flyte.GPU(
    device="A100 80G",
    quantity=1,
    partition="7g.80gb"  # Full A100 80GB
)
flyte.Resources(gpu=large_partition)
```

#### Supported GPU types
- **T4**: Entry-level training and inference
- **L4**: Optimized for AI inference
- **L40s**: High-performance compute
- **A100**: High-end training and inference (40GB)
- **A100 80G**: High-end training with more memory (80GB)
- **H100**: Latest generation, highest performance

### Custom device specifications

You can also define custom devices if your infrastructure supports them:

```python
# Custom device configuration
custom_device = flyte.Device(
    device="custom_accelerator",
    quantity=2,
    partition="large"
)

resources = flyte.Resources(gpu=custom_device)
```

### TPU resources

For Google Cloud TPU workloads you can specify TPU resources using the `TPU` helper class:

```python
# TPU v5p configuration
tpu_config = flyte.TPU(device="V5P", partition="2x2x1")
flyte.Resources(gpu=tpu_config)  # Note: TPUs use the gpu parameter

# TPU v6e configuration
tpu_v6e = flyte.TPU(device="V6E", partition="4x4")
flyte.Resources(gpu=tpu_v6e)
```

### Storage resources

Flyte provides two types of storage resources for tasks: ephemeral disk storage and shared memory.
These resources are essential for tasks that need temporary storage for processing data, caching intermediate results, or sharing data between processes.

#### Disk storage

Ephemeral disk storage provides temporary space for your tasks to store intermediate files, downloaded datasets, model checkpoints, and other temporary data. This storage is automatically cleaned up when the task completes.

```python
flyte.Resources(disk="10Gi")       # 10 GiB ephemeral storage
flyte.Resources(disk="100Gi")      # 100 GiB ephemeral storage
flyte.Resources(disk="1Ti")        # 1 TiB for large-scale data processing

# Common use cases
flyte.Resources(disk="50Gi")       # ML model training with checkpoints
flyte.Resources(disk="200Gi")      # Large dataset preprocessing
flyte.Resources(disk="500Gi")      # Video/image processing workflows
```

#### Shared memory

Shared memory (`/dev/shm`) is a high-performance, RAM-based storage area that can be shared between processes within the same container. It's particularly useful for machine learning workflows that need fast data loading and inter-process communication.

```python
flyte.Resources(shm="1Gi")         # 1 GiB shared memory (/dev/shm)
flyte.Resources(shm="auto")        # Auto-sized shared memory
flyte.Resources(shm="16Gi")        # Large shared memory for distributed training
```

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/secrets ===

# Secrets

Flyte secrets enable you to securely store and manage sensitive information, such as API keys, passwords, and other credentials.
Secrets reside in a secret store on the data plane of your Union/Flyte backend.
You can create, list, and delete secrets in the store using the Flyte CLI or SDK.
Secrets in the store can be accessed and used within your workflow tasks, without exposing any cleartext values in your code.

## Creating a literal string secret

You can create a secret using the [`flyte create secret`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) command like this:

```bash
flyte create secret MY_SECRET_KEY my_secret_value
```

This will create a secret called `MY_SECRET_KEY` with the value `my_secret_value`.
This secret will be scoped to your entire organization.
It will be available across all projects and domains in your organization.
See the **Configure tasks > Secrets > Scoping secrets** section below for more details.
See **Configure tasks > Secrets > Using a literal string secret** for how to access the secret in your task code.

## Creating a file secret

You can also create a secret by specifying a local file:

```bash
flyte create secret MY_SECRET_KEY --from-file /local/path/to/my_secret_file
```

In this case, when accessing the secret in your task code, you will need to read it as a mounted file rather than an environment variable (see **Configure tasks > Secrets > Using a file secret**).

## Scoping secrets

When you create a secret without specifying a project or domain, as we did above, the secret is scoped to the organization level.
This means that the secret will be available across all projects and domains in the organization.

You can optionally specify either or both of the `--project` and `--domain` flags to restrict the scope of the secret to:
* A specific project (across all domains)
* A specific domain (across all projects)
* A specific project and a specific domain.

For example, to create a secret that is only available in `my_project/development`, you would execute the following command:

```bash
flyte create secret --project my_project --domain development MY_SECRET_KEY my_secret_value
```

## Listing secrets

You can list existing secrets with the [`flyte get secret`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) command.
For example, the following command will list all secrets in the organization:

```bash
flyte get secret
```

Specifying either or both of the `--project` and `--domain` flags will list the secrets that are **only** available in that project and/or domain.

For example, to list the secrets that are only available in `my_project` and domain `development`, you would run:

```bash
flyte get secret --project my_project --domain development
```

## Deleting secrets

To delete a secret, use the [`flyte delete secret`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) command:

```bash
flyte delete secret MY_SECRET_KEY
```

## Using a literal string secret

To use a literal string secret, specify it in the `TaskEnvironment` along with the name of the environment variable into which it will be injected.
You can then access it using `os.getenv()` in your task code.
For example:

```
import os

env_1 = flyte.TaskEnvironment(
    name="env_1",
    secrets=[
        flyte.Secret(key="my_secret", as_env_var="MY_SECRET_ENV_VAR"),
    ]
)

@env_1.task
def task_1():
    my_secret_value = os.getenv("MY_SECRET_ENV_VAR")
    print(f"My secret value is: {my_secret_value}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/secrets/secrets.py*

## Using a file secret

To use a file secret, specify it in the `TaskEnvironment` along with the `mount="/etc/flyte/secrets"` argument (with that precise value).

The file will be mounted at `/etc/flyte/secrets/<SECRET_KEY>`.

For example:

```
env_2 = flyte.TaskEnvironment(
    name="env_2",
    secrets=[
        flyte.Secret(key="my_secret", mount="/etc/flyte/secrets"),
    ]
)

@env_2.task
def task_2():
    with open("/etc/flyte/secrets/my_secret", "r") as f:
        my_secret_file_content = f.read()
    print(f"My secret file content is: {my_secret_file_content}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/secrets/secrets.py*

> [!NOTE]
> Currently, to access a file secret you must specify a `mount` parameter value of `"/etc/flyte/secrets"`.
> This fixed path is the directory in which the secret file will be placed.
> The name of the secret file will be equal to the key of the secret.

> [!NOTE]
> A `TaskEnvironment` can only access a secret if the scope of the secret includes the project and domain where the `TaskEnvironment` is deployed.

> [!WARNING]
> Do not return secret values from tasks, as this will expose secrets to the control plane.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/caching ===

# Caching

Flyte 2 provides intelligent **task output caching** that automatically avoids redundant computation by reusing previously computed task results.

> [!NOTE]
> Caching works at the task level and caches complete task outputs.
> For function-level checkpointing and resumption *within tasks*, see [Traces](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces).

## Overview

By default, caching is disabled.

If caching is enabled for a task, then Flyte determines a **cache key** for the task.
The key is composed of the following:

* Final inputs: The set of inputs after removing any specified in the `ignored_inputs`.
* Task name: The fully-qualified name of the task.
* Interface hash: A hash of the task's input and output types.
* Cache version: The cache version string.

If the cache behavior is set to `"auto"`, the cache version is automatically generated using a hash of the task's source code (or according to the custom policy if one is specified).
If the cache behavior is set to `"override"`, the cache version can be specified explicitly using the `version_override` parameter.

When the task runs, Flyte checks if a cache entry exists for the key.
If found, the cached result is returned immediately instead of re-executing the task.
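
Conceptually, the cache key can be pictured as a hash over those four components. The sketch below is purely illustrative (the function and field names are hypothetical, not Flyte's actual implementation), but it shows how ignored inputs and the cache version affect lookups:

```python
import hashlib
import json

def cache_key(task_name: str, inputs: dict, interface_hash: str,
              cache_version: str, ignored_inputs: tuple = ()) -> str:
    # Drop ignored inputs before hashing, mirroring `ignored_inputs`.
    final_inputs = {k: v for k, v in inputs.items() if k not in ignored_inputs}
    payload = json.dumps(
        {
            "task": task_name,
            "inputs": final_inputs,
            "interface": interface_hash,
            "version": cache_version,
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

# Identical inputs (after removing ignored ones) produce the same key...
k1 = cache_key("my_env.my_task", {"data": "a", "debug": True}, "if01", "v1", ("debug",))
k2 = cache_key("my_env.my_task", {"data": "a", "debug": False}, "if01", "v1", ("debug",))
assert k1 == k2
# ...while changing the cache version produces a different key.
k3 = cache_key("my_env.my_task", {"data": "a"}, "if01", "v2")
assert k1 != k3
```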

## Basic caching usage

Flyte 2 supports three main cache behaviors:

### `"auto"` - Automatic versioning

```
@env.task(cache=flyte.Cache(behavior="auto"))
async def auto_versioned_task(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

With `behavior="auto"`, the cache version is automatically generated based on the function's source code.
If you change the function implementation, the cache is automatically invalidated.

- **When to use**: Development and most production scenarios.
- **Cache invalidation**: Automatic when function code changes.
- **Benefits**: Zero-maintenance caching that "just works".
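
The effect of `behavior="auto"` can be approximated by hashing the function body: any change to the implementation yields a new version string, which invalidates the cache. The sketch below hashes the compiled code object as a stand-in for Flyte's source-code hash (`body_version` is a hypothetical helper, not the actual `FunctionBodyPolicy` implementation):

```python
import hashlib

def body_version(fn) -> str:
    # Hash the function's compiled body: editing the implementation
    # changes the hash, so the cache version changes with the code.
    code = fn.__code__
    blob = code.co_code + repr(code.co_consts).encode()
    return hashlib.sha256(blob).hexdigest()[:12]

def transform_v1(data: str) -> str:
    return data.upper()

def transform_v2(data: str) -> str:
    return data.upper().strip()  # changed implementation

assert body_version(transform_v1) == body_version(transform_v1)  # stable across runs
assert body_version(transform_v1) != body_version(transform_v2)  # invalidated on change
```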

You can also use the direct string shorthand:

```
@env.task(cache="auto")
async def auto_versioned_task_2(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

### `"override"`

With `behavior="override"`, you specify a custom cache version in the `version_override` parameter.
Since the cache version is fixed in your code, you change it manually whenever you need to invalidate the cache.

```
@env.task(cache=flyte.Cache(behavior="override", version_override="v1.2"))
async def manually_versioned_task(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

- **When to use**: When you need explicit control over cache invalidation.
- **Cache invalidation**: Manual, by changing `version_override`.
- **Benefits**: Stable caching across code changes that don't affect logic.

### `"disable"` - No caching

To explicitly disable caching, use the `"disable"` behavior.
**This is the default behavior.**

```
@env.task(cache=flyte.Cache(behavior="disable"))
async def always_fresh_task(data: str) -> str:
    return get_current_timestamp() + await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

- **When to use**: Non-deterministic functions, side effects, or always-fresh data.
- **Cache invalidation**: N/A - never cached.
- **Benefits**: Ensures execution every time.

You can also use the direct string shorthand:

```
@env.task(cache="disable")
async def always_fresh_task_2(data: str) -> str:
    return get_current_timestamp() + await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

## Advanced caching configuration

### Ignoring specific inputs

Sometimes you want to cache based on some inputs but not others:

```
@env.task(cache=flyte.Cache(behavior="auto", ignored_inputs=("debug_flag",)))
async def selective_caching(data: str, debug_flag: bool) -> str:
    if debug_flag:
        print(f"Debug: transforming {data}")
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

**This is useful for**:
- Debug flags that don't affect computation
- Logging levels or output formats
- Metadata that doesn't impact results

### Cache serialization

Cache serialization ensures that only one instance of a task runs at a time for identical inputs:

```
@env.task(cache=flyte.Cache(behavior="auto", serialize=True))
async def expensive_model_training(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

**When to use serialization**:
- Very expensive computations (model training, large data processing)
- Shared resources that shouldn't be accessed concurrently
- Operations where multiple parallel executions provide no benefit

**How it works**:
1. First execution acquires a reservation and runs normally.
2. Concurrent executions with identical inputs wait for the first to complete.
3. Once complete, all waiting executions receive the cached result.
4. If the running execution fails, another waiting execution takes over.

### Salt for cache key variation

Use `salt` to vary cache keys without changing function logic:

```
@env.task(cache=flyte.Cache(behavior="auto", salt="experiment_2024_q4"))
async def experimental_analysis(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

**`salt` is useful for**:
- A/B testing with identical code.
- Temporary cache namespaces for experiments.
- Environment-specific cache isolation.

## Cache policies

For details on implementing custom cache policies, see the [`CachePolicy` protocol](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/cachepolicy) and [`Cache` class](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/cache) API references.

For `behavior="auto"`, Flyte uses cache policies to generate version hashes.

### Function body policy (default)

The default `FunctionBodyPolicy` generates cache versions from the function's source code:

```
from flyte._cache import FunctionBodyPolicy

@env.task(cache=flyte.Cache(
    behavior="auto",
    policies=[FunctionBodyPolicy()]  # This is the default. Does not actually need to be specified.
))
async def code_sensitive_task(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

### Custom cache policies

You can implement custom cache policies by following the `CachePolicy` protocol:

```
from flyte._cache import CachePolicy

class DatasetVersionPolicy(CachePolicy):
    def get_version(self, salt: str, params) -> str:
        # Generate version based on custom logic
        dataset_version = get_dataset_version()
        return f"{salt}_{dataset_version}"

@env.task(cache=flyte.Cache(behavior="auto", policies=[DatasetVersionPolicy()]))
async def dataset_dependent_task(data: str) -> str:
    # Cache invalidated when dataset version changes
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

## Caching configuration at different levels

You can configure caching at three levels: `TaskEnvironment` definition, `@env.task` decorator, and task invocation.

### `TaskEnvironment` level

You can configure caching at the `TaskEnvironment` level.
This will set the default cache behavior for all tasks defined using that environment.
For example:

```
cached_env = flyte.TaskEnvironment(
    name="cached_environment",
    cache=flyte.Cache(behavior="auto")  # Default for all tasks
)

@cached_env.task  # Inherits auto caching from environment
async def inherits_caching(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

### `@env.task` decorator level

By setting the cache parameter in the `@env.task` decorator, you can override the environment's default cache behavior for specific tasks:

```
@cached_env.task(cache=flyte.Cache(behavior="disable"))  # Override environment default
async def decorator_caching(data: str) -> str:
    return await transform_data(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

### `task.override` level

By setting the cache parameter in the `task.override` method, you can override the cache behavior for specific task invocations:

```
@env.task
async def override_caching_on_call(data: str) -> str:
    # Create an overridden version and call it
    overridden_task = inherits_caching.override(cache=flyte.Cache(behavior="disable"))
    return await overridden_task(data)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/caching/caching.py*

## Runtime cache control

You can also force cache invalidation for a specific run:

```python
# Disable caching for this specific execution
run = flyte.with_runcontext(overwrite_cache=True).run(my_cached_task, data="test")
```

## Project and domain cache isolation

Caches are automatically isolated by:
- **Project**: Tasks in different projects have separate cache namespaces.
- **Domain**: Development, staging, and production domains maintain separate caches.

## Local development caching

When running locally, Flyte maintains a local cache:

```python
# Local execution uses ~/.flyte/local-cache/
flyte.init()  # Local mode
result = flyte.run(my_cached_task, data="test")
```

Local cache behavior:
- Stored in `~/.flyte/local-cache/` directory
- No project/domain isolation (since running locally)
- Disabled by setting `FLYTE_LOCAL_CACHE_ENABLED=false`

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/reusable-containers ===

# Reusable containers

By default, each task execution in Flyte and Union runs in a fresh container instance that is created just for that execution and then discarded.
With reusable containers, the same container can be reused across multiple executions and tasks.
This approach reduces start up overhead and improves resource efficiency.

> [!NOTE]
> The reusable container feature is only available when running your Flyte code on a Union backend.

## How it works

With reusable containers, the system maintains a pool of persistent containers that can handle multiple task executions.
When you configure a `TaskEnvironment` with a `ReusePolicy`, the system does the following:

1. Creates a pool of persistent containers.
2. Routes task executions to available container instances.
3. Manages container lifecycle with configurable timeouts.
4. Supports concurrent task execution within containers (for async tasks).
5. Preserves the Python execution environment across task executions, allowing you to maintain state through global variables.

## Basic usage

> [!NOTE]
> The reusable containers feature currently requires a dedicated runtime library
> ([`unionai-reuse`](https://pypi.org/project/unionai-reuse/)) to be installed in the task image used by the reusable task.
> You can add this library to your task image using the `flyte.Image.with_pip_packages` method, as shown below.
> This library only needs to be added to the task image.
> It does not need to be installed in your local development environment.

Enable container reuse by adding a `ReusePolicy` to your `TaskEnvironment`:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n=500"
# ///

import flyte
from datetime import timedelta

# {{docs-fragment env}}
# Currently required to enable reusable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.10")

env = flyte.TaskEnvironment(
    name="reusable-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(
        replicas=2,                           # Create 2 container instances
        concurrency=1,                        # Process 1 task per container at a time
        scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
        idle_ttl=timedelta(hours=1)           # Entire environment shuts down after 1 hour of no tasks
    ),
    image=reusable_image  # Use the container image augmented with the unionai-reuse library.
)
# {{/docs-fragment env}}

@env.task
async def compute_task(x: int) -> int:
    return x * x

@env.task
async def main() -> list[int]:
    # These tasks will reuse containers from the pool
    results = []
    for i in range(10):
        result = await compute_task(i)
        results.append(result)
    return results

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/reusable-containers/reuse.py*

## `ReusePolicy` parameters

For detailed parameter documentation, including capacity math and lifecycle behavior, see the [`ReusePolicy` API reference](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/reusepolicy).

The `ReusePolicy` class controls how containers are managed in a reusable environment:

```python
flyte.ReusePolicy(
    replicas: typing.Union[int, typing.Tuple[int, int]],
    concurrency: int,
    scaledown_ttl: typing.Union[int, datetime.timedelta],
    idle_ttl: typing.Union[int, datetime.timedelta]
)
```

### `replicas`: Container pool size

Controls the number of container instances in the reusable pool:

- **Fixed size**: `replicas=3` creates exactly 3 container instances. These 3 replicas are shut down after `idle_ttl` expires.
- **Auto-scaling**: `replicas=(2, 5)` starts with 2 containers and can scale up to 5 based on demand.
  - If the task is running on 2 replicas and demand drops to zero, these 2 containers are shut down after `idle_ttl` expires.
  - If the task is running on 2 replicas and demand increases, new containers are created up to the maximum of 5.
  - If the task is running on 5 replicas and demand drops, the fifth container is shut down after `scaledown_ttl` expires.
  - If demand drops again, the fourth container is also shut down after another `scaledown_ttl` period.
- **Resource impact**: Each replica consumes the full resources defined in `TaskEnvironment.resources`.

```python
# Fixed pool size
fixed_pool_policy = flyte.ReusePolicy(
    replicas=3,
    concurrency=1,
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

# Auto-scaling pool
auto_scaling_policy = flyte.ReusePolicy(
    replicas=(1, 10),
    concurrency=1,
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)
```

### `concurrency`: Tasks per container

Controls how many tasks can execute simultaneously within a single container:

- **Default**: `concurrency=1` (one task per container at a time).
- **Higher concurrency**: `concurrency=5` allows 5 tasks to run simultaneously in each container.
- **Total capacity**: `replicas × concurrency` = maximum concurrent tasks across the entire pool.

```python
# Sequential processing (default)
sequential_policy = flyte.ReusePolicy(
    replicas=2,
    concurrency=1,  # One task per container
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)

# Concurrent processing
concurrent_policy = flyte.ReusePolicy(
    replicas=2,
    concurrency=5,  # 5 tasks per container = 10 total concurrent tasks
    scaledown_ttl=timedelta(minutes=10),
    idle_ttl=timedelta(hours=1)
)
```

### `idle_ttl` vs `scaledown_ttl`: Container lifecycle

These parameters work together to manage container lifecycle at different levels:

#### `idle_ttl`: Environment timeout

- **Scope**: Controls the entire reusable environment infrastructure.
- **Behavior**: When there are no active or queued tasks, the entire environment scales down after `idle_ttl` expires.
- **Purpose**: Manages the lifecycle of the entire container pool.
- **Typical values**: 1-2 hours, or `None` for always-on environments.

#### `scaledown_ttl`: Individual container timeout

- **Scope**: Controls individual container instances.
- **Behavior**: When a container finishes a task and becomes inactive, it will be terminated after `scaledown_ttl` expires.
- **Purpose**: Prevents resource waste from inactive containers.
- **Typical values**: 5-30 minutes for most workloads.

```python
from datetime import timedelta

lifecycle_policy = flyte.ReusePolicy(
    replicas=3,
    concurrency=2,
    scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
    idle_ttl=timedelta(hours=1)         # Entire environment shuts down after 1 hour of no tasks
)
```

## Understanding parameter relationships

The four `ReusePolicy` parameters work together to control different aspects of container management:

```python
reuse_policy = flyte.ReusePolicy(
    replicas=4,                           # Infrastructure: How many containers?
    concurrency=3,                        # Throughput: How many tasks per container?
    scaledown_ttl=timedelta(minutes=10),  # Individual: When do idle containers shut down?
    idle_ttl=timedelta(hours=1)           # Environment: When does the whole pool shut down?
)
# Total capacity: 4 × 3 = 12 concurrent tasks
# Individual containers shut down after 10 minutes of inactivity
# Entire environment shuts down after 1 hour of no tasks
```

### Key relationships

- **Total throughput** = `replicas × concurrency`
- **Resource usage** = `replicas × TaskEnvironment.resources`
- **Cost efficiency**: Higher `concurrency` reduces container overhead, more `replicas` provides better isolation
- **Lifecycle management**:  `scaledown_ttl` manages individual containers, `idle_ttl` manages the environment

## Simple example

Here is a simple, but complete, example of reuse with concurrency.

First, import the needed modules and set up logging:

```
import asyncio
import logging

import flyte

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py*

Next, we set up the reusable task environment. Note that, currently, the image used for a reusable environment requires an extra package to be installed:

```
env = flyte.TaskEnvironment(
    name="reuse_concurrency",
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,
        idle_ttl=60,
        concurrency=100,
        scaledown_ttl=60,
    ),
    image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.10"),
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py*

Now, we define the `main` task (the main driver task of the workflow) and the `noop` task, which will be executed multiple times, reusing the same containers:

```
@env.task
async def noop(x: int) -> int:
    logger.debug(f"Task noop: {x}")
    return x

@env.task
async def main(n: int = 50) -> int:
    coros = [noop(i) for i in range(n)]
    results = await asyncio.gather(*coros)
    return sum(results)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py*

Finally, we deploy and run the workflow programmatically, so all you have to do is execute `python reuse_concurrency.py` to see it in action:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, n=500)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/reusable-containers/reuse_concurrency.py*

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/pod-templates ===

# Pod templates

Flyte is built on Kubernetes and leverages its powerful container orchestration capabilities. A Kubernetes [pod](https://kubernetes.io/docs/concepts/workloads/pods/) is a group of one or more containers that share storage and network resources. While Flyte automatically runs your task code in a container, pod templates let you customize the entire pod specification for advanced use cases.

The `pod_template` parameter in `TaskEnvironment` allows you to:

- **Add sidecar containers**: Run metrics exporters, service proxies, or specialized services alongside your task
- **Mount volumes**: Attach persistent storage or cloud storage like GCS or S3
- **Configure metadata**: Set custom labels and annotations for monitoring, routing, or cluster policies
- **Manage resources**: Configure resource requests, limits, and affinities
- **Inject configuration**: Add secrets, environment variables, or config maps
- **Access private registries**: Specify image pull secrets

## How it works

When you define a pod template:

1. **Primary container**: Flyte automatically injects your task code into the container specified by `primary_container_name` (default: `"primary"`)
2. **Automatic monitoring**: Flyte watches the primary container and exits the entire pod when it completes
3. **Image handling**: The image for your task environment is built automatically by Flyte; images for sidecar containers must be provided by you
4. **Local execution**: When running locally, only the task code executes—additional containers are not started

## Requirements

To use pod templates, install the Kubernetes Python client:

```bash
pip install kubernetes
```

Or add it to your image dependencies:

```python
image = flyte.Image.from_debian_base().with_pip_packages("kubernetes")
```

## Basic usage

Here's a complete example showing how to configure labels, annotations, environment variables, and image pull secrets:

```
# /// script
# requires-python = "==3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "kubernetes"
# ]
# ///

import flyte
from kubernetes.client import (
    V1Container,
    V1EnvVar,
    V1LocalObjectReference,
    V1PodSpec,
)

# Create a custom pod template
pod_template = flyte.PodTemplate(
    primary_container_name="primary",           # Name of the main container
    labels={"lKeyA": "lValA"},                 # Custom pod labels
    annotations={"aKeyA": "aValA"},            # Custom pod annotations
    pod_spec=V1PodSpec(                        # Kubernetes pod specification
        containers=[
            V1Container(
                name="primary",
                env=[V1EnvVar(name="hello", value="world")]  # Environment variables
            )
        ],
        image_pull_secrets=[                   # Access to private registries
            V1LocalObjectReference(name="regcred-test")
        ],
    ),
)

# Use the pod template in a TaskEnvironment
env = flyte.TaskEnvironment(
    name="hello_world",
    pod_template=pod_template,                 # Apply the custom pod template
    image=flyte.Image.from_uv_script(__file__, name="flyte", pre=True),
)

@env.task
async def say_hello(data: str) -> str:
    return f"Hello {data}"

@env.task
async def say_hello_nested(data: str = "default string") -> str:
    return await say_hello(data=data)

if __name__ == "__main__":
    flyte.init_from_config()
    result = flyte.run(say_hello_nested, data="hello world")
    print(result.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/pod-templates/pod_template.py*

## PodTemplate parameters

The `PodTemplate` class provides the following parameters:

| Parameter | Type | Description |
|-----------|------|-------------|
| `primary_container_name` | `str` | Name of the container where task code runs (default: `"primary"`). Must match a container in `pod_spec`. |
| `pod_spec` | `V1PodSpec` | Kubernetes pod specification for configuring containers, volumes, security contexts, and more. |
| `labels` | `dict[str, str]` | Pod labels for organization and selection by Kubernetes selectors. |
| `annotations` | `dict[str, str]` | Pod annotations for metadata and integrations (doesn't affect scheduling). |

## Volume mounts

Pod templates are commonly used to mount volumes for persistent storage or cloud storage access:

```python
from kubernetes.client import (
    V1Container,
    V1PodSpec,
    V1Volume,
    V1VolumeMount,
    V1CSIVolumeSource,
)
import flyte

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                volume_mounts=[
                    V1VolumeMount(
                        name="data-volume",
                        mount_path="/mnt/data",
                        read_only=False,
                    )
                ],
            )
        ],
        volumes=[
            V1Volume(
                name="data-volume",
                csi=V1CSIVolumeSource(
                    driver="your-csi-driver",
                    volume_attributes={"key": "value"},
                ),
            )
        ],
    ),
)

env = flyte.TaskEnvironment(
    name="volume-example",
    pod_template=pod_template,
    image=flyte.Image.from_debian_base(),
)

@env.task
async def process_data() -> str:
    # Access mounted volume
    with open("/mnt/data/input.txt", "r") as f:
        data = f.read()
    return f"Processed {len(data)} bytes"
```

### GCS/S3 volume mounts

Mount cloud storage directly into your pod for efficient data access:

```python
from kubernetes.client import V1Container, V1PodSpec, V1Volume, V1VolumeMount, V1CSIVolumeSource
import flyte

# GCS example with CSI driver
pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    annotations={
        "gke-gcsfuse/volumes": "true",
        "gke-gcsfuse/cpu-limit": "2",
        "gke-gcsfuse/memory-limit": "1Gi",
    },
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                volume_mounts=[V1VolumeMount(name="gcs", mount_path="/mnt/gcs")],
            )
        ],
        volumes=[
            V1Volume(
                name="gcs",
                csi=V1CSIVolumeSource(
                    driver="gcsfuse.csi.storage.gke.io",
                    volume_attributes={"bucketName": "my-bucket"},
                ),
            )
        ],
    ),
)
```

## Sidecar containers

Add sidecar containers to run alongside your task. Common use cases include:

- **Metrics exporters**: Prometheus, Datadog agents
- **Service proxies**: Istio, Linkerd sidecars
- **Data services**: Databases, caches, or specialized services like Nvidia NIMs

```python
from kubernetes.client import V1Container, V1ContainerPort, V1PodSpec
import flyte

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            # Primary container (where your task code runs)
            V1Container(name="primary"),

            # Sidecar container
            V1Container(
                name="metrics-sidecar",
                image="prom/pushgateway:latest",
                ports=[V1ContainerPort(container_port=9091)],
            ),
        ],
    ),
)

env = flyte.TaskEnvironment(
    name="sidecar-example",
    pod_template=pod_template,
    image=flyte.Image.from_debian_base().with_pip_packages("requests"),
)

@env.task
async def task_with_metrics() -> str:
    import requests

    # Send metrics to sidecar
    requests.post("http://localhost:9091/metrics", data="my_metric 42")

    # Your task logic
    return "Task completed with metrics"
```

## Image pull secrets

Configure private registry access:

```python
from kubernetes.client import V1Container, V1PodSpec, V1LocalObjectReference
import flyte

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[V1Container(name="primary")],
        image_pull_secrets=[V1LocalObjectReference(name="my-registry-secret")],
    ),
)
```

## Cluster-specific configuration

Pod templates are often used to configure Kubernetes-specific settings required by your cluster, even when not using multiple containers:

```python
import flyte

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    annotations={
        "iam.amazonaws.com/role": "my-task-role",  # AWS IAM role
        "cluster-autoscaler.kubernetes.io/safe-to-evict": "false",
    },
    labels={
        "cost-center": "ml-team",
        "project": "recommendations",
    },
)
```

## Important notes

1. **Local execution**: Pod templates only apply to remote execution. When running locally, only your task code executes.

2. **Image building**: Flyte automatically builds and manages the image for your task environment. Images for sidecar containers must be pre-built and available in a registry.

3. **Primary container**: Your task code is automatically injected into the container matching `primary_container_name`. This container must be defined in the `pod_spec.containers` list.

4. **Lifecycle management**: Flyte monitors the primary container and terminates the entire pod when it exits, ensuring sidecar containers don't run indefinitely.

## Best practices

1. **Start simple**: Begin with basic labels and annotations before adding complex sidecars
2. **Test locally first**: Verify your task logic works locally before adding pod customizations
3. **Use environment-specific templates**: Different environments (dev, staging, prod) may need different pod configurations
4. **Set resource limits**: Always set resource requests and limits for sidecars to prevent cluster issues
5. **Security**: Use image pull secrets and least-privilege service accounts

## Learn more

- [Kubernetes Pods Documentation](https://kubernetes.io/docs/concepts/workloads/pods/)
- [Kubernetes Python Client](https://github.com/kubernetes-client/python)
- [V1PodSpec Reference](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#podspec-v1-core)

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/multiple-environments ===

# Multiple environments

In many applications, different tasks within your workflow may require different configurations.
Flyte enables you to manage this complexity by allowing multiple environments within a single workflow.

Multiple environments are useful when:
- Different tasks in your workflow need different dependencies.
- Some tasks require specific CPU/GPU or memory configurations.
- A task requires a secret that other tasks do not (and you want to limit exposure of the secret value).
- You're integrating specialized tools that have conflicting requirements.

## Constraints on multiple environments

To use multiple environments in your workflow you define multiple `TaskEnvironment` instances, each with its own configuration, and then assign tasks to their respective environments.

There are, however, two additional constraints that you must take into account.
If `task_1` in environment `env_1` calls a `task_2` in environment `env_2`, then:

1. `env_1` must declare a deployment-time dependency on `env_2` via the `depends_on` parameter of the `TaskEnvironment` that defines `env_1`.
2. The image used in the `TaskEnvironment` of `env_1` must include all dependencies of the module containing `task_2` (unless `task_2` is invoked as a remote task).

<!-- TODO: Link to remote tasks when that page is live
2. The image used in the `TaskEnvironment` of `env_1` must include all dependencies of the module containing the `task_2` (unless [`task_2` is invoked as a remote task](https://www.union.ai/docs/v2/union/user-guide/task-programming/remote-tasks)).
-->

### Task `depends_on` constraints

The `depends_on` parameter in `TaskEnvironment` is used to provide deployment-time dependencies by establishing a relationship between one `TaskEnvironment` and another.
The system uses this information to determine which environments (and, specifically which images) need to be built in order to be able to run the code.

On `flyte run` (or `flyte deploy`), the system walks the tree defined by the `depends_on` relationships, starting with the environment of the task being invoked (or the environment being deployed, in the case of `flyte deploy`), and prepares each required environment.
Most importantly, it ensures that the container images needed for all required environments are available (and, if not, builds them).

This deploy-time determination of what to build is important because it means that for any given `run` or `deploy`, only those environments that are actually required are built.
The alternative strategy of building all environments defined in the set of deployed code can lead to unnecessary and expensive builds, especially when iterating on code.
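A minimal sketch of this wiring (hypothetical environment names; in practice each environment would typically also specify its own image):

```python
import flyte

child_env = flyte.TaskEnvironment(name="child_env")

# Declaring the dependency here is what tells `flyte run`/`flyte deploy`
# to also prepare (and, if needed, build) child_env whenever parent_env
# is the environment being invoked or deployed.
parent_env = flyte.TaskEnvironment(
    name="parent_env",
    depends_on=[child_env],
)
```

Environments not reachable through any `depends_on` chain from the invoked task's environment are simply skipped at build time.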

### Dependency inclusion constraints

When a parent task invokes a child task in a different environment, the container image of the parent task environment must include all dependencies used by the child task.
This is necessary because of the way task invocation works in Flyte:

- When a child task is invoked by function name, that function necessarily has to be imported into the parent task's Python environment.
- This results in all the dependencies of the child task function also being imported.
- But, nonetheless, the actual execution of the child task occurs in its own environment.

To avoid this requirement, you can invoke a task in another environment _remotely_.

<!-- TODO: Link to remote tasks when that page is live
To avoid this requirement, you can [invoke a task in another environment _remotely_](https://www.union.ai/docs/v2/union/user-guide/task-programming/remote-tasks).
-->

## Example

The following example is a (very) simple mock of an AlphaFold2 pipeline.
It demonstrates a workflow with three tasks, each in its own environment.

The example project looks like this:

```bash
├── msa/
│   ├── __init__.py
│   └── run.py
├── fold/
│   ├── __init__.py
│   └── run.py
├── __init__.py
└── main.py
```
(The source code for this example can be found here: [AlphaFold2 mock example](https://github.com/unionai/unionai-examples/tree/main/v2/user-guide/task-configuration/multiple-environments/af2))

In file `msa/run.py` we define the task `run_msa`, which mocks the multiple sequence alignment step of the process:

```python
import flyte
from flyte.io import File

MSA_PACKAGES = ["pytest"]

msa_image = flyte.Image.from_debian_base().with_pip_packages(*MSA_PACKAGES)

msa_env = flyte.TaskEnvironment(name="msa_env", image=msa_image)

@msa_env.task
def run_msa(x: str) -> File:
    f = File.new_remote()
    with f.open_sync("w") as fp:
        fp.write(x)
    return f
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/msa/run.py*

* A dedicated image (`msa_image`) is built using the `MSA_PACKAGES` dependency list, on top of the standard base image.
* A dedicated environment (`msa_env`) is defined for the task, using `msa_image`.
* The task is defined within the context of the `msa_env` environment.

In file `fold/run.py` we define the task `run_fold`, which mocks the fold step of the process:

```python
import flyte
from flyte.io import File

FOLD_PACKAGES = ["ruff"]

fold_image = flyte.Image.from_debian_base().with_pip_packages(*FOLD_PACKAGES)

fold_env = flyte.TaskEnvironment(name="fold_env", image=fold_image)

@fold_env.task
def run_fold(sequence: str, msa: File) -> list[str]:
    with msa.open_sync("r") as f:
        msa_content = f.read()
    return [msa_content, sequence]
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/fold/run.py*

* A dedicated image (`fold_image`) is built using the `FOLD_PACKAGES` dependency list, on top of the standard base image.
* A dedicated environment (`fold_env`) is defined for the task, using `fold_image`.
* The task is defined within the context of the `fold_env` environment.

Finally, in file `main.py` we define the task `main` that ties everything together into a workflow.

We import the required modules and functions:

```
import logging
import pathlib

from fold.run import fold_env, fold_image, run_fold
from msa.run import msa_env, MSA_PACKAGES, run_msa

import flyte
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/main.py*

Notice that we import:
* The task functions that we will be calling: `run_fold` and `run_msa`.
* The environments of those tasks: `fold_env` and `msa_env`.
* The dependency list of the `run_msa` task: `MSA_PACKAGES`.
* The image of the `run_fold` task: `fold_image`.

We then assemble the image and the environment:

```
main_image = fold_image.with_pip_packages(*MSA_PACKAGES)

env = flyte.TaskEnvironment(
    name="multi_env",
    depends_on=[fold_env, msa_env],
    image=main_image,
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/main.py*

The image for the `main` task (`main_image`) is built by starting with `fold_image` (the image for the `run_fold` task) and adding `MSA_PACKAGES` (the dependency list for the `run_msa` task).
This ensures that `main_image` includes all dependencies needed by both the `run_fold` and `run_msa` tasks.

The environment for the `main` task is defined with:
* The image `main_image`. This ensures that the `main` task has all the dependencies it needs.
* A `depends_on` list that includes both `fold_env` and `msa_env`. This establishes the deploy-time dependencies on those environments.

Finally, we define the `main` task itself:

```
@env.task
def main(sequence: str) -> list[str]:
    """Given a sequence, outputs files containing the protein structure
    This requires model weights + gpus + large database on aws fsx lustre
    """
    print(f"Running AlphaFold2 for sequence: {sequence}")
    msa = run_msa(sequence)
    print(f"MSA result: {msa}, passing to fold task")
    results = run_fold(sequence, msa)
    print(f"Fold results: {results}")
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/main.py*

Here we call, in turn, the `run_msa` and `run_fold` tasks.
Since we call them directly rather than as remote tasks, we had to ensure that `main_image` includes all dependencies needed by both tasks.

<!-- TODO: Link to remote tasks when that page is live
Note that we call them directly, not as [remote tasks](https://www.union.ai/docs/v2/union/user-guide/task-programming/remote-tasks), which is why we had to ensure that `main_image` includes all dependencies needed by both tasks.
-->

The final piece of the puzzle is the `if __name__ == "__main__":` block that allows us to run the `main` task on the configured Flyte backend:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, "AAGGTTCCAA")
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/multiple-environments/af2/main.py*

Now you can run the workflow with:

```bash
python main.py
```

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/retries-and-timeouts ===

# Retries and timeouts

Flyte provides robust error handling through configurable retry strategies and timeout controls.
These parameters help ensure task reliability and prevent resource waste from runaway processes.

## Retries

The `retries` parameter controls how many times a failed task should be retried before giving up.
A "retry" is any attempt after the initial attempt.
In other words, `retries=3` means the task may be attempted up to 4 times in total (1 initial + 3 retries).
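The attempt arithmetic can be illustrated with a plain-Python retry loop (a sketch of the counting only, not Flyte's actual retry machinery):

```python
def run_with_retries(fn, retries: int):
    """Call fn up to 1 + retries times, re-raising the final failure."""
    for attempt in range(1 + retries):
        try:
            return fn()
        except Exception:
            if attempt == retries:  # last allowed attempt just failed
                raise

# A function that always fails lets us count the total attempts:
calls = 0

def always_fails():
    global calls
    calls += 1
    raise RuntimeError("boom")

try:
    run_with_retries(always_fails, retries=3)
except RuntimeError:
    pass

print(calls)  # 4 attempts: 1 initial + 3 retries
```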

The `retries` parameter can be configured in either the `@env.task` decorator or using `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.

The code for the examples below can be found on [GitHub](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/retries.py).

### Retry example

First we import the required modules and set up a task environment:

```
import random
from datetime import timedelta

import flyte

env = flyte.TaskEnvironment(name="my-env")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/retries.py*

Then we configure our task to retry up to 3 times if it fails (for a total of 4 attempts). We also define the driver task `main` that calls the `retry` task:

```
@env.task(retries=3)
async def retry() -> str:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Task failed!")
    return "Success!"

@env.task
async def main() -> list[str]:
    results = []
    try:
        results.append(await retry())
    except Exception as e:
        results.append(f"Failed: {e}")
    try:
        results.append(await retry.override(retries=5)())
    except Exception as e:
        results.append(f"Failed: {e}")
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/retries.py*

Note that we call `retry` twice: first without any `override`, and then with an `override` to increase the retries to 5 (for a total of 6 attempts).

Finally, we configure flyte and invoke the `main` task:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/retries.py*

## Timeouts

The `timeout` parameter sets limits on how long a task can run, preventing resource waste from stuck processes.
It supports multiple formats for different use cases.

The `timeout` parameter can be configured in either the `@env.task` decorator or using `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.

The code for the example below can be found on [GitHub](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py).

### Timeout example

First, we import the required modules and set up a task environment:

```
import random
from datetime import timedelta
import asyncio

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="my-env")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

Our first task sets a timeout using seconds as an integer:

```
@env.task(timeout=60)  # 60 seconds
async def timeout_seconds() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_seconds completed"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

We can also set a timeout using a `timedelta` object for more readable durations:

```
@env.task(timeout=timedelta(minutes=1))
async def timeout_timedelta() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_timedelta completed"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

You can also set separate timeouts for maximum execution time and maximum queue time using the `Timeout` class:

```
@env.task(timeout=Timeout(
    max_runtime=timedelta(minutes=1),      # Max execution time per attempt
    max_queued_time=timedelta(minutes=1)   # Max time in queue before starting
))
async def timeout_advanced() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_advanced completed"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

You can also combine retries and timeouts for resilience and resource control:

```
@env.task(
    retries=3,
    timeout=Timeout(
        max_runtime=timedelta(minutes=1),
        max_queued_time=timedelta(minutes=1)
    )
)
async def timeout_with_retry() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_with_retry completed"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

Here we specify:
- Up to 3 retries (4 attempts in total).
- Each attempt times out after 1 minute.
- The task fails if queued for more than 1 minute.
- Total possible runtime: 1 minute of queue time + (1 minute × 4 attempts).
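The worst-case wall clock for this configuration can be tallied directly (a back-of-the-envelope sketch; it assumes the queue limit is incurred once, before the first attempt):

```python
from datetime import timedelta

retries = 3
attempts = 1 + retries                  # initial attempt + retries
max_runtime = timedelta(minutes=1)      # per-attempt execution limit
max_queued_time = timedelta(minutes=1)  # time allowed in queue before starting

worst_case = max_queued_time + attempts * max_runtime
print(worst_case)  # 0:05:00
```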

We define the `main` driver task that calls all the timeout tasks concurrently and returns their outputs as a list. The return value for failed tasks will indicate failure:

```
@env.task
async def main() -> list[str]:
    tasks = [
        timeout_seconds(),
        timeout_seconds.override(timeout=120)(),  # Override to 120 seconds
        timeout_timedelta(),
        timeout_advanced(),
        timeout_with_retry(),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {r}")
        else:
            output.append(r)
    return output
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

Note that we also demonstrate overriding the timeout for `timeout_seconds` to 120 seconds when calling it.

Finally, we configure Flyte and invoke the `main` task:

```
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/retries-and-timeouts/timeouts.py*

Proper retry and timeout configuration ensures your Flyte workflows are both reliable and efficient, handling transient failures gracefully while preventing resource waste.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/triggers ===

# Triggers

Triggers allow you to automate and parameterize an execution by scheduling its start time and providing overrides for its task inputs.

Currently, only **schedule triggers** are supported.
This type of trigger runs a task based on a Cron expression or a fixed-rate schedule.

Support is coming for other trigger types, such as:

* Webhook triggers: Hit an API endpoint to run your task.
* Artifact triggers: Run a task when a specific artifact is produced.

## Triggers are set in the task decorator

A trigger is created by setting the `triggers` parameter in the task decorator to a `flyte.Trigger` object or a list of such objects (triggers are not settable at the `TaskEnvironment` definition or `task.override` levels).

Here is a simple example:

```
import flyte
from datetime import datetime, timezone

env = flyte.TaskEnvironment(name="trigger_env")

@env.task(triggers=flyte.Trigger.hourly())  # Every hour
def hourly_task(trigger_time: datetime, x: int = 1) -> str:
    return f"Hourly example executed at {trigger_time.isoformat()} with x={x}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

Here we use a predefined schedule trigger to run the `hourly_task` every hour.
Other predefined triggers can be used similarly (see **Configure tasks > Triggers > Predefined schedule triggers** below).

If you want full control over the trigger behavior, you can define a trigger using the `flyte.Trigger` class directly.

## `flyte.Trigger`

For complete parameter documentation, see the [`Trigger`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/trigger), [`Cron`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/cron), and [`FixedRate`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/fixedrate) API references.

The `Trigger` class allows you to define custom triggers with full control over scheduling and execution behavior. It has the following signature:

```
flyte.Trigger(
    name,
    automation,
    description="",
    auto_activate=True,
    inputs=None,
    env_vars=None,
    interruptible=None,
    overwrite_cache=False,
    queue=None,
    labels=None,
    annotations=None
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Core Parameters

**`name: str`** (required)
The unique identifier for the trigger within your project/domain.

**`automation: Union[Cron, FixedRate]`** (required)
Defines when the trigger fires. Use `flyte.Cron("expression")` for Cron-based scheduling or `flyte.FixedRate(interval_minutes, start_time=...)` for fixed intervals.

### Configuration Parameters

**`description: str = ""`**
Human-readable description of the trigger's purpose.

**`auto_activate: bool = True`**
Whether the trigger should be automatically activated when deployed. Set to `False` to deploy inactive triggers that require manual activation.

**`inputs: Dict[str, Any] | None = None`**
Default parameter values for the task when triggered. Use `flyte.TriggerTime` as a value to inject the trigger execution timestamp into that parameter.

### Runtime Override Parameters

**`env_vars: Dict[str, str] | None = None`**
Environment variables to set for triggered executions, overriding the task's default environment variables.

**`interruptible: bool | None = None`**
Whether triggered executions can be interrupted (useful for cost optimization with spot/preemptible instances). Overrides the task's interruptible setting.

**`overwrite_cache: bool = False`**
Whether to bypass/overwrite task cache for triggered executions, ensuring fresh computation.

**`queue: str | None = None`**
Specific execution queue for triggered runs, overriding the task's default queue.

### Metadata Parameters

**`labels: Mapping[str, str] | None = None`**
Key-value labels for organizing and filtering triggers (e.g., team, component, priority).

**`annotations: Mapping[str, str] | None = None`**
Additional metadata, often used by infrastructure tools for compliance, monitoring, or cost tracking.

Here's a comprehensive example showing all parameters:

```
comprehensive_trigger = flyte.Trigger(
    name="monthly_financial_report",
    automation=flyte.Cron("0 6 1 * *", timezone="America/New_York"),
    description="Monthly financial report generation for executive team",
    auto_activate=True,
    inputs={
        "report_date": flyte.TriggerTime,
        "report_type": "executive_summary",
        "include_forecasts": True
    },
    env_vars={
        "REPORT_OUTPUT_FORMAT": "PDF",
        "EMAIL_NOTIFICATIONS": "true"
    },
    interruptible=False,  # Critical report, use dedicated resources
    overwrite_cache=True,  # Always fresh data
    queue="financial-reports",
    labels={
        "team": "finance",
        "criticality": "high",
        "automation": "scheduled"
    },
    annotations={
        "compliance.company.com/sox-required": "true",
        "backup.company.com/retain-days": "2555"  # 7 years
    }
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

## The `automation` parameter with `flyte.FixedRate`

You can define a fixed-rate schedule trigger by setting the `automation` parameter of the `flyte.Trigger` to an instance of `flyte.FixedRate`.

The `flyte.FixedRate` has the following signature:

```
flyte.FixedRate(
    interval_minutes,
    start_time=None
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Parameters

**`interval_minutes: int`** (required)
The interval between trigger executions in minutes.

**`start_time: datetime | None`**
When to start the fixed rate schedule. If not specified, starts when the trigger is deployed and activated.

### Examples

```
# Every 90 minutes, starting when deployed
every_90_min = flyte.Trigger(
    "data_processing",
    flyte.FixedRate(interval_minutes=90)
)

# Every 6 hours (360 minutes), starting at a specific time
specific_start = flyte.Trigger(
    "batch_job",
    flyte.FixedRate(
        interval_minutes=360,  # 6 hours
        start_time=datetime(2025, 12, 1, 9, 0, 0)  # Start Dec 1st at 9 AM
    )
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

## The `automation` parameter with `flyte.Cron`

You can define a Cron-based schedule trigger by setting the `automation` parameter to an instance of `flyte.Cron`.

The `flyte.Cron` has the following signature:

```
flyte.Cron(
    cron_expression,
    timezone=None
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Parameters

**`cron_expression: str`** (required)
The cron expression defining when the trigger should fire. Uses standard Unix cron format with five fields: minute, hour, day of month, month, and day of week.

**`timezone: str | None`**
The timezone for the cron expression. If not specified, it defaults to UTC. Uses standard timezone names like "America/New_York" or "Europe/London".

### Examples

```
# Every day at 6 AM UTC
daily_trigger = flyte.Trigger(
    "daily_report",
    flyte.Cron("0 6 * * *")
)

# Every weekday at 9:30 AM Eastern Time
weekday_trigger = flyte.Trigger(
    "business_hours_task",
    flyte.Cron("30 9 * * 1-5", timezone="America/New_York")
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

#### Cron Expressions

Here are some common cron expressions you can use:

| Expression     | Description                          |
|----------------|--------------------------------------|
| `0 0 * * *`    | Every day at midnight                |
| `0 9 * * 1-5`  | Every weekday at 9 AM                |
| `30 14 * * 6`  | Every Saturday at 2:30 PM            |
| `0 0 1 * *`    | First day of every month at midnight |
| `0 0 25 * *`   | 25th day of every month at midnight  |
| `0 0 * * 0`    | Every Sunday at midnight             |
| `*/10 * * * *` | Every 10 minutes                     |
| `0 */2 * * *`  | Every 2 hours                        |

For a full guide on Cron syntax, refer to [Crontab Guru](https://crontab.guru/).
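These expressions can be sanity-checked with a small matcher. The sketch below is not part of Flyte; it supports only `*`, numbers, ranges, steps, and lists across the five standard fields, and evaluates whether a given datetime satisfies an expression:

```python
from datetime import datetime

def _field_matches(field: str, value: int) -> bool:
    """Check one cron field (e.g. '*', '*/10', '1-5', '0,30') against a value."""
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            if value % step == 0:  # step still applies to "*/N"
                return True
            continue
        if "-" in part:
            lo, hi = map(int, part.split("-"))
        else:
            lo = hi = int(part)
        if lo <= value <= hi and (value - lo) % step == 0:
            return True
    return False

def cron_matches(expr: str, when: datetime) -> bool:
    """True if `when` satisfies the 5-field cron expression `expr`."""
    minute, hour, dom, month, dow = expr.split()
    return (
        _field_matches(minute, when.minute)
        and _field_matches(hour, when.hour)
        and _field_matches(dom, when.day)
        and _field_matches(month, when.month)
        and _field_matches(dow, (when.weekday() + 1) % 7)  # cron: 0 = Sunday
    )

# Spot-check two rows of the table above:
print(cron_matches("0 9 * * 1-5", datetime(2025, 1, 6, 9, 0)))    # a Monday, 9 AM
print(cron_matches("*/10 * * * *", datetime(2025, 1, 6, 9, 25)))  # :25 is not a multiple of 10
```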

## The `inputs` parameter

The `inputs` parameter allows you to provide default values for your task's parameters when the trigger fires.
This is essential for parameterizing your automated executions and passing trigger-specific data to your tasks.

### Basic Usage

```
trigger_with_inputs = flyte.Trigger(
    "data_processing",
    flyte.Cron("0 6 * * *"),  # Daily at 6 AM
    inputs={
        "batch_size": 1000,
        "environment": "production",
        "debug_mode": False
    }
)

@env.task(triggers=trigger_with_inputs)
def process_data(batch_size: int, environment: str, debug_mode: bool = True) -> str:
    return f"Processing {batch_size} items in {environment} mode"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Using `flyte.TriggerTime`

The special `flyte.TriggerTime` value is used in the `inputs` to indicate the task parameter into which Flyte will inject the trigger execution timestamp:

```
timestamp_trigger = flyte.Trigger(
    "daily_report",
    flyte.Cron("0 0 * * *"),  # Daily at midnight
    inputs={
        "report_date": flyte.TriggerTime,  # Receives trigger execution time
        "report_type": "daily_summary"
    }
)

@env.task(triggers=timestamp_trigger)
def generate_report(report_date: datetime, report_type: str) -> str:
    return f"Generated {report_type} for {report_date.strftime('%Y-%m-%d')}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Required vs optional parameters

> [!IMPORTANT]
> If your task has parameters without default values, you **must** provide values for them in the trigger inputs, otherwise the trigger will fail to execute.

```python
# ❌ This will fail - missing required parameter 'data_source'
bad_trigger = flyte.Trigger(
    "bad_trigger",
    flyte.Cron("0 0 * * *")
    # Missing inputs for required parameter 'data_source'
)

@env.task(triggers=bad_trigger)
def bad_trigger_task(data_source: str, batch_size: int = 100) -> str:
    return f"Processing from {data_source} with batch size {batch_size}"

# ✅ This works - all required parameters provided
good_trigger = flyte.Trigger(
    "good_trigger",
    flyte.Cron("0 0 * * *"),
    inputs={
        "data_source": "prod_database",  # Required parameter
        "batch_size": 500  # Override default
    }
)

@env.task(triggers=good_trigger)
def good_trigger_task(data_source: str, batch_size: int = 100) -> str:
    return f"Processing from {data_source} with batch size {batch_size}"
```

### Complex input types

You can pass various data types through trigger inputs:

```
complex_trigger = flyte.Trigger(
    "ml_training",
    flyte.Cron("0 2 * * 1"),  # Weekly on Monday at 2 AM
    inputs={
        "model_config": {
            "learning_rate": 0.01,
            "batch_size": 32,
            "epochs": 100
        },
        "feature_columns": ["age", "income", "location"],
        "validation_split": 0.2,
        "training_date": flyte.TriggerTime
    }
)

@env.task(triggers=complex_trigger)
def train_model(
    model_config: dict,
    feature_columns: list[str],
    validation_split: float,
    training_date: datetime
) -> str:
    return f"Training model with {len(feature_columns)} features on {training_date}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

## Predefined schedule triggers

For common scheduling needs, Flyte provides predefined trigger methods that create Cron-based schedules without requiring you to specify cron expressions manually.
These are convenient shortcuts for frequently used scheduling patterns.

### Available Predefined Triggers

```
minutely_trigger = flyte.Trigger.minutely()    # Every minute
hourly_trigger = flyte.Trigger.hourly()        # Every hour
daily_trigger = flyte.Trigger.daily()          # Every day at midnight
weekly_trigger = flyte.Trigger.weekly()        # Every week (Sundays at midnight)
monthly_trigger = flyte.Trigger.monthly()      # Every month (1st day at midnight)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

For reference, here's what each predefined trigger is equivalent to:

```python
# These are functionally identical:
flyte.Trigger.minutely() == flyte.Trigger("minutely", flyte.Cron("* * * * *"))
flyte.Trigger.hourly() == flyte.Trigger("hourly", flyte.Cron("0 * * * *"))
flyte.Trigger.daily() == flyte.Trigger("daily", flyte.Cron("0 0 * * *"))
flyte.Trigger.weekly() == flyte.Trigger("weekly", flyte.Cron("0 0 * * 0"))
flyte.Trigger.monthly() == flyte.Trigger("monthly", flyte.Cron("0 0 1 * *"))
```

### Predefined Trigger Parameters

All predefined trigger methods (`minutely()`, `hourly()`, `daily()`, `weekly()`, `monthly()`) accept the same set of parameters:

```
flyte.Trigger.daily(
    trigger_time_input_key="trigger_time",
    name="daily",
    description="A trigger that runs daily at midnight",
    auto_activate=True,
    inputs=None,
    env_vars=None,
    interruptible=None,
    overwrite_cache=False,
    queue=None,
    labels=None,
    annotations=None
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

#### Core Parameters

**`trigger_time_input_key: str = "trigger_time"`**
The name of the task parameter that will receive the execution timestamp.
If no `trigger_time_input_key` is provided, the default is `trigger_time`.
In this case, if the task does not have a parameter named `trigger_time`, the task will still be executed, but the timestamp will simply not be passed.
However, if you do specify a `trigger_time_input_key`, but your task does not actually have the specified parameter, an error will be raised at trigger deployment time.

**`name: str`**
The unique identifier for the trigger. Defaults to the method name (`"daily"`, `"hourly"`, etc.).

**`description: str`**
Human-readable description of the trigger's purpose. Each method has a sensible default.

#### Configuration Parameters

**`auto_activate: bool = True`**
Whether the trigger should be automatically activated when deployed. Set to `False` to deploy inactive triggers that require manual activation.

**`inputs: Dict[str, Any] | None = None`**
Additional parameter values for your task when triggered. The `trigger_time_input_key` parameter is automatically included with `flyte.TriggerTime` as its value.

#### Runtime Override Parameters

**`env_vars: Dict[str, str] | None = None`**
Environment variables to set for triggered executions, overriding the task's default environment variables.

**`interruptible: bool | None = None`**
Whether triggered executions can be interrupted (useful for cost optimization with spot/preemptible instances). Overrides the task's interruptible setting.

**`overwrite_cache: bool = False`**
Whether to bypass/overwrite task cache for triggered executions, ensuring fresh computation.

**`queue: str | None = None`**
Specific execution queue for triggered runs, overriding the task's default queue.

#### Metadata Parameters

**`labels: Mapping[str, str] | None = None`**
Key-value labels for organizing and filtering triggers (e.g., team, component, priority).

**`annotations: Mapping[str, str] | None = None`**
Additional metadata, often used by infrastructure tools for compliance, monitoring, or cost tracking.
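
Putting several of these parameters together, here's an illustrative sketch (the trigger name, labels, and input values are invented for this example, not taken from the source; deploying it requires a configured Flyte/Union backend):

```python
from datetime import datetime

import flyte

env = flyte.TaskEnvironment(name="reporting_env")

# Deployed inactive; the extra inputs, env vars, and labels are illustrative.
nightly = flyte.Trigger.daily(
    name="nightly_report",
    description="Nightly report generation",
    auto_activate=False,                # requires manual activation
    inputs={"report_format": "pdf"},    # merged with the trigger_time input
    env_vars={"LOG_LEVEL": "INFO"},     # overrides the task's env vars
    labels={"team": "analytics"},       # for organizing/filtering triggers
)

@env.task(triggers=nightly)
def generate_report(trigger_time: datetime, report_format: str) -> str:
    return f"{report_format} report for {trigger_time:%Y-%m-%d}"
```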

### Trigger time in predefined triggers

By default, predefined triggers will pass the execution time to the parameter `trigger_time` of type `datetime`, if that parameter exists on the task.
If no such parameter exists, the task will still be executed without error.

Optionally, you can customize the parameter name that receives the trigger execution timestamp by setting the `trigger_time_input_key` parameter (in this case the absence of this custom parameter on the task will raise an error at trigger deployment time):

```
@env.task(triggers=flyte.Trigger.daily(trigger_time_input_key="scheduled_at"))
def task_with_custom_trigger_time_input(scheduled_at: datetime) -> str:
    return f"Executed at {scheduled_at}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

## Multiple triggers per task

You can attach multiple triggers to a single task by providing a list of triggers. This allows you to run the same task on different schedules or with different configurations:

```
@env.task(triggers=[
    flyte.Trigger.hourly(),  # Predefined trigger
    flyte.Trigger.daily(),   # Another predefined trigger
    flyte.Trigger("custom", flyte.Cron("0 */6 * * *"))  # Custom trigger every 6 hours
])
def multi_trigger_task(trigger_time: datetime = flyte.TriggerTime) -> str:
    # Different logic based on execution timing
    if trigger_time.hour == 0:  # Daily run at midnight
        return f"Daily comprehensive processing at {trigger_time}"
    else:  # Hourly or custom runs
        return f"Regular processing at {trigger_time.strftime('%H:%M')}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

You can mix and match trigger types, combining predefined triggers with custom triggers built from `flyte.Cron` or `flyte.FixedRate` schedules (see below for explanations of these concepts).

## Deploying a task with triggers

We recommend that you define your triggers in code together with your tasks and deploy them together.

The Union UI displays:

* `Owner` - who last deployed the trigger.

* `Last updated` - who last activated or deactivated the trigger and when. Note: If you deploy a trigger with `auto_activate=True` (the default), this will match the `Owner`.

* `Last Run` - when the last run was created by this trigger.

For development and debugging purposes, you can adjust and deploy individual triggers from the UI.

To deploy a task with its triggers, you can either use Flyte CLI:

```bash
flyte deploy -p <project> -d <domain> <file_with_tasks_and_triggers.py> env
```

Or in Python:

```python
flyte.deploy(env)
```

Upon deployment, all triggers that are associated with a given task `T` will be automatically switched to apply to the latest version of that task. Triggers on task `T` that are defined elsewhere (i.e., in the UI) will be deleted unless they have been referenced in the task definition of `T`.

<!-- TODO
Add link to workflow deployment docs when available.
-->

## Activating and deactivating triggers

By default, triggers are automatically activated upon deployment (`auto_activate=True`).
Alternatively, you can set `auto_activate=False` to deploy inactive triggers.
An inactive trigger will not create runs until activated.

```
env = flyte.TaskEnvironment(name="my_task_env")

custom_cron_trigger = flyte.Trigger(
    "custom_cron",
    flyte.Cron("0 0 * * *"),
    auto_activate=False  # Don't create runs yet
)

@env.task(triggers=custom_cron_trigger)
def custom_task() -> str:
    return "Hello, world!"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

This trigger won't create runs until it is explicitly activated.
You can activate a trigger via the Flyte CLI:

```bash
flyte update trigger custom_cron my_task_env.custom_task --activate --project <project> --domain <domain>
```

If you want to stop your trigger from creating new runs, you can deactivate it:

```bash
flyte update trigger custom_cron my_task_env.custom_task --deactivate --project <project> --domain <domain>
```

You can also view and manage your deployed triggers in the Union UI.

## Trigger run timing

The timing of the first run created by a trigger depends on the type of trigger used (Cron-based or Fixed-rate) and whether the trigger is active upon deployment.

### Cron-based triggers

For Cron-based triggers, the first run is created at the next time matching the cron expression after the trigger is activated, with subsequent runs following the schedule thereafter.

* `0 0 * * *` If deployed at 17:00 today, the trigger will first fire 7 hours later (0:00 of the following day) and then every day at 0:00 thereafter.

* `*/15 14 * * 1-5` If today is Tuesday at 17:00, the trigger will fire the next day (Wednesday) at 14:00, 14:15, 14:30, and 14:45, and then do the same on every subsequent weekday.
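
The arithmetic of the first example can be checked with plain Python (no Flyte required); `next_daily_midnight` is a hypothetical helper written just for this illustration:

```python
from datetime import datetime, timedelta

def next_daily_midnight(now: datetime) -> datetime:
    """Next fire time of a `0 0 * * *` schedule after `now`."""
    today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return today_midnight + timedelta(days=1)

deployed = datetime(2025, 10, 24, 17, 0)
first_fire = next_daily_midnight(deployed)
# Deployed at 17:00 -> first fire 7 hours later, at 0:00 the next day
```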

### Fixed-rate triggers without `start_time`

If no `start_time` is specified, then the first run will be created after the specified interval from the time of activation. No run will be created immediately upon activation, but the activation time will be used as the reference point for future runs.

#### No `start_time`, auto_activate: True

Let's say you define a fixed-rate trigger with automatic activation like this:

```
my_trigger = flyte.Trigger("my_trigger", flyte.FixedRate(60))
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

In this case, the first run will occur 60 minutes after the successful deployment of the trigger.
So, if you deployed this trigger at 13:15, the first run will occur at 14:15 and so on thereafter.

#### No `start_time`, auto_activate: False

On the other hand, let's say you define a fixed-rate trigger without automatic activation like this:

```
my_trigger = flyte.Trigger("my_trigger", flyte.FixedRate(60), auto_activate=False)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

Suppose you then activate it about three hours after deployment. In this case, the first run will kick off 60 minutes after trigger activation.
If you deployed the trigger at 13:15 and activated it at 16:07, the first run will occur at 17:07.

### Fixed-rate triggers with `start_time`

If a `start_time` is specified, the timing of the first run depends on whether the trigger is active at `start_time` or not.

#### Fixed-rate with `start_time` while active

If a `start_time` is specified, and the trigger is active at `start_time` then the first run will occur at `start_time` and then at the specified interval thereafter.
For example:

```
my_trigger = flyte.Trigger(
    "my_trigger",
    # Runs every 60 minutes starting from October 26th, 2025, 10:00am
    flyte.FixedRate(60, start_time=datetime(2025, 10, 26, 10, 0, 0)),
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

If you deploy this trigger on October 24th, 2025, the trigger will wait until October 26th 10:00am and will create the first run at exactly 10:00am.

#### Fixed-rate with `start_time` while inactive

If a `start_time` is specified but the trigger is only activated after `start_time`, then the first run will be created at the next time point that aligns with the recurring trigger interval, using `start_time` as the initial reference point.
For example:

```
custom_rate_trigger = flyte.Trigger(
    "custom_rate",
    # Runs every 60 minutes starting from October 26th, 2025, 10:00am
    flyte.FixedRate(60, start_time=datetime(2025, 10, 26, 10, 0, 0)),
    auto_activate=False
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

If activated later than the `start_time`, say on October 28th at 12:35pm, the first run will be created on October 28th at 1:00pm.
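
This alignment rule can be expressed in plain Python; `next_aligned_run` is a hypothetical helper written for illustration, not part of the Flyte API:

```python
from datetime import datetime, timedelta

def next_aligned_run(
    start_time: datetime, interval: timedelta, activated_at: datetime
) -> datetime:
    """First run time for a fixed-rate trigger activated after its start_time."""
    if activated_at <= start_time:
        return start_time
    # Whole intervals already elapsed since start_time, plus one more
    intervals_elapsed = (activated_at - start_time) // interval + 1
    return start_time + intervals_elapsed * interval

first_run = next_aligned_run(
    start_time=datetime(2025, 10, 26, 10, 0),
    interval=timedelta(minutes=60),
    activated_at=datetime(2025, 10, 28, 12, 35),
)
# first_run lands on the next 60-minute boundary: October 28th, 1:00pm
```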

## Deleting triggers

If you decide that you don't need a trigger anymore, you can remove the trigger from the task definition and deploy the task again.

Alternatively, you can use Flyte CLI:

```bash
flyte delete trigger custom_cron my_task_env.custom_task --project <project> --domain <domain>
```

## Schedule time zones

### Setting time zone for a Cron schedule

Cron expressions are by default in UTC, but it's possible to specify custom time zones like so:

```
sf_trigger = flyte.Trigger(
    "sf_tz",
    flyte.Cron(
        "0 9 * * *", timezone="America/Los_Angeles"
    ), # Every day at 9 AM PT
    inputs={"start_time": flyte.TriggerTime, "x": 1},
)

nyc_trigger = flyte.Trigger(
    "nyc_tz",
    flyte.Cron(
        "1 12 * * *", timezone="America/New_York"
    ), # Every day at 12:01 PM ET
    inputs={"start_time": flyte.TriggerTime, "x": 1},
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

The above two schedules will fire 1 minute apart, at 9 AM PT and 12:01 PM ET respectively.

### `flyte.TriggerTime` is always in UTC

The `flyte.TriggerTime` value is always in UTC. For timezone-aware logic, convert as needed:

```
@env.task(triggers=flyte.Trigger.minutely(trigger_time_input_key="utc_trigger_time", name="timezone_trigger"))
def timezone_task(utc_trigger_time: datetime) -> str:
    local_time = utc_trigger_time.replace(tzinfo=timezone.utc).astimezone()
    return f"Task fired at {utc_trigger_time} UTC ({local_time} local)"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-configuration/triggers/triggers.py*

### Daylight Savings Time behavior

When Daylight Savings Time (DST) begins and ends, it can impact when the scheduled execution begins.

On the day DST begins, clocks jump from 2:00AM to 3:00AM, so a time like 2:30AM does not exist. A trigger scheduled for 2:30AM local time will not fire that day; it fires at the next occurrence of 2:30AM, on the following day.

On the day DST ends, the hour from 1:00AM to 2:00AM repeats, so a time like 1:30AM occurs twice. A trigger scheduled for 1:30AM runs only once, at the first occurrence of 1:30AM.
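
You can observe the spring-forward gap with Python's standard `zoneinfo` module (no Flyte involved); in the US, DST began on March 9, 2025:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

ny = ZoneInfo("America/New_York")

# 2:30AM on March 9, 2025 falls inside the spring-forward gap (2:00 -> 3:00AM).
gap_time = datetime(2025, 3, 9, 2, 30, tzinfo=ny)

# Round-tripping through UTC shows that this wall-clock time never occurred:
round_trip = gap_time.astimezone(ZoneInfo("UTC")).astimezone(ny)
# round_trip is 3:30AM EDT -- the clock skipped straight past 2:30AM
```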

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/interruptible-tasks-and-queues ===

# Interruptible tasks and queues

## Interruptible tasks

Cloud providers offer discounted compute instances (AWS Spot Instances, GCP Preemptible VMs)
that can be reclaimed at any time. These instances are significantly cheaper than on-demand
instances but come with the risk of preemption.

Setting `interruptible=True` allows Flyte to schedule the task on these spot/preemptible instances
for cost savings:

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    interruptible=True,
)

@env.task
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}
```

### Setting at different levels

`interruptible` can be set at the `TaskEnvironment` level, the `@env.task` decorator level,
and at the `task.override()` invocation level. The more specific level always takes precedence.

This lets you set a default at the environment level and override per-task:

```python
import flyte

# All tasks in this environment are interruptible by default
env = flyte.TaskEnvironment(
    name="my_env",
    interruptible=True,
)

# This task uses the environment default (interruptible)
@env.task
def preprocess(data: list) -> list:
    return [x * 2 for x in data]

# This task overrides to non-interruptible (critical, should not be preempted)
@env.task(interruptible=False)
def save_results(results: dict) -> str:
    return "saved"
```

You can also override at invocation time:

```python
@env.task
async def main(data: list) -> str:
    processed = preprocess(data=data)
    # Run this specific invocation as non-interruptible
    return save_results.override(interruptible=False)(results={"data": processed})
```

### Behavior on preemption

When a spot instance is reclaimed, the task is terminated and rescheduled.
Combine `interruptible=True` with [retries](./retries-and-timeouts) to handle preemptions gracefully:

```python
@env.task(interruptible=True, retries=3)
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}
```

> [!NOTE]
> Retries due to spot preemption do not count against the user-configured retry budget.
> System retries (for preemptions and other system-level failures) are tracked separately.

## Queues

Queues are named routing labels that map tasks to specific resource pools or execution clusters
in your infrastructure.

Setting a queue directs the task to the corresponding compute partition:

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    queue="gpu-pool",
)

@env.task
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}
```

### Setting at different levels

`queue` can be set at the `TaskEnvironment` level, the `@env.task` decorator level,
and at the `task.override()` invocation level. The more specific level takes precedence.

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    queue="default-pool",
)

# Uses environment-level queue ("default-pool")
@env.task
def preprocess(data: list) -> list:
    return [x * 2 for x in data]

# Overrides to a different queue
@env.task(queue="gpu-pool")
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}
```
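
You can also route a single invocation via `task.override()`; the queue names below are illustrative and depend on your deployment:

```python
import flyte

env = flyte.TaskEnvironment(name="my_env", queue="default-pool")

@env.task
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}

@env.task
async def main(data: list) -> dict:
    # Route just this invocation to a (hypothetical) high-priority pool
    return train_model.override(queue="high-priority-pool")(data=data)
```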

If no queue is specified at any level, the default queue is used.

> [!NOTE]
> Queues are configured as part of your Union.ai deployment by your platform administrator.
> The available queue names depend on your infrastructure setup.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/task-plugins ===

# Task plugins

Flyte tasks are pluggable by design, allowing you to extend task execution beyond simple containers to support specialized compute frameworks and integrations.

## Default Execution: Containers

By default, Flyte tasks execute as single containers in Kubernetes. When you decorate a function with `@env.task`, Flyte packages your code into a container and runs it on the cluster. For more advanced scenarios requiring multiple containers in a single pod (such as sidecars for logging or data mounting), you can use [pod templates](./pod-templates), which allow you to customize the entire Kubernetes pod specification.

## Compute Plugins

Beyond native container execution, Flyte provides **compute plugins** that enable you to run distributed computing frameworks directly on Kubernetes. These plugins create ephemeral clusters specifically for your task execution, spinning them up on-demand and tearing them down when complete.

### Available Compute Plugins

Flyte supports several popular distributed computing frameworks through compute plugins:

- **Spark**: Run Apache Spark jobs using the Spark operator
- **Ray**: Execute Ray workloads for distributed Python applications and ML training
- **Dask**: Scale Python workflows with Dask distributed
- **PyTorch**: Run distributed training jobs using PyTorch and Kubeflow's training operator

### How Compute Plugins Work

Compute plugins create temporary, isolated clusters within the same Kubernetes environment as Flyte:

1. **Ephemeral clusters**: Each task execution gets its own cluster, spun up on-demand
2. **Kubernetes operators**: Flyte leverages specialized Kubernetes operators (Spark operator, Ray operator, etc.) to manage cluster lifecycle
3. **Native containerization**: The same container image system used for regular tasks works seamlessly with compute plugins
4. **Per-environment configuration**: You can define the cluster shape (number of workers, resources, etc.) using `plugin_config` in your `TaskEnvironment`

### Using Compute Plugins

To use a compute plugin, you need to:

1. **Install the plugin package**: Each plugin has a corresponding Python package (e.g., `flyteplugins-ray` for Ray)
2. **Configure the TaskEnvironment**: Set the `plugin_config` parameter with the plugin-specific configuration
3. **Write your task**: Use the framework's native APIs within your task function

#### Example: Ray Plugin

Here's how to run a distributed Ray task:

```python
import ray
from flyteplugins.ray.task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
import flyte

# Define your Ray computation
@ray.remote
def compute_square(x):
    return x * x

# Configure the Ray cluster
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[WorkerNodeConfig(group_name="ray-workers", replicas=2)],
    runtime_env={"pip": ["numpy", "pandas"]},
    enable_autoscaling=False,
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=300,
)

# Create a task environment with Ray plugin configuration
image = (
    flyte.Image.from_debian_base(name="ray")
    .with_pip_packages("ray[default]==2.46.0", "flyteplugins-ray")
)

ray_env = flyte.TaskEnvironment(
    name="ray_env",
    plugin_config=ray_config,
    image=image,
    resources=flyte.Resources(cpu=(3, 4), memory=("3000Mi", "5000Mi")),
)

# Use the Ray cluster in your task
@ray_env.task
async def distributed_compute(n: int = 10) -> list[int]:
    futures = [compute_square.remote(i) for i in range(n)]
    return ray.get(futures)
```

When this task runs, Flyte will:
1. Spin up a Ray cluster with 1 head node and 2 worker nodes
2. Execute your task code in the Ray cluster
3. Tear down the cluster after completion

### Using Plugins on Union

Most compute plugins are enabled by default on Union or can be enabled upon request. Contact your Account Manager to confirm plugin availability or request specific plugins for your deployment.

## Backend Integrations

Beyond compute plugins, Flyte also supports **integrations** with external SaaS services and internal systems through **connectors**. These allow you to seamlessly interact with:

- **Data warehouses**: Snowflake, BigQuery, Redshift
- **Data platforms**: Databricks
- **Custom services**: Your internal APIs and services

Connectors enable Flyte to delegate task execution to these external systems while maintaining Flyte's orchestration, observability, and data lineage capabilities. See the [connectors documentation](#) for more details on available integrations.

## Next Steps

For detailed guides on each compute plugin, including configuration options, best practices, and advanced examples, see the [Plugins section](#) of the documentation. Each plugin guide covers:

- Installation and setup
- Configuration options
- Resource management
- Advanced use cases
- Troubleshooting tips

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-configuration/additional-task-settings ===

# Additional task settings

This page covers task configuration parameters that do not have their own dedicated page:
naming and metadata, default inputs, environment variables, and inline I/O thresholds.

For the full list of all task configuration parameters, see [Configure tasks](./_index).

## Naming and metadata

### `name`

The `name` parameter on `TaskEnvironment` is required.
It is combined with each task function name to form the fully-qualified task name.
For example, if you define a `TaskEnvironment` with `name="my_env"` and a task function `my_task`,
the fully-qualified task name is `my_env.my_task`.

The `name` must use `snake_case` or `kebab-case` and is immutable once set.

### `short_name`

The `short_name` parameter on `@env.task` (and `override()`) overrides the display name of a task in the UI graph view.
By default, the display name is the Python function name.
Overriding `short_name` does not change the fully-qualified task name.

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task(short_name="Train Model")
def train(data: list) -> dict:
    return {"accuracy": 0.95}
```

### `description`

The `description` parameter on `TaskEnvironment` provides a description of the task environment (max 255 characters).
It is used for organizational purposes and can be viewed in the UI.
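
For example (the description text is illustrative):

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    description="Tasks for the nightly model-training pipeline.",
)
```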

### `docs`

The `docs` parameter on `@env.task` accepts a `Documentation` object.
If not set explicitly, the documentation is auto-extracted from the task function's docstring.

```python
import flyte
from flyte import Documentation

env = flyte.TaskEnvironment(name="my_env")

@env.task(docs=Documentation(description="Trains a model on the given dataset."))
def train(data: list) -> dict:
    """This docstring is used if docs is not set explicitly."""
    return {"accuracy": 0.95}
```

### `report`

The `report` parameter on `@env.task` controls whether an HTML report is generated for the task.
See [Reports](https://www.union.ai/docs/v2/union/user-guide/task-programming/reports) for details.

### `links`

The `links` parameter on `@env.task` (and `override()`) attaches clickable URLs to tasks in the UI.
Use links to connect tasks to external tools like experiment trackers, monitoring dashboards, or logging systems.

Links are defined by implementing the [`Link`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/link) protocol.
See [Links](https://www.union.ai/docs/v2/union/user-guide/task-programming/links) for full details on creating and using links.

## Default inputs

Task functions support Python default parameter values. When a task parameter has a default, callers can omit it and the default is used.

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task
async def process(data: list, batch_size: int = 32, verbose: bool = False) -> dict:
    # batch_size defaults to 32, verbose defaults to False
    ...
```

When running via `flyte run`, parameters with defaults are optional:

```bash
# Uses defaults for batch_size and verbose
flyte run my_file.py process --data '[1, 2, 3]'

# Override a default
flyte run my_file.py process --data '[1, 2, 3]' --batch-size 64
```

When invoking programmatically, Python's normal default argument rules apply:

```python
result = flyte.run(process, data=[1, 2, 3])          # batch_size=32, verbose=False
result = flyte.run(process, data=[1, 2, 3], batch_size=64)  # override
```

Defaults are part of the task's input schema and are visible in the UI when viewing the task.

## Environment variables

The `env_vars` parameter on `TaskEnvironment` injects plain-text environment variables into the task container.
It accepts a `Dict[str, str]`.

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    env_vars={
        "LOG_LEVEL": "DEBUG",
        "API_ENDPOINT": "https://api.example.com",
    },
)

@env.task
def my_task() -> str:
    import os
    return os.environ["API_ENDPOINT"]
```

Environment variables can be overridden at the `task.override()` invocation level
(unless `reusable` is in effect).
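
A sketch of an invocation-level override (the variable values are illustrative):

```python
import flyte

env = flyte.TaskEnvironment(name="my_env", env_vars={"LOG_LEVEL": "INFO"})

@env.task
def my_task() -> str:
    import os
    return os.environ["LOG_LEVEL"]

@env.task
async def main() -> str:
    # Bump verbosity for just this invocation
    return my_task.override(env_vars={"LOG_LEVEL": "DEBUG"})()
```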

Use `env_vars` for non-sensitive configuration values.
For sensitive values like API keys and credentials, use [`secrets`](./secrets) instead.

## Inline I/O threshold

The `max_inline_io_bytes` parameter on `@env.task` (and `override()`) controls the maximum
size for data passed directly in the task request and response
(e.g., primitives, strings, dictionaries).

Data exceeding this threshold raises an `InlineIOMaxBytesBreached` error.

The default value is 10 MiB (`10 * 1024 * 1024` bytes).

This setting does **not** affect [`flyte.io.File`, `flyte.io.Dir`](https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories),
or [`flyte.DataFrame`](https://www.union.ai/docs/v2/union/user-guide/task-programming/dataclasses-and-structures),
which are always offloaded to object storage regardless of size.

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

# Allow up to 50 MiB of inline data
@env.task(max_inline_io_bytes=50 * 1024 * 1024)
def process_large_dict(data: dict) -> dict:
    return {k: v * 2 for k, v in data.items()}
```

