# Build tasks
> This bundle contains all pages in the Build tasks section.
> Source: https://www.union.ai/docs/v2/union/user-guide/task-programming/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming ===

# Build tasks

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This section covers the essential programming patterns and techniques for developing robust Flyte workflows. Once you understand the basics of task configuration, these guides will help you build sophisticated, production-ready data pipelines and machine learning workflows.

## What you'll learn

The task programming section covers key patterns for building effective Flyte workflows:

**Data handling and types**
- **Build tasks > Files and directories**: Work with large datasets using Flyte's efficient file and directory types that automatically handle data upload, storage, and transfer between tasks.
- **Build tasks > DataFrames**: Pass DataFrames between tasks without downloading data into memory, with support for Pandas, Polars, PyArrow, Dask, and other DataFrame backends.
- **Build tasks > Data classes and structures**: Use Python data classes and Pydantic models as task inputs and outputs to create well-structured, type-safe workflows.
- **Build tasks > Custom context**: Use custom context to pass metadata through your task execution hierarchy without adding parameters to every task.

**Execution patterns**
- **Build tasks > Fanout**: Scale your workflows by running many tasks in parallel, perfect for processing large datasets or running hyperparameter sweeps.
- **Build tasks > Controlling parallel execution**: Limit concurrent task executions using semaphores or `flyte.map` concurrency for rate-limited APIs, GPU quotas, and resource-constrained workflows.
- **Build tasks > Human-in-the-loop**: Pause workflow execution at a checkpoint and wait for a human to provide input or approval before continuing.
- **Build tasks > Grouping actions**: Organize related task executions into logical groups for better visualization and management in the UI.
- **Build tasks > Run a bioinformatics tool**: Run arbitrary containers in any language without the Flyte SDK installed, using Flyte's copilot sidecar for seamless data flow.
- **Build tasks > Remote tasks**: Use previously deployed tasks without importing their code or dependencies, enabling team collaboration and task reuse.
- [**Pod templates**](https://www.union.ai/docs/v2/union/user-guide/task-configuration/pod-templates/page.md): Extend tasks with Kubernetes pod templates to add sidecars, volume mounts, and advanced Kubernetes configurations.
- **Build tasks > Abort and cancel actions**: Stop in-progress actions automatically, programmatically, or manually via the CLI and UI.
- **Build tasks > Regular async function (not a task)**: Advanced patterns like task forwarding and other specialized task execution techniques.

**Development and debugging**
- **Build tasks > Notebooks**: Write and iterate on workflows directly in Jupyter notebooks for interactive development and experimentation.
- **Build tasks > Test business logic directly**: Test your Flyte tasks using direct invocation for business logic or `flyte.run()` for Flyte-specific features.
- **Build tasks > Links**: Add clickable URLs to tasks in the Flyte UI, connecting them to external tools like experiment trackers and monitoring dashboards.
- **Build tasks > Reports**: Generate custom HTML reports during task execution to display progress, results, and visualizations in the UI.
- **Build tasks > Traces**: Add fine-grained observability to helper functions within your tasks for better debugging and resumption capabilities.
- **Build tasks > Error handling**: Implement robust error recovery strategies, including automatic resource scaling and graceful failure handling.

## When to use these patterns

These programming patterns become essential as your workflows grow in complexity:

- Use **fanout** when you need to process multiple items concurrently or run parameter sweeps. Use **controlling parallel execution** when you need to limit how many run at the same time.
- Implement **error handling** for production workflows that need to recover from infrastructure failures.
- Apply **grouping** to organize complex workflows with many task executions.
- Leverage **files and directories** when working with large datasets that don't fit in memory.
- Use **DataFrames** to efficiently pass tabular data between tasks across different processing engines.
- Choose **container tasks** when you need to run code in non-Python languages, use legacy containers, or execute AI-generated code in sandboxes.
- Use **remote tasks** to reuse tasks deployed by other teams without managing their dependencies.
- Apply **pod templates** when you need advanced Kubernetes features like sidecars or specialized storage configurations.
- Use **traces** to debug non-deterministic operations like API calls or ML inference.
- Use **links** to connect tasks to external tools like Weights & Biases, Grafana, or custom dashboards directly from the Flyte UI.
- Create **reports** to monitor long-running workflows and share results with stakeholders.
- Use **custom context** when you need lightweight, cross-cutting metadata to flow through your task hierarchy without becoming part of the task's logical inputs.
- Write **unit tests** to validate your task logic and ensure type transformations work correctly before deployment.
- Use **abort and cancel** to stop unnecessary actions when conditions change, such as early convergence in HPO or manual intervention.
- Use **human-in-the-loop** to insert approval gates or data collection checkpoints into automated workflows.

Each guide includes practical examples and best practices to help you implement these patterns effectively in your own workflows.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories ===

# Files and directories

Flyte provides the [`flyte.io.File`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.io/file) and
[`flyte.io.Dir`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.io/dir) types to represent files and directories, respectively.
Together with [`flyte.io.DataFrame`](./dataframes) they constitute the *offloaded data types* - unlike [materialized types](./dataclasses-and-structures) like data classes, these pass references rather than full data content.

A variable of an offloaded type does not contain its actual data, but rather a reference to the data.
The actual data is stored in the internal blob store of your Union/Flyte instance.
When a variable of an offloaded type is first created, its data is uploaded to the blob store.
It can then be passed from task to task as a reference.
The actual data is only downloaded from the blob store when the task needs to access it, for example, when the task calls `open()` on a `File` or `Dir` object.

This allows Flyte to efficiently handle large files and directories without needing to transfer the data unnecessarily.
Even very large data objects like video files and DNA datasets can be passed efficiently between tasks.
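Conceptually, what crosses the task boundary for an offloaded type is a small, typed reference rather than the payload. The following is a purely hypothetical stdlib sketch of that idea (the names `OffloadedRef`, `uri`, and `size_bytes` are illustrative, not Flyte's actual internals):

```python
from dataclasses import dataclass

# Hypothetical illustration: what travels between tasks is a small
# reference record, not the payload itself.
@dataclass
class OffloadedRef:
    uri: str          # location of the data in the blob store
    size_bytes: int   # lightweight metadata travels with the reference

ref = OffloadedRef(uri="s3://bucket/videos/sample.mp4", size_bytes=3_500_000_000)
# Passing `ref` between tasks copies a few dozen bytes, not 3.5 GB;
# the payload is fetched only when a task actually opens it.
```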

The `File` and `Dir` classes provide both `sync` and `async` methods to interact with the data.

## Example usage

The examples below show the basic use cases: uploading files and directories created locally, and using them as inputs to a task.

```
import asyncio
import tempfile
from pathlib import Path

import flyte
from flyte.io import Dir, File

env = flyte.TaskEnvironment(name="files-and-folders")

@env.task
async def write_file(name: str) -> File:

    # Create a file and write some content to it
    with open("test.txt", "w") as f:
        f.write(f"hello world {name}")

    # Upload the file using flyte
    uploaded_file_obj = await File.from_local("test.txt")
    return uploaded_file_obj
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

The upload happens when the [`File.from_local`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.io/file) method is called.
Because the upload would otherwise block execution, `File.from_local` is implemented as an `async` function.
The Flyte SDK uses this class-constructor pattern frequently, so you will see it with other types as well.
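The async classmethod-factory pattern looks roughly like the following simplified stand-in (the `Artifact` class is invented for illustration and is not the SDK's actual implementation):

```python
import asyncio

class Artifact:
    def __init__(self, remote_path: str):
        self.remote_path = remote_path

    @classmethod
    async def from_local(cls, local_path: str) -> "Artifact":
        # A real implementation would upload the file here;
        # the zero-length sleep stands in for the awaited I/O.
        await asyncio.sleep(0)
        return cls(remote_path=f"blob://{local_path}")

async def demo() -> Artifact:
    # The constructor must be awaited, just like File.from_local
    return await Artifact.from_local("test.txt")

artifact = asyncio.run(demo())
print(artifact.remote_path)  # blob://test.txt
```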

The next task is slightly more complicated: it calls the task above several times to produce `File` objects, assembles them into a directory, and returns the resulting `Dir` object, again by invoking `from_local`.

```
@env.task
async def write_and_check_files() -> Dir:
    coros = []
    for name in ["Alice", "Bob", "Eve"]:
        coros.append(write_file(name=name))

    vals = await asyncio.gather(*coros)
    temp_dir = tempfile.mkdtemp()
    for file in vals:
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"File {file.path} contents: {contents_str}")
            new_file = Path(temp_dir) / file.name
            with open(new_file, "w") as out:  # noqa: ASYNC230
                out.write(contents_str)
    print(f"Files written to {temp_dir}")

    # walk the directory and ls
    for path in Path(temp_dir).iterdir():
        print(f"File: {path.name}")

    my_dir = await Dir.from_local(temp_dir)
    return my_dir
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

Finally, these tasks show how to use an offloaded type as an input.
Helper methods such as `walk` and `open` are available on the objects and do what you might expect.

```
@env.task
async def check_dir(my_dir: Dir):
    print(f"Dir {my_dir.path} contents:")
    async for file in my_dir.walk():
        print(f"File: {file.name}")
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"Contents: {contents_str}")

@env.task
async def create_and_check_dir():
    my_dir = await write_and_check_files()
    await check_dir(my_dir=my_dir)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(create_and_check_dir)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

## JSONL files

The `flyteplugins-jsonl` package extends `File` and `Dir` with JSONL-aware types: `JsonlFile` and `JsonlDir`. They add streaming record-level read and write on top of the standard file/directory capabilities, with optional [zstd](https://github.com/facebook/zstd) compression and automatic shard rotation for large datasets.

Records are serialized with [orjson](https://github.com/ijl/orjson) for high performance. Both types provide async and sync APIs where every read/write method has a `_sync` variant (e.g. `iter_records_sync()`, `writer_sync()`).
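JSONL itself is a simple format: one JSON document per line. A stdlib-only sketch of the underlying read/write pattern (using `json` here rather than orjson, purely for illustration):

```python
import io
import json

records = [{"id": i, "score": i * 0.1} for i in range(3)]

# Write: serialize one JSON object per line
buf = io.StringIO()
for rec in records:
    buf.write(json.dumps(rec) + "\n")

# Read: parse line by line, holding only one record in memory at a time
buf.seek(0)
parsed = [json.loads(line) for line in buf]
assert parsed == records
```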

```bash
pip install flyteplugins-jsonl
```

### Setup

```
import flyte
from flyteplugins.jsonl import JsonlDir, JsonlFile

env = flyte.TaskEnvironment(
    name="jsonl-examples",
    image=flyte.Image.from_debian_base(name="jsonl").with_pip_packages(
        "flyteplugins-jsonl"
    ),
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

### JsonlFile

`JsonlFile` is a `File` subclass for single JSONL files. Use its async context manager to write records incrementally without loading the entire dataset into memory:

```
@env.task
async def write_records() -> JsonlFile:
    """Write records to a single JSONL file."""
    out = JsonlFile.new_remote("results.jsonl")
    async with out.writer() as writer:
        for i in range(500_000):
            await writer.write({"id": i, "score": i * 0.1})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading streams records one at a time in the same way:

```
@env.task
async def read_records(data: JsonlFile) -> int:
    """Read records from a JsonlFile and return the count."""
    count = 0
    async for record in data.iter_records():
        count += 1
    return count
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

### Compression

Both `JsonlFile` and `JsonlDir` support zstd compression transparently based on the file extension. Use `.jsonl.zst` (or `.jsonl.zstd`) to enable compression:

```
@env.task
async def write_compressed() -> JsonlFile:
    """Write a zstd-compressed JSONL file.

    Compression is activated by using a .jsonl.zst extension.
    Both reading and writing handle compression transparently.
    """
    out = JsonlFile.new_remote("results.jsonl.zst")
    async with out.writer(compression_level=3) as writer:
        for i in range(100_000):
            await writer.write({"id": i, "compressed": True})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading compressed files requires no code changes; the compression format is detected automatically from the extension.
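The extension-based detection described above can be sketched with a simple suffix check (an illustrative heuristic, not the plugin's actual code):

```python
from pathlib import Path

def is_zstd(name: str) -> bool:
    # Treat .jsonl.zst and .jsonl.zstd as zstd-compressed
    return Path(name).suffix in {".zst", ".zstd"}

assert is_zstd("results.jsonl.zst")
assert is_zstd("results.jsonl.zstd")
assert not is_zstd("results.jsonl")
```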

### JsonlDir

`JsonlDir` is a `Dir` subclass that shards records across multiple JSONL files (named `part-00000.jsonl`, `part-00001.jsonl`, etc.). When a shard reaches the record count or byte size threshold, a new shard is opened automatically. This keeps individual files at a manageable size even for very large datasets:

```
@env.task
async def write_large_dataset() -> JsonlDir:
    """Write a large dataset to a sharded JsonlDir.

    JsonlDir automatically rotates to a new shard file once the
    current shard reaches the record or byte limit. Shards are named
    part-00000.jsonl, part-00001.jsonl, etc.
    """
    out = JsonlDir.new_remote("dataset/")
    async with out.writer(
        max_records_per_shard=100_000,
        max_bytes_per_shard=256 * 1024 * 1024,  # 256 MB
    ) as writer:
        for i in range(500_000):
            await writer.write({"index": i, "value": i * i})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Compressed shards are also supported by specifying the `shard_extension`:

```
@env.task
async def write_compressed_dir() -> JsonlDir:
    """Write zstd-compressed shards by specifying the shard extension."""
    out = JsonlDir.new_remote("compressed_dataset/")
    async with out.writer(
        shard_extension=".jsonl.zst",
        max_records_per_shard=50_000,
    ) as writer:
        for i in range(200_000):
            await writer.write({"id": i, "data": f"payload-{i}"})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading iterates across all shards transparently. The next shard is prefetched in the background to overlap network I/O with processing:

```
@env.task
async def sum_values(dataset: JsonlDir) -> int:
    """Read all records across all shards and compute a sum.

    Iteration is transparent across shards and handles mixed
    compressed/uncompressed shards automatically. The next shard is
    prefetched in the background for higher throughput.
    """
    total = 0
    async for record in dataset.iter_records():
        total += record["value"]
    return total
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

If you open a writer on a directory that already contains shards, the writer detects existing shard indices and continues from the next one, making it safe to append data to an existing `JsonlDir`.
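Continuing from the highest existing shard index can be sketched as follows (illustrative only; `next_shard_name` is an invented helper, not the plugin's internals):

```python
import re

def next_shard_name(existing: list[str]) -> str:
    # Collect indices of files matching part-NNNNN.jsonl (optionally .zst)
    pattern = re.compile(r"part-(\d{5})\.jsonl")
    indices = [int(m.group(1)) for name in existing if (m := pattern.match(name))]
    nxt = max(indices) + 1 if indices else 0
    return f"part-{nxt:05d}.jsonl"

print(next_shard_name(["part-00000.jsonl", "part-00001.jsonl"]))  # part-00002.jsonl
```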

### Error handling

All read methods accept an `on_error` parameter to control how corrupt or malformed lines are handled:

- `"raise"` (default): propagate parse errors immediately
- `"skip"`: log a warning and skip corrupt lines
- A callable `(line_number, raw_line, exception) -> None` for custom handling

```
@env.task
async def read_with_error_handling(data: JsonlFile) -> int:
    """Read records, skipping any corrupt lines instead of raising."""
    count = 0
    async for record in data.iter_records(on_error="skip"):
        count += 1
    return count

@env.task
async def read_with_custom_handler(data: JsonlFile) -> int:
    """Read records with a custom error handler that collects errors."""
    errors: list[dict] = []

    def on_error(line_number: int, raw_line: bytes, exc: Exception) -> None:
        errors.append({"line": line_number, "error": str(exc)})

    count = 0
    async for record in data.iter_records(on_error=on_error):
        count += 1
    print(f"{count} valid records, {len(errors)} errors")
    return count
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*
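The three `on_error` modes described above can be sketched in plain Python. This is a simplified stand-in for the plugin's behavior, not its actual implementation:

```python
import json
from typing import Callable, Iterator, Union

OnError = Union[str, Callable[[int, bytes, Exception], None]]

def iter_jsonl(lines: list[bytes], on_error: OnError = "raise") -> Iterator[dict]:
    for line_number, raw in enumerate(lines, start=1):
        try:
            yield json.loads(raw)
        except Exception as exc:
            if on_error == "raise":
                raise                 # propagate parse errors immediately
            elif on_error == "skip":
                continue              # a real implementation would log a warning
            else:
                on_error(line_number, raw, exc)  # custom handler

data = [b'{"id": 1}', b"not json", b'{"id": 2}']
print(len(list(iter_jsonl(data, on_error="skip"))))  # 2
```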

### Batch iteration

For bulk processing, both `JsonlFile` and `JsonlDir` support batched iteration. `iter_batches()` yields lists of dicts:

```
@env.task
async def process_in_batches(dataset: JsonlDir) -> int:
    """Process records in batches of dicts for bulk operations."""
    total = 0
    async for batch in dataset.iter_batches(batch_size=1000):
        # Each batch is a list[dict]
        total += len(batch)
    return total
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

For analytics workloads, `iter_arrow_batches()` yields Arrow `RecordBatch` objects directly. This requires the optional `pyarrow` dependency:

```bash
pip install 'flyteplugins-jsonl[arrow]'
```

```
arrow_env = flyte.TaskEnvironment(
    name="jsonl-arrow",
    image=flyte.Image.from_debian_base(name="jsonl-arrow").with_pip_packages(
        "flyteplugins-jsonl[arrow]"
    ),
)

@arrow_env.task
async def analyze_with_arrow(dataset: JsonlDir) -> float:
    """Stream records as Arrow RecordBatches for analytics.

    Memory usage is bounded by batch_size — the full dataset is
    never loaded into memory at once.
    """
    import pyarrow as pa

    batches = []
    async for batch in dataset.iter_arrow_batches(batch_size=65_536):
        batches.append(batch)

    table = pa.Table.from_batches(batches)
    values = table.column("value").to_pylist()
    return sum(values) / len(values)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/dataclasses-and-structures ===

# Data classes and structures

Dataclasses and Pydantic models are fully supported in Flyte as **materialized data types**:
structured data whose full content is serialized and passed between tasks.
Use these as you would normally, passing them as inputs and outputs of tasks.

Unlike **offloaded types** such as [`DataFrame`](./dataframes) and [`File` and `Dir`](./files-and-directories), the data of a data class or Pydantic model is fully serialized, stored, and deserialized between tasks.
This makes them ideal for configuration objects, metadata, and smaller structured data where all fields should be serializable.
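"Materialized" means the whole structure is serialized at the task boundary. A stdlib sketch of the kind of round trip involved (illustrative only; Flyte's actual wire format differs):

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class TrainingConfig:
    learning_rate: float
    epochs: int
    tags: list[str]

cfg = TrainingConfig(learning_rate=1e-3, epochs=10, tags=["baseline"])

# Serialize the full structure on the way out of a task...
payload = json.dumps(asdict(cfg))

# ...and rebuild it on the way into the next task.
restored = TrainingConfig(**json.loads(payload))
assert restored == cfg
```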

## Example: Combining Dataclasses and Pydantic Models

This example demonstrates how data classes and Pydantic models work together as materialized data types, showing nested structures and batch processing patterns:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "pydantic",
# ]
# main = "main"
# params = ""
# ///

import asyncio
from dataclasses import dataclass
from typing import List

from pydantic import BaseModel
import flyte

env = flyte.TaskEnvironment(name="ex-mixed-structures")

@dataclass
class InferenceRequest:
    feature_a: float
    feature_b: float

@dataclass
class BatchRequest:
    requests: List[InferenceRequest]
    batch_id: str = "default"

class PredictionSummary(BaseModel):
    predictions: List[float]
    average: float
    count: int
    batch_id: str

@env.task
async def predict_one(request: InferenceRequest) -> float:
    """
    A dummy linear model: prediction = 2 * feature_a + 3 * feature_b + bias(=1.0)
    """
    return 2.0 * request.feature_a + 3.0 * request.feature_b + 1.0

@env.task
async def process_batch(batch: BatchRequest) -> PredictionSummary:
    """
    Processes a batch of inference requests and returns summary statistics.
    """
    # Process all requests concurrently
    tasks = [predict_one(request=req) for req in batch.requests]
    predictions = await asyncio.gather(*tasks)

    # Calculate statistics
    average = sum(predictions) / len(predictions) if predictions else 0.0

    return PredictionSummary(
        predictions=predictions,
        average=average,
        count=len(predictions),
        batch_id=batch.batch_id
    )

@env.task
async def summarize_results(summary: PredictionSummary) -> str:
    """
    Creates a text summary from the prediction results.
    """
    return (
        f"Batch {summary.batch_id}: "
        f"Processed {summary.count} predictions, "
        f"average value: {summary.average:.2f}"
    )

@env.task
async def main() -> str:
    batch = BatchRequest(
        requests=[
            InferenceRequest(feature_a=1.0, feature_b=2.0),
            InferenceRequest(feature_a=3.0, feature_b=4.0),
            InferenceRequest(feature_a=5.0, feature_b=6.0),
        ],
        batch_id="demo_batch_001"
    )
    summary = await process_batch(batch)
    result = await summarize_results(summary)
    return result

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataclasses-and-structures/example.py*

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/dataframes ===

# DataFrames

By default, task return values in Python are materialized, meaning the actual data is downloaded and loaded into memory. This applies to simple types like integers as well as to more complex types like DataFrames.

To avoid downloading large datasets into memory, Flyte V2 exposes [`flyte.io.DataFrame`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.io/dataframe): a thin, uniform wrapper type for DataFrame-style objects that lets you pass a reference to the data rather than the fully materialized contents.

The `flyte.io.DataFrame` type provides serialization support for common engines like `pandas`, `polars`, `pyarrow`, and `dask`, enabling you to move data between different DataFrame backends.

## Setting up the environment and sample data

For our example we will start by setting up our task environment with the required dependencies and create some sample data.

```
from typing import Annotated

import numpy as np
import pandas as pd
import flyte
import flyte.io

env = flyte.TaskEnvironment(
    name="dataframe_usage",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "pyarrow", "numpy"),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

BASIC_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah"],
    "department": ["HR", "Engineering", "Engineering", "Marketing", "Finance", "Finance", "HR", "Engineering"],
    "hire_date": pd.to_datetime(
        ["2018-01-15", "2019-03-22", "2020-07-10", "2017-11-01", "2021-06-05", "2018-09-13", "2022-01-07", "2020-12-30"]
    ),
}

ADDL_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "salary": [55000, 75000, 72000, 50000, 68000, 70000, np.nan, 80000],
    "bonus_pct": [0.05, 0.10, 0.07, 0.04, np.nan, 0.08, 0.03, 0.09],
    "full_time": [True, True, True, False, True, True, False, True],
    "projects": [
        ["Recruiting", "Onboarding"],
        ["Platform", "API"],
        ["API", "Data Pipeline"],
        ["SEO", "Ads"],
        ["Budget", "Forecasting"],
        ["Auditing"],
        [],
        ["Platform", "Security", "Data Pipeline"],
    ],
}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

## Create a raw DataFrame

Now, let's create a task that returns a native Pandas DataFrame:

```
@env.task
async def create_raw_dataframe() -> pd.DataFrame:
    return pd.DataFrame(BASIC_EMPLOYEE_DATA)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

This is the most basic way to pass DataFrames (of all supported kinds, not just Pandas): create the DataFrame as normal and return it.

Because the task is declared to return a supported native DataFrame type (in this case, `pandas.DataFrame`), Flyte will automatically detect it, serialize it correctly, and upload it at task completion, enabling it to be passed transparently to the next task.

Flyte supports auto-serialization for the following DataFrame types:
* `pandas.DataFrame`
* `pyarrow.Table`
* `dask.dataframe.DataFrame`
* `polars.DataFrame`
* `flyte.io.DataFrame` (see below)

## Create a flyte.io.DataFrame

Alternatively, you can create a `flyte.io.DataFrame` object directly from a native object with the `from_df` method:

```
@env.task
async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "parquet"]:
    pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA)
    fdf = flyte.io.DataFrame.from_df(pd_df)
    return fdf
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

The `flyte.io.DataFrame` class creates a thin wrapper around objects of any standard DataFrame type. It serves as a generic "any DataFrame type" (a concept that Python itself does not currently offer).

As with native DataFrame types, Flyte will automatically serialize and upload the data at task completion.

The advantage of the unified `flyte.io.DataFrame` wrapper is that you can be explicit about the storage format that makes sense for your use case by using an `Annotated` type, where the second argument encodes the format or other lightweight hints. In the example above, the `"parquet"` annotation specifies that the DataFrame should be stored as Parquet.

## Automatically convert between types

You can leverage Flyte to automatically download and convert the DataFrame between types when needed:

```
@env.task
async def join_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> flyte.io.DataFrame:
    joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner")
    return flyte.io.DataFrame.from_df(joined_df)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

This task takes two DataFrames as input. We'll pass one raw Pandas DataFrame, and one `flyte.io.DataFrame`.
Flyte automatically converts the `flyte.io.DataFrame` to a Pandas DataFrame (since we declared that as the input type) before passing it to the task.
The actual download and conversion happens only when we access the data, in this case, when we do the merge.

## Downloading DataFrames

When a task receives a `flyte.io.DataFrame`, you can request a concrete backend representation. For example, to download as a pandas DataFrame:

```
@env.task
async def download_data(joined_df: flyte.io.DataFrame):
    downloaded = await joined_df.open(pd.DataFrame).all()
    print("Downloaded Data:\n", downloaded)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

The `open()` call delegates to the DataFrame handler for the stored format and converts to the requested in-memory type.

## Run the example

Finally, we can define a `main` function to run the tasks defined above and a `__main__` block to execute the workflow:

```
@env.task
async def main():
    raw_df = await create_raw_dataframe()
    flyte_df = await create_flyte_dataframe()
    joined_df = await join_data(raw_dataframe=raw_df, flyte_dataframe=flyte_df)
    await download_data(joined_df=joined_df)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

## Polars DataFrames

The `flyteplugins-polars` package extends Flyte's DataFrame support to `polars.DataFrame` and `polars.LazyFrame`. Install it alongside the core SDK and it registers automatically — no additional configuration required.

```bash
pip install flyteplugins-polars
```

Both types are serialized as Parquet when passed between tasks, just like other DataFrame backends.

### Setup

```
import polars as pl

import flyte

env = flyte.TaskEnvironment(
    name="polars-dataframes",
    image=flyte.Image.from_debian_base(name="polars").with_pip_packages(
        "flyteplugins-polars>=2.0.0", "polars"
    ),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

EMPLOYEE_DATA = {
    "employee_id": [1001, 1002, 1003, 1004, 1005, 1006],
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona"],
    "department": ["Engineering", "Engineering", "Marketing", "Finance", "Finance", "Engineering"],
    "salary": [75000, 72000, 50000, 68000, 70000, 80000],
    "years_experience": [5, 4, 2, 6, 5, 7],
}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

### Eager DataFrames

Use `pl.DataFrame` when you want immediate evaluation. Flyte serializes it to Parquet on output and deserializes it on input:

```
@env.task
async def create_dataframe() -> pl.DataFrame:
    """Create a Polars DataFrame.

    Polars DataFrames are passed between tasks as serialized Parquet files
    stored in the Flyte blob store — no manual upload required.
    """
    return pl.DataFrame(EMPLOYEE_DATA)

@env.task
async def filter_high_earners(df: pl.DataFrame) -> pl.DataFrame:
    """Filter and enrich a Polars DataFrame."""
    return (
        df.filter(pl.col("salary") > 60000)
        .with_columns(
            (pl.col("salary") / pl.col("years_experience")).alias("salary_per_year")
        )
        .sort("salary", descending=True)
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

### Lazy DataFrames

Use `pl.LazyFrame` when you want to defer computation and let Polars optimize the full query plan before executing. Flyte handles serialization the same way as `pl.DataFrame`:

```
@env.task
async def create_lazyframe() -> pl.LazyFrame:
    """Create a Polars LazyFrame.

    LazyFrames defer computation until collected, allowing Polars to
    optimize the full query plan. They are serialized to Parquet just
    like DataFrames when passed between tasks.
    """
    return pl.LazyFrame(EMPLOYEE_DATA)

@env.task
async def aggregate_by_department(lf: pl.LazyFrame) -> pl.DataFrame:
    """Aggregate salary statistics by department using a LazyFrame.

    The query plan is built lazily and executed only when collect() is called.
    """
    return (
        lf.group_by("department")
        .agg(
            pl.col("salary").mean().alias("avg_salary"),
            pl.col("salary").max().alias("max_salary"),
            pl.len().alias("headcount"),
        )
        .sort("avg_salary", descending=True)
        .collect()
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

The `collect()` call in `aggregate_by_department` triggers execution of the lazy plan. When a `LazyFrame` is passed between tasks, it is materialized and serialized to Parquet at the task boundary, just like a `DataFrame`.

### Run the example

```python
@env.task
async def main():
    df = await create_dataframe()
    filtered = await filter_high_earners(df=df)
    print("High earners:")
    print(filtered)

    lf = await create_lazyframe()
    summary = await aggregate_by_department(lf=lf)
    print("Department summary:")
    print(summary)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/handling-custom-types ===

# Custom types

Flyte has a rich type system that handles most Python types automatically. However, there are cases where you may want to pass custom types into a run or between actions. By default, if Flyte doesn't recognize a type, it uses Python pickle to serialize the data. While this works, pickle has several drawbacks:

- **Inefficiency**: Pickle can be very inefficient for certain data types
- **Language compatibility**: Pickle is Python-specific and doesn't work with other languages
- **Version fragility**: Pickled data can break between Python versions
- **Opacity**: Pickled data appears as bytes or file links in the UI, with no automatic form generation

Consider types like Polars DataFrames or PyTorch Tensors. Using pickle for these is extremely inefficient compared to native serialization formats like Parquet or tensor-specific formats.

Flyte SDK addresses this by allowing you to create and share type extensions.

## Types of extensions

Flyte supports two types of type extensions:

1. **Type transformers**: For scalar types (integers, strings, files, directories, custom objects)
2. **DataFrame extensions**: For tabular data types that benefit from DataFrame-specific handling

DataFrame types are special because they have associated metadata (columns, schemas), can be serialized to efficient formats like Parquet, support parallel uploads from engines like Spark, and can be partitioned.

## Creating a type transformer

Type transformers convert between Python types and Flyte's internal representation. Here's how to create one for a custom `PositiveInt` type.

### Step 1: Define your custom type

```python
# custom_type.py
class PositiveInt:
    """A wrapper type that only accepts positive integers."""

    def __init__(self, value: int):
        if not isinstance(value, int):
            raise TypeError(f"Expected int, got {type(value).__name__}")
        if value <= 0:
            raise ValueError(f"Expected positive integer, got {value}")
        self._value = value

    @property
    def value(self) -> int:
        return self._value

    def __repr__(self) -> str:
        return f"PositiveInt({self._value})"
```

### Step 2: Create the type transformer

```python
# transformer.py
from typing import Type

from flyteidl2.core import literals_pb2, types_pb2

from flyte import logger
from flyte.types import TypeEngine, TypeTransformer, TypeTransformerFailedError
from my_transformer.custom_type import PositiveInt

class PositiveIntTransformer(TypeTransformer[PositiveInt]):
    """
    Type transformer for PositiveInt that validates and transforms positive integers.
    """

    def __init__(self):
        super().__init__(name="PositiveInt", t=PositiveInt)

    def get_literal_type(self, t: Type[PositiveInt]) -> types_pb2.LiteralType:
        """Returns the Flyte literal type for PositiveInt."""
        return types_pb2.LiteralType(
            simple=types_pb2.SimpleType.INTEGER,
            structure=types_pb2.TypeStructure(tag="PositiveInt"),
        )

    async def to_literal(
        self,
        python_val: PositiveInt,
        python_type: Type[PositiveInt],
        expected: types_pb2.LiteralType,
    ) -> literals_pb2.Literal:
        """Converts a PositiveInt instance to a Flyte Literal."""
        if not isinstance(python_val, PositiveInt):
            raise TypeTransformerFailedError(
                f"Expected PositiveInt, got {type(python_val).__name__}"
            )

        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(
                primitive=literals_pb2.Primitive(integer=python_val.value)
            )
        )

    async def to_python_value(
        self,
        lv: literals_pb2.Literal,
        expected_python_type: Type[PositiveInt]
    ) -> PositiveInt:
        """Converts a Flyte Literal back to a PositiveInt instance."""
        if not lv.HasField("scalar") or not lv.scalar.HasField("primitive"):
            raise TypeTransformerFailedError(
                f"Cannot convert literal {lv} to PositiveInt: missing scalar primitive"
            )

        value = lv.scalar.primitive.integer
        try:
            return PositiveInt(value)
        except (TypeError, ValueError) as e:
            raise TypeTransformerFailedError(
                f"Cannot convert value {value} to PositiveInt: {e}"
            ) from e

    def guess_python_type(
        self,
        literal_type: types_pb2.LiteralType
    ) -> Type[PositiveInt]:
        """Guesses the Python type from a Flyte literal type."""
        if (
            literal_type.simple == types_pb2.SimpleType.INTEGER
            and literal_type.structure
            and literal_type.structure.tag == "PositiveInt"
        ):
            return PositiveInt
        raise ValueError(f"Cannot guess PositiveInt from literal type {literal_type}")
```

### Step 3: Register the transformer

Create a registration function that can be called to register your transformer:

```python
def register_positive_int_transformer():
    """Register the PositiveIntTransformer in the TypeEngine."""
    TypeEngine.register(PositiveIntTransformer())
    logger.info("Registered PositiveIntTransformer in TypeEngine")
```

## Distributing type plugins

To share your type transformer as an installable package, configure it as a Flyte plugin using entry points.

### Configure pyproject.toml

Add the entry point to your `pyproject.toml`:

```toml
[project]
name = "my_transformer"
version = "0.1.0"
description = "Custom type transformer"
requires-python = ">=3.10"
dependencies = []

[project.entry-points."flyte.plugins.types"]
my_transformer = "my_transformer.transformer:register_positive_int_transformer"
```

The entry point group `flyte.plugins.types` tells Flyte to automatically load this transformer when the package is installed.

### Automatic loading

When your plugin package is installed, Flyte automatically loads the type transformer at runtime. This happens during `flyte.init()` or `flyte.init_from_config()`.

## Controlling plugin loading

Loading many type plugins can add overhead to initialization. You can disable automatic plugin loading:

```python
import flyte

# Disable automatic loading of type transformer plugins
flyte.init(load_plugin_type_transformers=False)
```

By default, `load_plugin_type_transformers` is `True`.

## Using custom types in tasks

Once registered, use your custom type like any built-in type:

```python
import flyte
from my_transformer.custom_type import PositiveInt

env = flyte.TaskEnvironment(name="custom_types")

@env.task
async def process_positive(value: PositiveInt) -> int:
    """Process a positive integer."""
    return value.value * 2

if __name__ == "__main__":
    flyte.init_from_config()

    # The custom type works seamlessly
    run = flyte.run(process_positive, value=PositiveInt(42))
    run.wait()
    print(run.outputs()[0])  # 84
```

## DataFrame extensions

For tabular data types, Flyte provides a specialized extension mechanism through `flyte.io.DataFrame`. DataFrame extensions support:

- Automatic conversion to/from Parquet format
- Column metadata and schema information
- Parallel uploads from distributed engines
- Partitioning support

DataFrame extensions use encoders and decoders from `flyte.io.extend`. Documentation for creating DataFrame extensions is coming soon.

## Best practices

1. **Use specific types over pickle**: Define type transformers for any custom types used frequently in your workflows
2. **Keep transformers lightweight**: Avoid expensive operations in `to_literal` and `to_python_value`
3. **Add validation**: Validate data in your transformer to catch errors early
4. **Use meaningful tags**: The `TypeStructure.tag` helps identify your type in the Flyte UI
5. **Be judicious with plugins**: Only install the plugins you need to minimize initialization overhead

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/custom-context ===

# Custom context

Custom context provides a mechanism for implicitly passing configuration and metadata through your entire task execution hierarchy without adding parameters to every task. It is ideal for cross-cutting concerns such as tracing, environment metadata, or experiment identifiers.

Think of custom context as **execution-scoped metadata** that automatically flows from parent to child tasks.

## Overview

Custom context is an implicit key–value configuration map that is automatically available to tasks during execution. It is stored in the blob store of your Union/Flyte instance together with the task’s inputs, making it available across tasks without needing to pass it explicitly.

You can access it in a Flyte task via:

```python
flyte.ctx().custom_context
```

Custom context is fundamentally different from standard task inputs. Task inputs are explicit, strongly typed parameters that you declare as part of a task’s signature. They directly influence the task’s computation and therefore participate in Flyte’s caching and reproducibility guarantees.

Custom context, on the other hand, is implicit metadata. It consists only of string key/value pairs, is not part of the task signature, and does not affect task caching. Because it is injected by the Flyte runtime rather than passed as a formal input, it should be used only for environmental or contextual information, not for data that changes the logical output of a task.

## When to use it and when not to

Custom context is perfect when you need metadata, not domain data, to flow through your tasks.

Good use cases:

- Tracing IDs, span IDs
- Experiment or run metadata
- Environment region, cluster ID
- Logging correlation keys
- Feature flags
- Session IDs for third-party APIs (e.g., an LLM session)

Avoid using for:

- Business/domain data
- Inputs that change task outputs
- Anything affecting caching or reproducibility
- Large blobs of data (keep it small)

It is the cleanest mechanism when you need something available everywhere, but not logically an input to the computation.

## Setting custom context

There are two ways to set custom context for a Flyte run:

1. Set it once for the entire run when you launch (`with_runcontext`) — this establishes the base context for the execution
2. Set or override it inside task code using `flyte.custom_context(...)` context manager — this changes the active context for that task block and any nested tasks called from it

Both are legitimate and complementary. The important behavioral rules to understand are:

- `with_runcontext(...)` sets the run-level base. Values provided here are available everywhere unless overridden later. Use this for metadata that should apply to most or all tasks in the run (experiment name, top-level trace id, run id, etc.).
- `flyte.custom_context(...)` is used inside task code to set or override values for that scope. It does affect nested tasks invoked while that context is active. In practice this means you can override run-level entries, add new keys for downstream tasks, or both.
- Merging & precedence: contexts are merged; when the same key appears in multiple places the most recent/innermost value wins (i.e., values set by `flyte.custom_context(...)` override the run-level values from `with_runcontext(...)` for the duration of that block).

### Run-level context

Set base metadata once when starting the run:

```python
import flyte

env = flyte.TaskEnvironment("custom-context-example")

@env.task
async def leaf_task() -> str:
    # Reads run-level context
    print("leaf sees:", flyte.ctx().custom_context)
    return flyte.ctx().custom_context.get("trace_id")

@env.task
async def root() -> str:
    return await leaf_task()

if __name__ == "__main__":
    flyte.init_from_config()
    # Base context for the entire run
    flyte.with_runcontext(custom_context={"trace_id": "root-abc", "experiment": "v1"}).run(root)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/custom-context/run_context.py*

Output (every task sees the base keys unless overridden):

```bash
leaf sees: {'trace_id': 'root-abc', 'experiment': 'v1'}
```

### Overriding inside a task (local override that affects nested tasks)

Use `flyte.custom_context(...)` inside a task to override or add keys for downstream calls:

```python
@env.task
async def downstream() -> str:
    print("downstream sees:", flyte.ctx().custom_context)
    return flyte.ctx().custom_context.get("trace_id")

@env.task
async def parent() -> str:
    print("parent initial:", flyte.ctx().custom_context)

    # Override the trace_id for the nested call(s)
    with flyte.custom_context(trace_id="child-override"):
        val = await downstream()     # downstream sees trace_id="child-override"

    # After the context block, run-level values are back
    print("parent after:", flyte.ctx().custom_context)
    return val
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/custom-context/override_context.py*

If the run was started with `{"trace_id": "root-abc"}`, this prints:

```bash
parent initial: {'trace_id': 'root-abc'}
downstream sees: {'trace_id': 'child-override'}
parent after: {'trace_id': 'root-abc'}
```

Note that the override affected the nested downstream task because it was invoked while the `flyte.custom_context` block was active.

### Adding new keys for nested tasks

You can add keys (not just override):

```python
with flyte.custom_context(experiment="exp-blue", run_group="g-7"):
    await some_task()   # some_task sees both base keys + the new keys
```

## Accessing custom context

Always via the Flyte runtime:

```python
ctx = flyte.ctx().custom_context
value = ctx.get("key")
```

You can access the custom context using either `flyte.ctx().custom_context` or the shorthand `flyte.get_custom_context()`, which returns the same dictionary of key/value pairs.

Values are always strings, so parse as needed:

```python
timeout = int(ctx["timeout_seconds"])
```
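Small parsing helpers keep this tidy. The helpers below are illustrative, not part of the Flyte API; a plain dict stands in for the map returned by `flyte.ctx().custom_context`:

```python
def ctx_int(ctx: dict[str, str], key: str, default: int) -> int:
    """Read an integer from a string-valued context map, falling back to a default."""
    raw = ctx.get(key)
    return default if raw is None else int(raw)

def ctx_flag(ctx: dict[str, str], key: str, default: bool = False) -> bool:
    """Interpret common truthy strings ('1', 'true', 'yes') as booleans."""
    raw = ctx.get(key)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes"}

# Stand-in for flyte.ctx().custom_context, which has the same string-to-string shape
ctx = {"timeout_seconds": "30", "use_cache": "true"}
print(ctx_int(ctx, "timeout_seconds", 10))  # 30
print(ctx_flag(ctx, "use_cache"))           # True
```

Centralizing the parsing this way also gives you one place to handle missing keys, rather than scattering `int(...)` calls and defaults across tasks.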

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/abort-tasks ===

# Abort and cancel actions

When running complex workflows, you may need to stop actions that are no longer needed.
This can happen when one branch of your workflow makes others redundant, when a task fails and its siblings should not continue, or when you need to manually intervene in a running workflow.

Flyte provides three mechanisms for stopping actions:

- **Automatic cleanup**: When a root action completes, all its in-progress descendant actions are automatically aborted.
- **Programmatic cancellation**: Cancel specific `asyncio` tasks from within your workflow code.
- **External abort**: Stop individual actions via the CLI, the UI, or the API.

For background on runs and actions, see [Runs and actions](https://www.union.ai/docs/v2/union/user-guide/core-concepts/runs-and-actions).

## Action lifetime

The lifetime of all actions in a [run](https://www.union.ai/docs/v2/union/user-guide/core-concepts/runs-and-actions) is tied to the lifetime of the root action (the first task that was invoked).
When the root action exits—whether it succeeds, fails, or returns early—all in-progress descendant actions are automatically aborted and no new actions can be enqueued.

This means you don't need to manually clean up child actions. Flyte handles it for you.

Consider this example where `main` exits after 10 seconds, but it has spawned a `sleep_for` action that is set to run for 30 seconds:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "seconds = 30"
# ///

import asyncio

import flyte

env = flyte.TaskEnvironment(name="action_lifetime")

@env.task
async def do_something():
    print("Doing something")
    await asyncio.sleep(5)
    print("Finished doing something")

@env.task
async def sleep_for(seconds: int):
    print(f"Sleeping for {seconds} seconds")
    try:
        await asyncio.sleep(seconds)
        await do_something()
    except asyncio.CancelledError:
        print("sleep_for was cancelled")
        return
    print(f"Finished sleeping for {seconds} seconds")

@env.task
async def main(seconds: int):
    print("Starting main")
    asyncio.create_task(sleep_for(seconds))
    await asyncio.sleep(10)
    print("Main finished")

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, seconds=30)
    print(run.url)
    run.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/action_lifetime.py*

When `main` returns after 10 seconds, the `sleep_for` action (which still has 20 seconds remaining) is automatically aborted.
The `sleep_for` task receives an `asyncio.CancelledError`, giving it a chance to handle the cancellation gracefully.

## Canceling actions programmatically

As a workflow author, you can cancel specific in-progress actions by canceling their corresponding `asyncio` tasks.
This is useful in scenarios like hyperparameter optimization (HPO), where one action converges to the desired result and the remaining actions can be stopped to save compute.

To cancel actions programmatically:

1. Launch actions using `asyncio.create_task()` and retain references to the returned task objects.
2. When the desired condition is met, call `.cancel()` on the tasks you want to stop.

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 30, f = 10.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("cancel")

@env.task
async def sleepers(f: float, n: int):
    await asyncio.sleep(f)

@env.task
async def failing_task(f: float):
    raise ValueError("I will fail!")

@env.task
async def main(n: int, f: float):
    sleeping_tasks = []
    for i in range(n):
        sleeping_tasks.append(asyncio.create_task(sleepers(f, i)))

    await asyncio.sleep(f)
    try:
        await failing_task(f)
        await asyncio.gather(*sleeping_tasks)
    except flyte.errors.RuntimeUserError as e:
        if e.code == "ValueError":
            print(f"Received ValueError, canceling {len(sleeping_tasks)} sleeping tasks")
            for t in sleeping_tasks:
                t.cancel()
        return

if __name__ == "__main__":
    flyte.init_from_config()
    print(flyte.run(main, 30, 10.0))
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/cancel_tasks.py*

In this code:

* The `main` task launches 30 `sleepers` actions in parallel using `asyncio.create_task()`.
* It then calls `failing_task`, which raises a `ValueError`.
* The error is caught as a `flyte.errors.RuntimeUserError` (since user-raised exceptions are wrapped by Flyte).
* On catching the error, `main` cancels all sleeping tasks by calling `.cancel()` on each one, freeing their compute resources.

This pattern lets you react to runtime conditions and stop unnecessary work. For more on handling errors within workflows, see [Error handling](./error-handling).

## External abort

Sometimes you need to stop an action manually, outside the workflow code itself. You can abort individual actions using the CLI, the UI, or the API.

When an action is externally aborted, the parent action that awaits it receives a [`flyte.errors.ActionAbortedError`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.errors/actionabortederror). You can catch this error to handle the abort gracefully.

### Aborting via the CLI

To abort a specific action:

```bash
flyte abort <run-name> <action-name>
```

Use `--project` and `--domain` to target a specific [project-domain pair](https://www.union.ai/docs/v2/union/user-guide/projects-and-domains).
For all available options, see the [CLI reference](https://www.union.ai/docs/v2/union/api-reference/flyte-cli).

### Handling external aborts

When using `asyncio.gather()` with `return_exceptions=True`, externally aborted actions return an `ActionAbortedError` instead of raising it. This lets you inspect results and handle aborts on a per-action basis:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n = 10, sleep_for = 30.0"
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment("external_abort")

@env.task
async def long_sleeper(sleep_for: float):
    await asyncio.sleep(sleep_for)

@env.task
async def main(n: int, sleep_for: float) -> str:
    coros = [long_sleeper(sleep_for) for _ in range(n)]
    results = await asyncio.gather(*coros, return_exceptions=True)
    for i, r in enumerate(results):
        if isinstance(r, flyte.errors.ActionAbortedError):
            print(f"Action [{i}] was externally aborted")
    return "Hello World!"

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, 10, 30.0)
    print(run.url)
    run.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/abort-tasks/external_abort.py*

In this code:

* The `main` task launches 10 `long_sleeper` actions in parallel.
* If any action is externally aborted (via the CLI, the UI, or the API) while running, `asyncio.gather` captures the `ActionAbortedError` as a result instead of propagating it.
* The `main` task iterates over the results and logs which actions were aborted.
* Because the abort is handled, `main` can continue executing and return its result normally.

Without `return_exceptions=True`, an external abort would raise `ActionAbortedError` directly, which you can handle with a standard `try...except` block.
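The difference between the two modes is standard `asyncio` behavior, so it can be sketched without Flyte at all. The example below uses a stand-in exception class in place of `ActionAbortedError`:

```python
import asyncio

class StandInAbortedError(Exception):
    """Stand-in for flyte.errors.ActionAbortedError in this sketch."""

async def worker(i: int) -> int:
    if i == 1:
        raise StandInAbortedError(f"worker {i} aborted")
    return i

async def main() -> None:
    # With return_exceptions=True, errors are captured as results
    results = await asyncio.gather(*(worker(i) for i in range(3)), return_exceptions=True)
    for i, r in enumerate(results):
        if isinstance(r, StandInAbortedError):
            print(f"worker [{i}] was aborted")

    # Without it, the first error propagates and must be caught
    try:
        await asyncio.gather(*(worker(i) for i in range(3)))
    except StandInAbortedError as e:
        print(f"caught: {e}")

asyncio.run(main())
```

The first style is useful when some actions may be aborted but the rest should still contribute results; the second is simpler when any abort should stop the whole group.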

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/container-tasks ===

# Container tasks

Container tasks are one of Flyte's superpowers. They allow you to execute tasks using any container image without requiring the Flyte SDK to be installed in that container. This means you can run code written in any language, execute shell scripts, or even use pre-built containers pulled directly from the internet while still maintaining Flyte's data orchestration capabilities.

## What are container tasks?

A container task is a special type of Flyte task that executes arbitrary container images. Unlike standard `@task` decorated functions that require the Flyte SDK, container tasks can run:

- Code written in any programming language (Rust, Go, Java, R, etc.)
- Legacy containers with unsupported Python versions
- Pre-built bioinformatics or scientific computing containers
- Shell scripts and command-line tools
- Dynamically generated code in sandboxed environments

## How data flows in and out

The magic of container tasks lies in Flyte's **copilot sidecar system**. When you execute a container task, Flyte:

1. Launches your specified container alongside a copilot sidecar container
2. Uses shared Kubernetes pod volumes to pass data between containers
3. Reads inputs from `input_data_dir` and writes outputs to `output_data_dir`
4. Automatically handles serialization and deserialization of typed data

This means you can construct workflows where some tasks are container tasks while others are Python functions, and data will flow seamlessly between them.

## Basic Usage

Here's a simple example that runs a shell command in an Alpine container:

```python
import flyte
from flyte.extras import ContainerTask

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image=flyte.Image.from_base("alpine:3.18"),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs={"name": str},
    outputs={"greeting": str},
    command=[
        "/bin/sh",
        "-c",
        "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"
    ],
)
```

### Template syntax for inputs

Container tasks support template-style references to inputs using the syntax `{{.inputs.<input_name>}}`. This gets replaced with the actual input value at runtime:

```python
command=["/bin/sh", "-c", "echo 'Processing {{.inputs.user_id}}' > /var/outputs/result"]
```
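Conceptually, the substitution is simple string templating. A hypothetical sketch of the replacement step (not Flyte's actual implementation):

```python
import re

def render_command(command: list[str], inputs: dict[str, object]) -> list[str]:
    """Replace each {{.inputs.<name>}} placeholder with the corresponding input value."""
    pattern = re.compile(r"\{\{\.inputs\.([A-Za-z0-9_.]+)\}\}")
    return [pattern.sub(lambda m: str(inputs[m.group(1)]), part) for part in command]

cmd = ["/bin/sh", "-c", "echo 'Processing {{.inputs.user_id}}' > /var/outputs/result"]
print(render_command(cmd, {"user_id": "u-42"}))
# ['/bin/sh', '-c', "echo 'Processing u-42' > /var/outputs/result"]
```

Note that only scalar inputs are substituted this way; `File` and `Dir` inputs are referenced by path instead, as described below.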

### Using container tasks in workflows

Container tasks integrate seamlessly with Python tasks:

```python
container_env = flyte.TaskEnvironment.from_task("container_env", greeting_task)
env = flyte.TaskEnvironment(name="hello_world", depends_on=[container_env])

@env.task
async def say_hello(name: str = "flyte") -> str:
    print("Hello container task")
    return await greeting_task(name=name)
```

## Advanced: passing files and directories

Container tasks can accept `File` and `Dir` inputs. For these types, use path-based syntax (not template syntax) in your commands:

```python
from flyte.io import File
import pathlib

code_runner = ContainerTask(
    name="python_code_runner",
    image="ghcr.io/astral-sh/uv:debian-slim",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs={"script.py": File, "a": int, "b": int},
    outputs={"result": int},
    command=[
        "/bin/sh",
        "-c",
        "uv run /var/inputs/script.py {{.inputs.a}} {{.inputs.b}} > /var/outputs/result"
    ],
)

@env.task
async def execute_script() -> int:
    path = pathlib.Path(__file__).parent / "my_script.py"
    script_file = await File.from_local(path)
    return await code_runner(**{"script.py": script_file, "a": 10, "b": 20})
```

Note that when passing files, the input key can include the filename (e.g., `"script.py"`), and you reference it in the command as `/var/inputs/script.py`.

## Use case: agentic sandbox execution

Container tasks are perfect for running AI-generated code in isolated environments. You can generate a data analysis script dynamically and execute it safely:

```python
import flyte
from flyte.extras import ContainerTask
from flyte.io import File
import pathlib

env = flyte.TaskEnvironment(name="agentic_sandbox")

@env.task
async def run_generated_code(script_content: str, param_a: int, param_b: int) -> int:
    # Define a container task that runs arbitrary Python code
    sandbox = ContainerTask(
        name="code_sandbox",
        image="ghcr.io/astral-sh/uv:debian-slim",
        input_data_dir="/var/inputs",
        output_data_dir="/var/outputs",
        inputs={"script": File, "a": int, "b": int},
        outputs={"result": int},
        command=[
            "/bin/sh",
            "-c",
            "uv run --script /var/inputs/script {{.inputs.a}} {{.inputs.b}} > /var/outputs/result"
        ],
    )

    # Save the generated script to a temporary file
    temp_path = pathlib.Path("/tmp/generated_script.py")
    temp_path.write_text(script_content)

    # Execute it in the sandbox
    script_file = await File.from_local(temp_path)
    return await sandbox(script=script_file, a=param_a, b=param_b)
```

This pattern allows you to:
- Generate code using LLMs or other AI systems
- Execute it in a controlled, isolated environment
- Capture results and integrate them back into your workflow
- Maintain full observability and reproducibility

## Use case: legacy and specialized containers

Many scientific and bioinformatics tools are distributed as pre-built containers. Container tasks let you integrate them directly:

```python
# Run a bioinformatics tool
blast_task = ContainerTask(
    name="run_blast",
    image="ncbi/blast:latest",
    input_data_dir="/data",
    output_data_dir="/results",
    inputs={"query": File, "database": str},
    outputs={"alignments": File},
    command=[
        "blastn",
        "-query", "/data/query",
        "-db", "{{.inputs.database}}",
        "-out", "/results/alignments",
        "-outfmt", "6"
    ],
)

# Run legacy code with an old Python version
legacy_task = ContainerTask(
    name="legacy_python",
    image="python:2.7",  # Unsupported Python version
    input_data_dir="/app/inputs",
    output_data_dir="/app/outputs",
    inputs={"data_file": File},
    outputs={"processed": File},
    command=[
        "python",
        "/legacy_app/process.py",
        "/app/inputs/data_file",
        "/app/outputs/processed"
    ],
)
```

## Use case: multi-language workflows

Build workflows that span multiple languages:

```python
# Rust task for high-performance computation
rust_task = ContainerTask(
    name="rust_compute",
    image="rust:1.75",
    inputs={"n": int},
    outputs={"result": int},
    input_data_dir="/inputs",
    output_data_dir="/outputs",
    command=["./compute_binary", "{{.inputs.n}}"],
)

# Python task for orchestration
@env.task
async def multi_lang_workflow(iterations: int) -> dict:
    # Call Rust task for heavy computation
    computed = await rust_task(n=iterations)

    # Process results in Python (python_analysis_task is assumed to be defined elsewhere)
    processed = await python_analysis_task(computed)

    return {"rust_result": computed, "analysis": processed}
```

## Configuration options

### ContainerTask parameters

- **name**: Unique identifier for the task
- **image**: Container image to use (string or `Image` object)
- **command**: Command to execute in the container (list of strings)
- **inputs**: Dictionary mapping input names to types
- **outputs**: Dictionary mapping output names to types
- **input_data_dir**: Directory where Flyte writes input data (default: `/var/inputs`)
- **output_data_dir**: Directory where Flyte reads output data (default: `/var/outputs`)
- **arguments**: Additional command arguments (list of strings)
- **metadata_format**: Format for metadata serialization (`"JSON"`, `"YAML"`, or `"PROTO"`)
- **local_logs**: Whether to print container logs during local execution (default: `True`)

### Supported input/output types

Container tasks support all standard Flyte types:

- Primitives: `str`, `int`, `float`, `bool`
- Temporal: `datetime.datetime`, `datetime.timedelta`
- File system: `File`, `Dir`
- Complex types: dataclasses, Pydantic models (serialized as JSON/YAML/PROTO)

## Best practices

1. **Use specific image tags**: Prefer `alpine:3.18` over `alpine:latest` for reproducibility
2. **Keep containers focused**: Each container task should do one thing well
3. **Handle errors gracefully**: Ensure your container commands exit with appropriate status codes
4. **Test locally first**: Container tasks can run locally with Docker, making debugging easier
5. **Consider image size**: Smaller images lead to faster task startup times
6. **Document input/output contracts**: Clearly specify what data flows in and out

## Local execution

Container tasks require Docker to be installed and running on your local machine. When you run them locally, Flyte will:

1. Pull the specified image (if not already available)
2. Mount local directories for inputs and outputs
3. Stream container logs to your console
4. Extract outputs after container completion

This makes it easy to develop and test container tasks before deploying to a remote cluster.

## When to use container tasks

Choose container tasks when you need to:

- Run code in languages other than Python
- Execute pre-built tools or legacy applications
- Isolate potentially unsafe code (AI-generated scripts)
- Use specific runtime environments or dependencies
- Integrate external tools without Python wrappers
- Execute shell scripts or command-line utilities

For Python code that can use the Flyte SDK, standard `@task` decorated functions are usually simpler and more efficient.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/links ===

# Links

Links let you add clickable URLs to tasks that appear in the Flyte UI. Use them to connect tasks to external tools like experiment trackers, monitoring dashboards or custom internal services.

![Links in the Flyte UI](https://raw.githubusercontent.com/unionai/unionai-docs-static/refs/heads/main/images/integrations/wandb/single_node_auto_flyte.png)

You can attach links to tasks in two ways:

- **Statically** in the task decorator with `links=`
- **Dynamically** at call time with `task.override(links=...)`

`Link` is a Python [Protocol](https://docs.python.org/3/library/typing.html#typing.Protocol) that you subclass to define how URLs are generated. The Weights & Biases plugin provides a [built-in link implementation](https://www.union.ai/docs/v2/union/api-reference/integrations/wandb/packages/flyteplugins.wandb/wandb) as an example.

## Creating a link

To create a link, subclass `Link` as a dataclass and implement the `get_link()` method. The method returns the URL string to display in the UI:

```python
from dataclasses import dataclass

import flyte
from flyte import Link

@dataclass
class GrafanaLink(Link):
    dashboard_url: str
    name: str = "Grafana"

    def get_link(
        self,
        run_name: str,
        project: str,
        domain: str,
        context: dict,
        parent_action_name: str,
        action_name: str,
        pod_name: str,
        **kwargs,
    ) -> str:
        return f"{self.dashboard_url}?var-pod={pod_name}"

env = flyte.TaskEnvironment(...)

@env.task(links=(GrafanaLink(dashboard_url="https://grafana.example.com/d/abc123"),))
def my_task() -> str:
    return "done"
```

The link appears as a clickable "Grafana" link in the Flyte UI for every execution of `my_task`.

## Using execution metadata

The `get_link()` method receives execution metadata that you can use to construct dynamic URLs. Here's an example modeled on the [built-in Wandb](https://www.union.ai/docs/v2/union/user-guide/integrations/wandb/_index) link that uses the `context` dict to resolve a run ID:

```python
from dataclasses import dataclass
from typing import Optional

from flyte import Link

@dataclass
class Wandb(Link):
    project: str
    entity: str
    id: Optional[str] = None
    name: str = "Weights & Biases"

    def get_link(
        self,
        run_name: str,
        project: str,
        domain: str,
        context: dict[str, str],
        parent_action_name: str,
        action_name: str,
        pod_name: str,
        **kwargs,
    ) -> str:
        run_id = self.id or context.get("wandb_id", run_name)
        return f"https://wandb.ai/{self.entity}/{self.project}/runs/{run_id}"
```

The `name` attribute controls the display label in the UI.
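The fallback chain inside `get_link()` can be exercised in isolation. This stand-alone function (illustrative, not part of the SDK) mirrors the `run_id` resolution above:

```python
def resolve_run_id(explicit_id, context, run_name):
    # Precedence mirrors the Wandb example: an explicitly passed id wins,
    # then the "wandb_id" key from the execution context, then the run name.
    return explicit_id or context.get("wandb_id", run_name)

assert resolve_run_id("run-42", {"wandb_id": "ctx-7"}, "r1") == "run-42"
assert resolve_run_id(None, {"wandb_id": "ctx-7"}, "r1") == "ctx-7"
assert resolve_run_id(None, {}, "r1") == "r1"
```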

See the [`get_link()` API reference](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte/link) for more details. Note that `action_name` and `pod_name` are template variables (`{{.actionName}}` and `{{.podName}}`) that are populated by the backend at runtime.

## Dynamic links with override

Use `task.override(links=...)` to set links at runtime. This is useful when link parameters depend on runtime values like run IDs or configuration:

```python
import os

import flyte
from flyteplugins.wandb import Wandb

env = flyte.TaskEnvironment(...)

WANDB_PROJECT = "my-ml-project"
WANDB_ENTITY = "my-team"

@env.task
def train_model(config: dict) -> dict:
    # Training logic here
    return {"accuracy": 0.95}

@env.task
async def main(wandb_id: str) -> dict:
    result = train_model.override(
        links=(
            Wandb(
                project=WANDB_PROJECT,
                entity=WANDB_ENTITY,
                id=wandb_id,
            ),
        )
    )(config={"lr": 0.001})

    return result

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, wandb_id="my-run-id")
```

The `override` approach lets you attach links with values that are only known at runtime, such as dynamically generated run IDs.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/reports ===

# Reports

The reports feature allows you to display and update custom output in the UI during task execution.

First, you set the `report=True` flag in the task decorator. This enables the reporting feature for that task.
Within a task with reporting enabled, a [`flyte.report.Report`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.report/report) object is created automatically.

A `Report` object contains one or more tabs, each of which contains HTML.
You can write HTML to an existing tab and create new tabs to organize your content.
Initially, the `Report` object has one tab (the default tab) with no content.

To write content:

- [`flyte.report.log()`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.report/_index) appends HTML content directly to the default tab.
- [`flyte.report.replace()`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.report/_index) replaces the content of the default tab with new HTML.

To get or create a new tab:

- [`flyte.report.get_tab()`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.report/_index) allows you to specify a unique name for the tab, and it will return the existing tab if it already exists or create a new one if it doesn't.
  It returns a `flyte.report._report.Tab` object.

You can `log()` or `replace()` HTML on the `Tab` object just as you can directly on the `Report` object.

Finally, you send the report to the Flyte server and make it visible in the UI:

- [`flyte.report.flush()`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.report/_index) dispatches the report.
  **It is important to call this method to ensure that the data is sent**.


## A simple example

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = ""
# ///

import flyte
import flyte.report

env = flyte.TaskEnvironment(name="reports_example")

@env.task(report=True)
async def task1():
    await flyte.report.replace.aio("<p>The quick, brown fox jumps over a lazy dog.</p>")
    tab2 = flyte.report.get_tab("Tab 2")
    tab2.log("<p>The quick, brown dog jumps over a lazy fox.</p>")
    await flyte.report.flush.aio()

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(task1)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/simple.py*

Here we define a task `task1` that logs some HTML content to the default tab and creates a new tab named "Tab 2" where it logs additional HTML content.
The `flush` method is called to send the report to the backend.

## A more complex example

Here is another example.
We import the necessary modules, set up the task environment, define the main task with reporting enabled and define the data generation function:

```python
import json
import random

import flyte
import flyte.report

env = flyte.TaskEnvironment(
    name="globe_visualization",
)

@env.task(report=True)
async def generate_globe_visualization():
    await flyte.report.replace.aio(get_html_content())
    await flyte.report.flush.aio()

def generate_globe_data():
    """Generate sample data points for the globe"""
    cities = [
        {"city": "New York", "country": "USA", "lat": 40.7128, "lng": -74.0060},
        {"city": "London", "country": "UK", "lat": 51.5074, "lng": -0.1278},
        {"city": "Tokyo", "country": "Japan", "lat": 35.6762, "lng": 139.6503},
        {"city": "Sydney", "country": "Australia", "lat": -33.8688, "lng": 151.2093},
        {"city": "Paris", "country": "France", "lat": 48.8566, "lng": 2.3522},
        {"city": "São Paulo", "country": "Brazil", "lat": -23.5505, "lng": -46.6333},
        {"city": "Mumbai", "country": "India", "lat": 19.0760, "lng": 72.8777},
        {"city": "Cairo", "country": "Egypt", "lat": 30.0444, "lng": 31.2357},
        {"city": "Moscow", "country": "Russia", "lat": 55.7558, "lng": 37.6176},
        {"city": "Beijing", "country": "China", "lat": 39.9042, "lng": 116.4074},
        {"city": "Lagos", "country": "Nigeria", "lat": 6.5244, "lng": 3.3792},
        {"city": "Mexico City", "country": "Mexico", "lat": 19.4326, "lng": -99.1332},
        {"city": "Bangkok", "country": "Thailand", "lat": 13.7563, "lng": 100.5018},
        {"city": "Istanbul", "country": "Turkey", "lat": 41.0082, "lng": 28.9784},
        {"city": "Buenos Aires", "country": "Argentina", "lat": -34.6118, "lng": -58.3960},
        {"city": "Cape Town", "country": "South Africa", "lat": -33.9249, "lng": 18.4241},
        {"city": "Dubai", "country": "UAE", "lat": 25.2048, "lng": 55.2708},
        {"city": "Singapore", "country": "Singapore", "lat": 1.3521, "lng": 103.8198},
        {"city": "Stockholm", "country": "Sweden", "lat": 59.3293, "lng": 18.0686},
        {"city": "Vancouver", "country": "Canada", "lat": 49.2827, "lng": -123.1207},
    ]

    categories = ["high", "medium", "low", "special"]

    data_points = []
    for city in cities:
        data_point = {**city, "value": random.randint(10, 100), "category": random.choice(categories)}
        data_points.append(data_point)

    return data_points
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/globe_visualization.py*

We then define the HTML content for the report:

```python
def get_html_content():
    data_points = generate_globe_data()

    html_content = f"""
    <!DOCTYPE html>
    <html lang="en">
    ...
    </html>
    """
    return html_content
```

(We exclude it here due to length. You can find it in the [source file](https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/globe_visualization.py)).

Finally, we run the workflow:

```python
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(generate_globe_visualization)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/globe_visualization.py*

When the workflow runs, the report will be visible in the UI:

![Globe visualization](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/globe_visualization.png)

## Streaming example

Above we demonstrated reports that are sent to the UI once, at the end of the task execution.
But, you can also stream updates to the report during task execution and see the display update in real-time.

You do this by calling `flyte.report.flush()` (or specifying `do_flush=True` in `flyte.report.log()`) periodically during task execution, instead of only at the end.

> [!NOTE]
> In the above examples we explicitly call `flyte.report.flush()` to send the report to the UI.
> In fact, this is optional since flush will be called automatically at the end of the task execution.
> For streaming reports, on the other hand, calling `flush()` periodically (or specifying `do_flush=True`
> in `flyte.report.log()`) is necessary to display the updates.

First we import the necessary modules, and set up the task environment:

```python
import asyncio
import json
import math
import random
import time
from datetime import datetime
from typing import List

import flyte
import flyte.report

env = flyte.TaskEnvironment(name="streaming_reports")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/streaming_reports.py*

Next we define the HTML content for the report:

```python
DATA_PROCESSING_DASHBOARD_HTML = """
...
"""
```

(We exclude it here due to length. You can find it in the [source file](
https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/streaming_reports.py)).

Finally, we define the task that renders the report (`data_processing_dashboard`), the driver task of the workflow (`main`), and the run logic:

```python
@env.task(report=True)
async def data_processing_dashboard(total_records: int = 50000) -> str:
    """
    Simulates a data processing pipeline with real-time progress visualization.
    Updates every second for approximately 1 minute.
    """
    await flyte.report.log.aio(DATA_PROCESSING_DASHBOARD_HTML, do_flush=True)

    # Simulate data processing
    processed = 0
    errors = 0
    batch_sizes = [800, 850, 900, 950, 1000, 1050, 1100]  # Variable processing rates

    start_time = time.time()

    while processed < total_records:
        # Simulate variable processing speed
        batch_size = random.choice(batch_sizes)

        # Add some processing delays occasionally
        if random.random() < 0.1:  # 10% chance of slower batch
            batch_size = int(batch_size * 0.6)
            await flyte.report.log.aio("""
            <script>addActivity("⚠️ Detected slow processing batch, optimizing...");</script>
            """, do_flush=True)
        elif random.random() < 0.05:  # 5% chance of error
            errors += random.randint(1, 5)
            await flyte.report.log.aio("""
            <script>addActivity("❌ Processing errors detected, retrying failed records...");</script>
            """, do_flush=True)
        else:
            await flyte.report.log.aio(f"""
            <script>addActivity("✅ Successfully processed batch of {batch_size} records");</script>
            """, do_flush=True)

        processed = min(processed + batch_size, total_records)
        current_time = time.time()
        elapsed = current_time - start_time
        rate = int(batch_size) if elapsed < 1 else int(processed / elapsed)
        success_rate = ((processed - errors) / processed) * 100 if processed > 0 else 100

        # Update dashboard
        await flyte.report.log.aio(f"""
        <script>
            updateDashboard({processed}, {total_records}, {rate}, {success_rate});
        </script>
        """, do_flush=True)

        print(f"Processed {processed:,} records, Errors: {errors}, Rate: {rate:,}"
              f" records/sec, Success Rate: {success_rate:.2f}%", flush=True)
        await asyncio.sleep(1)  # Update every second

        if processed >= total_records:
            break

    # Final completion message
    total_time = time.time() - start_time
    avg_rate = int(total_records / total_time)

    await flyte.report.log.aio(f"""
    <script>addActivity("🎉 Processing completed successfully!");</script>
    <div style="background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; padding: 20px; border-radius: 8px; margin-top: 20px;">
        <h3>🎉 Processing Complete!</h3>
        <ul>
            <li><strong>Total Records:</strong> {total_records:,}</li>
            <li><strong>Processing Time:</strong> {total_time:.1f} seconds</li>
            <li><strong>Average Rate:</strong> {avg_rate:,} records/second</li>
            <li><strong>Success Rate:</strong> {success_rate:.2f}%</li>
            <li><strong>Errors Handled:</strong> {errors}</li>
        </ul>
    </div>
    """, do_flush=True)
    print(f"Data processing completed: {processed:,} records processed with {errors} errors.", flush=True)

    return f"Processed {total_records:,} records successfully"

@env.task
async def main():
    """
    Main task that runs the dashboard report.
    """
    await data_processing_dashboard(total_records=50000)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/reports/streaming_reports.py*

The key to the live updates is the `while` loop that appends JavaScript to the report. Each appended `<script>` fragment executes as soon as it is inserted into the document, updating the dashboard in place.
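This incremental-script pattern can be sketched without Flyte. Each call below produces the kind of fragment the task flushes to the UI; the `updateDashboard` function is defined in the dashboard HTML (omitted above), and all names are illustrative:

```python
def progress_script(processed: int, total: int, rate: int, success_rate: float) -> str:
    # Each flushed fragment is a <script> tag; the browser runs it on
    # insertion, so the dashboard updates without a page reload.
    return f"<script>updateDashboard({processed}, {total}, {rate}, {success_rate:.1f});</script>"

print(progress_script(25000, 50000, 900, 99.5))
# <script>updateDashboard(25000, 50000, 900, 99.5);</script>
```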

When the workflow runs, you can see the report updating in real-time in the UI:

![Data Processing Dashboard](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/data_processing_dashboard.png)

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/notebooks ===

# Notebooks

Flyte is designed to work seamlessly with Jupyter notebooks, allowing you to write and execute workflows directly within a notebook environment.

## Iterating on and running a workflow

Download the following notebook file and open it in your favorite Jupyter environment: [interactive.ipynb](https://www.union.ai/docs/v2/union/_static/public/interactive.ipynb)


In this example we have a simple workflow defined in our notebook.
You can iterate on the code in the notebook while running each cell in turn.

Note that the [`flyte.init()`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte/_index) call at the top of the notebook looks like this:

```python
flyte.init(
    endpoint="https://union.example.com",
    org="example_org",
    project="example_project",
    domain="development",
)
```

You will have to adjust it to match your Union server endpoint, organization, project, and domain.

## Accessing runs and downloading logs

Similarly, you can download the following notebook file and open it in your favorite Jupyter environment: [remote.ipynb](https://www.union.ai/docs/v2/union/_static/public/remote.ipynb)


In this example we use the `flyte.remote` package to list existing runs, access them, and download their details and logs.

For a comprehensive guide on working with runs, actions, inputs, and outputs, see [Interact with runs and actions](https://www.union.ai/docs/v2/union/user-guide/task-deployment/interacting-with-runs).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/remote-tasks ===

# Remote tasks

Remote tasks let you use previously deployed tasks without importing their code or dependencies. This enables teams to share and reuse tasks without managing complex dependency chains or container images.

## Prerequisites

Remote tasks must be deployed before you can use them. See the [task deployment guide](../task-deployment/_index) for details.

## Basic usage

Use `flyte.remote.Task.get()` to reference a deployed task:

```python
import flyte
import flyte.remote

env = flyte.TaskEnvironment(name="my_env")

# Get the latest version of a deployed task
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

# Use it in your task
@env.task
async def my_task(data_path: str) -> flyte.io.DataFrame:
    # Call the reference task like any other task
    result = await data_processor(input_path=data_path)
    return result
```

You can run this directly without deploying it:

```bash
flyte run my_workflow.py my_task --data_path s3://my-bucket/data.parquet
```

## Understanding lazy loading

Remote tasks use **lazy loading** to keep module imports fast and enable flexible client configuration. When you call `flyte.remote.Task.get()`, it returns a lazy reference that doesn't actually fetch the task from the server until the first invocation.

### When tasks are fetched

The remote task is fetched from the server only when:

- You call `flyte.run()` with the task
- You call `flyte.deploy()` with code that uses the task
- You invoke the task with the `()` operator inside another task
- You explicitly call `.fetch()` on the lazy reference

```python
import flyte.remote

# This does NOT make a network call - returns a lazy reference
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

# The task is fetched here when you invoke it
run = flyte.run(data_processor, input_path="s3://my-bucket/data.parquet")
```
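The mechanics can be illustrated with a tiny stand-in (not the SDK's actual implementation): the expensive fetch is deferred until first use, then cached:

```python
class LazyRef:
    # Minimal stand-in for the lazy-loading pattern: creating the
    # reference records only the name; the fetch happens on first use
    # and the result is cached for later calls.
    def __init__(self, name, fetcher):
        self.name = name
        self._fetcher = fetcher
        self._resolved = None

    def fetch(self):
        if self._resolved is None:
            self._resolved = self._fetcher(self.name)
        return self._resolved

fetch_calls = []

def fake_fetch(name):
    fetch_calls.append(name)
    return f"task:{name}"

ref = LazyRef("data_team.spark_analyzer", fake_fetch)
assert fetch_calls == []  # no network call at creation time
assert ref.fetch() == "task:data_team.spark_analyzer"
ref.fetch()
assert fetch_calls == ["data_team.spark_analyzer"]  # fetched exactly once
```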

### Benefits of lazy loading

**Fast module loading**: Since no network calls are made during import, your Python modules load quickly even when referencing many remote tasks.

**Late binding**: You can call `flyte.init()` after importing remote tasks, and the correct client will be bound when the task is actually invoked:

```python
import flyte
import flyte.remote

# Load remote task reference at module level
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

# Initialize the client later
flyte.init_from_config()

# The task uses the client configured above
run = flyte.run(data_processor, input_path="s3://data.parquet")
```

### Error handling

Because of lazy loading, if a referenced task doesn't exist, you won't get an error when calling `get()`. Instead, the error occurs during invocation, raising a `flyte.errors.RemoteTaskNotFoundError`:

```python
import flyte
import flyte.remote
import flyte.errors

# This succeeds even if the task doesn't exist
data_processor = flyte.remote.Task.get(
    "nonexistent.task",
    auto_version="latest"
)

try:
    # Error occurs here during invocation
    run = flyte.run(data_processor, input_path="s3://data.parquet")
except flyte.errors.RemoteTaskNotFoundError as e:
    print(f"Task not found or invocation failed: {e}")
    # Handle the error - perhaps use a fallback task
    # or notify the user that the task needs to be deployed
```

You can also catch errors when using remote tasks within other tasks:

```python
import flyte.errors

@env.task
async def pipeline_with_fallback(data_path: str) -> dict:
    try:
        # Try to use the remote task
        result = await data_processor(input_path=data_path)
        return {"status": "success", "result": result}
    except flyte.errors.RemoteTaskNotFoundError as e:
        # Fallback to local processing
        print(f"Remote task failed: {e}, using local fallback")
        return {"status": "fallback", "result": local_process(data_path)}
    except flyte.errors.RemoteTaskUsageError as e:
        raise ValueError(f"Bad usage of remote task; arguments may not match: {e}")
```

### Eager fetching with `fetch()`

While lazy loading is convenient, you can explicitly fetch a task upfront using the `fetch()` method. This is useful for:

- **Catching errors early**: Validate that the task exists before execution starts
- **Caching**: Avoid the network call on first invocation when running multiple times
- **Service initialization**: Pre-load tasks when your service starts

```python
import flyte
import flyte.remote
import flyte.errors

# Get the lazy reference
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

try:
    # Eagerly fetch the task details
    task_details = data_processor.fetch()

    # Now the task is cached - subsequent calls won't hit the remote service
    # You can pass either the original reference or task_details to flyte.run
    run1 = flyte.run(data_processor, input_path="s3://data1.parquet")
    run2 = flyte.run(task_details, input_path="s3://data2.parquet")

except flyte.errors.RemoteTaskNotFoundError as e:
    print(f"Task not found at startup: {e}")
    raise
except flyte.errors.RemoteTaskUsageError as e:
    print(f"Task run validation failed: {e}")
    # Handle the error before any execution attempts
```

For async contexts, use `await fetch.aio()`:

```python
import flyte.remote

async def initialize_service():
    processor_ref = flyte.remote.Task.get(
        "data_team.spark_analyzer",
        auto_version="latest"
    )

    try:
        # Fetch asynchronously
        task_details = await processor_ref.fetch.aio()
        print(f"Task {task_details.name} loaded successfully")
        return processor_ref  # Return the cached reference
    except flyte.errors.RemoteTaskNotFoundError as e:
        print(f"Failed to load task: {e}")
        raise

# Initialize once at service startup
cached_processor = None

async def startup():
    global cached_processor
    cached_processor = await initialize_service()

# Later in your service
async def process_request(data_path: str):
    # The task is already cached from initialization
    # No network call on first invocation
    run = flyte.run(cached_processor, input_path=data_path)
    return run
```

**When to use eager fetching**:

- **Service startup**: Fetch all remote tasks during initialization to validate they exist and cache them
- **Multiple invocations**: If you'll invoke the same task many times, fetch once to cache it
- **Fail-fast validation**: Catch configuration errors before execution begins

**When lazy loading is better**:

- **Single-use tasks**: If you only invoke the task once, lazy loading is simpler
- **Import-time overhead**: Keep imports fast by deferring network calls
- **Conditional usage**: If the task may not be needed, don't fetch it upfront

### Module-level vs dynamic loading

**Module-level loading (recommended)**: Load remote tasks at the module level for cleaner, more maintainable code:

```python
import flyte.remote

# Module-level - clear and maintainable
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

@env.task
async def my_task(data_path: str):
    return await data_processor(input_path=data_path)
```

**Dynamic loading**: You can also load remote tasks dynamically within a task if needed:

```python
@env.task
async def dynamic_pipeline(task_name: str, data_path: str):
    # Load the task based on runtime parameters
    processor = flyte.remote.Task.get(
        f"data_team.{task_name}",
        auto_version="latest"
    )

    try:
        result = await processor(input_path=data_path)
        return result
    except flyte.errors.RemoteTaskNotFoundError as e:
        raise ValueError(f"Task {task_name} not found: {e}")
```

## Complete example

This example shows how different teams can collaborate using remote tasks.

### Team A: Spark environment

Team A maintains Spark-based data processing tasks:

```python
# spark_env.py
from dataclasses import dataclass
import flyte

env = flyte.TaskEnvironment(name="spark_env")

@dataclass
class AnalysisResult:
    mean_value: float
    std_dev: float

@env.task
async def analyze_data(data_path: str) -> AnalysisResult:
    # Spark code here (not shown)
    return AnalysisResult(mean_value=42.5, std_dev=3.2)

@env.task
async def compute_score(result: AnalysisResult) -> float:
    # More Spark processing
    return result.mean_value / result.std_dev
```

Deploy the Spark environment:

```bash
flyte deploy spark_env/
```

### Team B: ML environment

Team B maintains PyTorch-based ML tasks:

```python
# ml_env.py
from pydantic import BaseModel
import flyte

env = flyte.TaskEnvironment(name="ml_env")

class PredictionRequest(BaseModel):
    feature_x: float
    feature_y: float

class Prediction(BaseModel):
    score: float
    confidence: float
    model_version: str

@env.task
async def run_inference(request: PredictionRequest) -> Prediction:
    # PyTorch model inference (not shown)
    return Prediction(
        score=request.feature_x * 2.5,
        confidence=0.95,
        model_version="v2.1"
    )
```

Deploy the ML environment:

```bash
flyte deploy ml_env/
```

### Team C: Orchestration

Team C builds a workflow using remote tasks from both teams without needing Spark or PyTorch dependencies:

```python
# orchestration_env.py
import flyte.remote

env = flyte.TaskEnvironment(name="orchestration")

# Reference tasks from other teams
analyze_data = flyte.remote.Task.get(
    "spark_env.analyze_data",
    auto_version="latest"
)

compute_score = flyte.remote.Task.get(
    "spark_env.compute_score",
    auto_version="latest"
)

run_inference = flyte.remote.Task.get(
    "ml_env.run_inference",
    auto_version="latest"
)

@env.task
async def orchestrate_pipeline(data_path: str) -> float:
    # Use Spark tasks without Spark dependencies
    analysis = await analyze_data(data_path=data_path)

    # Access attributes from the result
    # (Flyte creates a fake type that allows attribute access)
    print(f"Analysis: mean={analysis.mean_value}, std={analysis.std_dev}")

    data_score = await compute_score(result=analysis)

    # Use ML task without PyTorch dependencies
    # Pass Pydantic models as dictionaries
    prediction = await run_inference(
        request={
            "feature_x": analysis.mean_value,
            "feature_y": data_score
        }
    )

    # Access Pydantic model attributes
    print(f"Prediction: {prediction.score} (confidence: {prediction.confidence})")

    return prediction.score
```

Run the orchestration task directly (no deployment needed):

**Using Python API**:
```python
if __name__ == "__main__":
    flyte.init_from_config()

    run = flyte.run(
        orchestrate_pipeline,
        data_path="s3://my-bucket/data.parquet"
    )

    print(f"Execution URL: {run.url}")
    # You can wait for the execution
    run.wait()

    # You can then retrieve the outputs
    print(f"Pipeline result: {run.outputs()}")
```

**Using CLI**:
```bash
flyte run orchestration_env.py orchestrate_pipeline --data_path s3://my-bucket/data.parquet
```

## Invoke remote tasks in a script

You can also run any remote task directly from a script in a similar way:
```python
import flyte
import flyte.models
import flyte.remote

flyte.init_from_config()

# Fetch the task
remote_task = flyte.remote.Task.get("package-example.calculate_average", auto_version="latest")

# Create a run. Note that keyword arguments are currently required; in the future,
# positional arguments will be accepted based on declaration order, but keyword
# arguments are still recommended.
run = flyte.run(remote_task, numbers=[1.0, 2.0, 3.0])

print(f"Execution URL: {run.url}")
# You can view the current phase
print(f"Current phase: {run.phase}")

# You can wait for the execution to complete
run.wait()

# Refreshed phase (available in flyte >= 2.0.0b39)
print(f"Current phase: {run.phase}")

# Phases can be compared against flyte.models.ActionPhase values
if run.phase == flyte.models.ActionPhase.SUCCEEDED:
    print("Run completed!")

# You can then retrieve the outputs
print(f"Pipeline result: {run.outputs()}")
```

## Why use remote tasks?

Remote tasks solve common collaboration and dependency management challenges:

**Cross-team collaboration**: Team A has deployed a Spark task that analyzes large datasets. Team B needs this analysis for their ML pipeline but doesn't want to learn Spark internals, install Spark dependencies, or build Spark-enabled container images. With remote tasks, Team B simply references Team A's deployed task.

**Platform reusability**: Platform teams can create common, reusable tasks (data validation, feature engineering, model serving) that other teams can use without duplicating code or managing complex dependencies.

**Microservices for data workflows**: Remote tasks work like microservices for long-running tasks or agents, enabling secure sharing while maintaining isolation.

## When to use remote tasks

Use remote tasks when you need to:

- Use functionality from another team without their dependencies
- Share common tasks across your organization
- Build reusable platform components
- Avoid dependency conflicts between different parts of your workflow
- Create modular, maintainable data pipelines

## How remote tasks work

### Security model

Remote tasks run in the **caller's project and domain** using the caller's compute resources, but execute with the **callee's service accounts, IAM roles, and secrets**. This ensures:

- Tasks are secure from misuse
- Resource usage is properly attributed
- Authentication and authorization are maintained
- Collaboration remains safe and controlled

### Type system

Remote tasks use Flyte's default types as inputs and outputs. Flyte's type system seamlessly translates data between tasks without requiring the original dependencies:

| Remote Task Type | Flyte Type |
|-------------------|------------|
| DataFrames (`pandas`, `polars`, `spark`, etc.) | `flyte.io.DataFrame` |
| Object store files | `flyte.io.File` |
| Object store directories | `flyte.io.Dir` |
| Pydantic models | Dictionary (Flyte creates a representation) |

Any DataFrame type (pandas, polars, spark) automatically becomes `flyte.io.DataFrame`, allowing seamless data exchange between tasks using different DataFrame libraries. You can also write custom integrations or explore Flyte's plugin system for additional types.

For Pydantic models specifically, you don't need the exact model locally. Pass a dictionary as input, and Flyte will handle the translation.
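For example, suppose the deployed task declares a Pydantic model as its input. A minimal sketch of building the equivalent dictionary locally (the task name `team_a.process_data` and the field names here are illustrative assumptions, not a real deployed task):

```python
# Hypothetical: `team_a.process_data` declares a Pydantic `Filters` input.
# The caller does not need that model; an equivalent dict is enough, and
# Flyte translates it into the callee's Pydantic model at invocation time.

def build_filters(region: str, min_score: int) -> dict:
    """Build the dict that stands in for the callee's Pydantic model."""
    return {"region": region, "min_score": min_score, "active_only": True}

payload = build_filters("us-west-2", 50)

# With the Flyte SDK configured, the remote task is then invoked as:
#
#   task = flyte.remote.Task.get("team_a.process_data", auto_version="latest")
#   result = await task(filters=payload)
```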

## Versioning options

Reference tasks support flexible versioning:

**Specific version**:

```python
task = flyte.remote.Task.get(
    "team_a.process_data",
    version="v1.2.3"
)
```

**Latest version** (`auto_version="latest"`):

```python
# Always use the most recently deployed version
task = flyte.remote.Task.get(
    "team_a.process_data",
    auto_version="latest"
)
```

**Current version** (`auto_version="current"`):

```python
# Use the same version as the calling task's deployment
# Useful when all environments deploy with the same version
# Can only be used from within a task context
task = flyte.remote.Task.get(
    "team_a.process_data",
    auto_version="current"
)
```

## Customizing remote tasks

Remote tasks can be customized by overriding various properties without modifying the original deployed task. This allows you to adjust resource requirements, retry strategies, caching behavior, and more based on your specific use case.

### Available overrides

The `override()` method on remote tasks accepts the following parameters:

- **short_name** (`str`): A short name for the task instance
- **resources** (`flyte.Resources`): CPU, memory, GPU, and storage limits
- **retries** (`int | flyte.RetryStrategy`): Number of retries or retry strategy
- **timeout** (`flyte.TimeoutType`): Task execution timeout
- **env_vars** (`Dict[str, str]`): Environment variables to set
- **secrets** (`flyte.SecretRequest`): Secrets to inject
- **max_inline_io_bytes** (`int`): Maximum size for inline IO in bytes
- **cache** (`flyte.Cache`): Cache behavior and settings
- **queue** (`str`): Execution queue to use

### Override examples

**Increase resources for a specific use case**:

```python
import flyte.remote

# Get the base task
data_processor = flyte.remote.Task.get(
    "data_team.spark_analyzer",
    auto_version="latest"
)

# Override with more resources for large dataset processing
large_data_processor = data_processor.override(
    resources=flyte.Resources(
        cpu="16",
        memory="64Gi",
        storage="200Gi"
    )
)

@env.task
async def process_large_dataset(data_path: str):
    # Use the customized version
    return await large_data_processor(input_path=data_path)
```

**Add retries and timeout**:

```python
# Override with retries and timeout for unreliable operations
reliable_processor = data_processor.override(
    retries=3,
    timeout="2h"
)

@env.task
async def robust_pipeline(data_path: str):
    return await reliable_processor(input_path=data_path)
```

**Configure caching**:

```python
# Override cache settings
cached_processor = data_processor.override(
    cache=flyte.Cache(
        behavior="override",
        version_override="v2",
        serialize=True
    )
)
```

**Set environment variables and secrets**:

```python
# Override with custom environment and secrets
custom_processor = data_processor.override(
    env_vars={
        "LOG_LEVEL": "DEBUG",
        "REGION": "us-west-2"
    },
    secrets=flyte.SecretRequest(
        secrets={"api_key": "my-secret-key"}
    )
)
```

**Multiple overrides**:

```python
# Combine multiple overrides
production_processor = data_processor.override(
    short_name="prod_spark_analyzer",
    resources=flyte.Resources(cpu="8", memory="32Gi"),
    retries=5,
    timeout="4h",
    env_vars={"ENV": "production"},
    queue="high-priority"
)

@env.task
async def production_pipeline(data_path: str):
    return await production_processor(input_path=data_path)
```

### Chain overrides

You can chain multiple `override()` calls to incrementally adjust settings:

```python
# Start with base task
processor = flyte.remote.Task.get("data_team.analyzer", auto_version="latest")

# Add resources
processor = processor.override(resources=flyte.Resources(cpu="4", memory="16Gi"))

# Add retries for production
if is_production:
    processor = processor.override(retries=5, timeout="2h")

# Use the customized task
result = await processor(input_path="s3://data.parquet")
```

## Best practices

### 1. Use meaningful task names

Remote tasks are accessed by name, so use clear, descriptive naming:

```python
# Good
customer_segmentation = flyte.remote.Task.get("ml_platform.customer_segmentation")

# Avoid
task1 = flyte.remote.Task.get("team_a.task1")
```

### 2. Document task interfaces

Since remote tasks abstract away implementation details, clear documentation of inputs, outputs, and behavior is essential:

```python
@env.task
async def process_customer_data(
    customer_ids: list[str],
    date_range: tuple[str, str]
) -> flyte.io.DataFrame:
    """
    Process customer data for the specified date range.

    Args:
        customer_ids: List of customer IDs to process
        date_range: Tuple of (start_date, end_date) in YYYY-MM-DD format

    Returns:
        DataFrame with processed customer features
    """
    ...
```

### 3. Prefer module-level loading

Load remote tasks at the module level rather than inside functions for cleaner code:

```python
import flyte.remote

# Good - module level
data_processor = flyte.remote.Task.get("team.processor", auto_version="latest")

@env.task
async def my_task(data: str):
    return await data_processor(input=data)
```

This approach:
- Makes dependencies clear and discoverable
- Reduces code duplication
- Works well with lazy loading (no performance penalty)

Dynamic loading within tasks is also supported when you need runtime flexibility.

### 4. Handle versioning thoughtfully

- Use `auto_version="latest"` during development for rapid iteration
- Use specific versions in production for stability and reproducibility
- Use `auto_version="current"` when coordinating multi-environment deployments

### 5. Deploy remote tasks first

Always deploy the remote tasks before using them. Tasks that reference them can be run directly without deployment:

Deploy the remote task environments first:

```bash
flyte deploy spark_env/
flyte deploy ml_env/
```

Then run the orchestration task directly (no deployment needed):

```bash
flyte run orchestration_env.py orchestrate_pipeline
```

If you want to deploy the orchestration task as well (for scheduled runs or to be referenced by other tasks), deploy it after its dependencies:

```bash
flyte deploy orchestration_env/
```

## Limitations

1. **Lazy error detection**: Because of lazy loading, errors about missing or invalid tasks surface only at invocation time, not when you call `get()`. You'll receive a `flyte.errors.RemoteTaskNotFoundError` if the task doesn't exist, and a `flyte.errors.RemoteTaskUsageError` if the arguments or overrides you pass can't be applied to it.

2. **Type fidelity**: While Flyte translates types seamlessly, you work with Flyte's representation of Pydantic models, not the exact original types.

3. **Deployment order**: Referenced tasks must be deployed before tasks that reference them can be invoked.

4. **Context requirement**: Using `auto_version="current"` requires running within a task context.

5. **Dictionary inputs**: Pydantic models must be passed as dictionaries, which loses compile-time type checking.

6. **No positional arguments**: Remote tasks currently only support keyword arguments (this may change in future versions).

## Next steps

- Learn about [task deployment](../task-deployment/_index)
- Explore [task environments and configuration](../task-configuration/_index)

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/error-handling ===

# Error handling

One of the key features of Flyte 2 is the ability to recover from user-level errors in a workflow execution.
This includes out-of-memory errors and other exceptions.

In a distributed system with heterogeneous compute, certain types of errors are expected and even, in a sense, acceptable.
Flyte 2 recognizes this and allows you to handle them gracefully as part of your workflow logic.

This ability is a direct result of the fact that workflows are now written in regular Python,
giving you all the power and flexibility of Python error handling.
Let's look at an example:

```python
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# main = "main"
# params = ""
# ///

import asyncio

import flyte
import flyte.errors

env = flyte.TaskEnvironment(name="fail", resources=flyte.Resources(cpu=1, memory="250Mi"))

@env.task
async def oomer(x: int):
    large_list = [0] * 100000000
    print(len(large_list))

@env.task
async def always_succeeds() -> int:
    await asyncio.sleep(1)
    return 42

@env.task
async def main() -> int:
    try:
        await oomer(2)
    except flyte.errors.OOMError as e:
        print(f"Failed with oom trying with more resources: {e}, of type {type(e)}, {e.code}")
        try:
            await oomer.override(resources=flyte.Resources(cpu=1, memory="1Gi"))(5)
        except flyte.errors.OOMError as e:
            print(f"Failed with OOM Again giving up: {e}, of type {type(e)}, {e.code}")
            raise e
    finally:
        await always_succeeds()

    return await always_succeeds()

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/error-handling/error_handling.py*

In this code, we do the following:

* Import the necessary modules
* Set up the task environment. Note that we define our task environment with a resource allocation of 1 CPU and 250 MiB of memory.
* Define two tasks: one that will intentionally cause an out-of-memory (OOM) error, and another that will always succeed.
* Define the main task (the top level workflow task) that will handle the failure recovery logic.

The top `try...except` block attempts to run the `oomer` task, which allocates more memory than the environment's 250 MiB limit allows, triggering an OOM error.
If the error occurs, it catches the [`flyte.errors.OOMError`](https://www.union.ai/docs/v2/union/api-reference/flyte-sdk/packages/flyte.errors/oomerror) and attempts to run the `oomer` task again with increased resources.

This type of dynamic error handling allows you to gracefully recover from user-level errors in your workflows.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/traces ===

# Traces

The `@flyte.trace` decorator provides fine-grained observability and resumption capabilities for functions called within your Flyte workflows.
Traces are used on **helper functions** that tasks call to perform specific operations like API calls, data processing, or computations.
Traces are particularly useful for [managing the challenges of non-deterministic behavior in workflows](https://www.union.ai/docs/v2/union/user-guide/flyte-2/considerations), allowing you to track execution details and resume from failures.

## What are traced functions for?

At the top level, Flyte workflows are composed of **tasks**. But it is also common practice to break down complex task logic into smaller, reusable functions by defining helper functions that tasks call to perform specific operations.

Any helper functions defined or imported into the same file as a task definition are automatically uploaded to the Flyte environment alongside the task when it is deployed.

At the task level, observability and resumption of failed executions is provided by caching, but what if you want these capabilities at a more granular level, for the individual operations that tasks perform?

This is where **traced functions** come in. By decorating helper functions with `@flyte.trace`, you enable:
- **Detailed observability**: Track execution time, inputs/outputs, and errors for each function call.
- **Fine-grained resumption**: If a workflow fails, resume from the last successful traced function instead of re-running the entire task.
Each traced function is effectively a checkpoint within its task.

Here is an example:

```python
import asyncio

import flyte

env = flyte.TaskEnvironment("env")

@flyte.trace
async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)
    return f"LLM response for: {prompt}"

@flyte.trace
async def process_data(data: str) -> dict:
    await asyncio.sleep(0.2)
    return {"processed": data, "status": "completed"}

@env.task
async def research_workflow(topic: str) -> dict:
    llm_result = await call_llm(f"Generate research plan for: {topic}")
    processed_data = await process_data(llm_result)
    return {"topic": topic, "result": processed_data}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/traces/task_vs_trace.py*

## What Gets Traced

Traces capture detailed execution information:
- **Execution time**: How long each function call takes.
- **Inputs and outputs**: Function parameters and return values.
- **Checkpoints**: State that enables workflow resumption.

### Errors are not recorded

Only successful trace executions are recorded in the checkpoint system. When a traced function fails, the exception propagates up to your task code, where you can handle it with standard error handling patterns.
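The shape of that error handling is ordinary Python. The sketch below uses a plain `async` function standing in for a traced helper (in a real workflow it would carry `@flyte.trace` and the caller would be an `@env.task`); the URL and the error are fabricated for illustration:

```python
import asyncio

async def flaky_fetch(url: str) -> dict:
    # Stands in for a @flyte.trace helper that fails; since the call never
    # succeeds, no checkpoint is recorded for it.
    raise ConnectionError(f"could not reach {url}")

async def task_body() -> dict:
    try:
        return await flaky_fetch("https://example.com/api")
    except ConnectionError as e:
        # The exception propagates here, to the task, where you decide
        # whether to retry, fall back, or re-raise.
        return {"status": "error", "message": str(e)}

result = asyncio.run(task_body())
```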

### Supported Function Types

The trace decorator works with:
- **Asynchronous functions**: Functions defined with `async def`.
- **Generator functions**: Functions that `yield` values.
- **Async generators**: Functions defined with `async def` that `yield` values.

> [!NOTE]
> Currently tracing only works for asynchronous functions. Tracing of synchronous functions is coming soon.

```python
@flyte.trace
async def async_api_call(topic: str) -> dict:
    # Asynchronous API call
    await asyncio.sleep(0.1)
    return {"data": ["item1", "item2", "item3"], "status": "success"}

@flyte.trace
async def stream_data(items: list[str]):
    # Async generator function for streaming
    for item in items:
        await asyncio.sleep(0.02)
        yield f"Processing: {item}"

@flyte.trace
async def async_stream_llm(prompt: str):
    # Async generator for streaming LLM responses
    chunks = ["Research shows", " that machine learning", " continues to evolve."]
    for chunk in chunks:
        await asyncio.sleep(0.05)
        yield chunk

@env.task
async def research_workflow(topic: str) -> dict:
    llm_result = await async_api_call(topic)

    # Collect async generator results
    processed_data = []
    async for item in stream_data(llm_result["data"]):
        processed_data.append(item)

    llm_stream = []
    async for chunk in async_stream_llm(f"Summarize research on {topic}"):
        llm_stream.append(chunk)

    return {
        "topic": topic,
        "processed_data": processed_data,
        "llm_summary": "".join(llm_stream)
    }
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/traces/function_types.py*

## Task Orchestration Pattern

The typical Flyte workflow follows this pattern:

```python
@flyte.trace
async def search_web(query: str) -> list[dict]:
    # Search the web and return results
    await asyncio.sleep(0.1)
    return [{"title": f"Article about {query}", "content": f"Content on {query}"}]

@flyte.trace
async def summarize_content(content: str) -> str:
    # Summarize content using LLM
    await asyncio.sleep(0.1)
    return f"Summary of {len(content.split())} words"

@flyte.trace
async def extract_insights(summaries: list[str]) -> dict:
    # Extract insights from summaries
    await asyncio.sleep(0.1)
    return {"insights": ["key theme 1", "key theme 2"], "count": len(summaries)}

@env.task
async def research_pipeline(topic: str) -> dict:
    # Each helper function creates a checkpoint
    search_results = await search_web(f"research on {topic}")

    summaries = []
    for result in search_results:
        summary = await summarize_content(result["content"])
        summaries.append(summary)

    final_insights = await extract_insights(summaries)

    return {
        "topic": topic,
        "insights": final_insights,
        "sources_count": len(search_results)
    }
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/traces/pattern.py*

**Benefits of this pattern:**
- If `search_web` succeeds but `summarize_content` fails, resumption skips the search step
- Each operation is independently observable and debuggable
- Clear separation between workflow coordination (task) and execution (traced functions)

## Relationship to Caching and Checkpointing

Understanding how traces work with Flyte's other execution features:

| Feature | Scope | Purpose | Default Behavior |
|---------|-------|---------|------------------|
| **Task Caching** | Entire task execution (`@env.task`) | Skip re-running tasks with same inputs | Enabled (`cache="auto"`) |
| **Traces** | Individual helper functions | Observability and fine-grained resumption | Manual (requires `@flyte.trace`) |
| **Checkpointing** | Workflow state | Resume workflows from failure points | Automatic when traces are used |

### How They Work Together


```python
@flyte.trace
async def traced_data_cleaning(dataset_id: str) -> List[str]:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.2)
    return [f"cleaned_record_{i}_{dataset_id}" for i in range(100)]

@flyte.trace
async def traced_feature_extraction(data: List[str]) -> dict:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.3)
    return {
        "features": [f"feature_{i}" for i in range(10)],
        "feature_count": len(data),
        "processed_samples": len(data)
    }

@flyte.trace
async def traced_model_training(features: dict) -> dict:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.4)
    sample_count = features["processed_samples"]
    # Mock accuracy based on sample count
    accuracy = min(0.95, 0.7 + (sample_count / 1000))
    return {
        "accuracy": accuracy,
        "epochs": 50,
        "model_size": "125MB"
    }

@env.task(cache="auto")  # Task-level caching enabled
async def data_pipeline(dataset_id: str) -> dict:
    # 1. If this exact task with these inputs ran before,
    #    the entire task result is returned from cache

    # 2. If not cached, execution begins and each traced function
    #    creates checkpoints for resumption
    cleaned_data = await traced_data_cleaning(dataset_id)      # Checkpoint 1
    features = await traced_feature_extraction(cleaned_data)   # Checkpoint 2
    model_results = await traced_model_training(features)      # Checkpoint 3

    # 3. If workflow fails at step 3, resumption will:
    #    - Skip traced_data_cleaning (checkpointed)
    #    - Skip traced_feature_extraction (checkpointed)
    #    - Re-run only traced_model_training

    return {"dataset_id": dataset_id, "accuracy": model_results["accuracy"]}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/traces/caching_vs_checkpointing.py*

### Execution Flow

1. **Task Submission**: Task is submitted with input parameters
2. **Cache Check**: Flyte checks if identical task execution exists in cache
3. **Cache Hit**: If cached, return cached result immediately (no traces needed)
4. **Cache Miss**: Begin fresh execution
5. **Trace Checkpoints**: Each `@flyte.trace` function creates resumption points
6. **Failure Recovery**: If workflow fails, resume from last successful checkpoint
7. **Task Completion**: Final result is cached for future identical inputs
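The checkpoint behavior in steps 5 and 6 can be modeled with a few lines of plain Python (a toy illustration of the idea, not Flyte's actual implementation): successful step results are recorded, failed ones are not, and a retry skips anything already recorded.

```python
checkpoints: dict[str, str] = {}   # results of successfully completed steps
executions: list[str] = []         # which steps actually ran

def run_step(name: str, fn):
    if name in checkpoints:        # resumption: skip checkpointed steps
        return checkpoints[name]
    executions.append(name)
    result = fn()                  # a failure here records nothing
    checkpoints[name] = result
    return result

# Attempt 1: "clean" succeeds and is checkpointed; "train" fails.
try:
    run_step("clean", lambda: "cleaned")
    run_step("train", lambda: 1 / 0 and "model")
except ZeroDivisionError:
    pass

# Attempt 2: "clean" is served from its checkpoint; only "train" re-runs.
run_step("clean", lambda: "cleaned")
run_step("train", lambda: "model")
```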


=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/grouping-actions ===

# Grouping actions

Groups are an organizational feature in Flyte that allow you to logically cluster related task invocations (called "actions") for better visualization and management in the UI.
Groups help you organize task executions into manageable, hierarchical structures regardless of whether you're working with large fanouts or smaller, logically related sets of operations.

## What are groups?

Groups provide a way to organize task invocations into logical units in the Flyte UI.
When you have multiple task executions—whether from large [fanouts](./fanout), sequential operations, or any combination of tasks—groups help organize them into manageable units.

### The problem groups solve

Without groups, complex workflows can become visually overwhelming in the Flyte UI:
- Multiple task executions appear as separate nodes, making it hard to see the high-level structure
- Related operations are scattered throughout the workflow graph
- Debugging and monitoring becomes difficult when dealing with many individual task executions

Groups solve this by:
- **Organizing actions**: Multiple task executions within a group are presented as a hierarchical "folder" structure
- **Improving UI visualization**: Instead of many individual nodes cluttering the view, you see logical groups that can be collapsed or expanded
- **Aggregating status information**: Groups show aggregated run status (success/failure) of their contained actions when you hover over them in the UI
- **Maintaining execution parallelism**: Tasks still run concurrently as normal, but are organized for display

### How groups work

Groups are declared using the [`flyte.group`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte/_index) context manager.
Any task invocations that occur within the `with flyte.group()` block are automatically associated with that group:

```python
with flyte.group("my-group-name"):
    # All task invocations here belong to "my-group-name"
    result1 = await task_a(data)
    result2 = await task_b(data)
    result3 = await task_c(data)
```

The key points about groups:

1. **Context-based**: Use the `with flyte.group("name"):` context manager.
2. **Organizational tool**: Task invocations within the context are grouped together in the UI.
3. **UI folders**: Groups appear as collapsible/expandable folders in the Flyte UI run tree.
4. **Status aggregation**: Hover over a group in the UI to see aggregated success/failure information.
5. **Execution unchanged**: Tasks still execute in parallel as normal; groups only affect organization and visualization.

**Important**: Groups do not aggregate outputs. Each task execution still produces its own individual outputs. Groups are purely for organization and UI presentation.

## Common grouping patterns

### Sequential operations

Group related sequential operations that logically belong together:

```python
@env.task
async def data_pipeline(raw_data: str) -> str:
    with flyte.group("data-validation"):
        validated_data = await process_data(raw_data, "validate_schema")
        validated_data = await process_data(validated_data, "check_quality")
        validated_data = await process_data(validated_data, "remove_duplicates")

    with flyte.group("feature-engineering"):
        features = await process_data(validated_data, "extract_features")
        features = await process_data(features, "normalize_features")
        features = await process_data(features, "select_features")

    with flyte.group("model-training"):
        model = await process_data(features, "train_model")
        model = await process_data(model, "validate_model")
        final_model = await process_data(model, "save_model")

    return final_model
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/grouping-actions/grouping.py*

### Parallel processing with groups

Groups work well with parallel execution patterns:

```python
@env.task
async def parallel_processing_example(n: int) -> str:
    tasks = []

    with flyte.group("parallel-processing"):
        # Collect all task invocations first
        for i in range(n):
            tasks.append(process_item(i, "transform"))

        # Execute all tasks in parallel
        results = await asyncio.gather(*tasks)

    # Convert to string for consistent return type
    return f"parallel_results: {results}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/grouping-actions/grouping.py*

### Multi-phase workflows

Use groups to organize different phases of complex workflows:

```python
@env.task
async def multi_phase_workflow(data_size: int) -> str:
    # First phase: data preprocessing
    preprocessed = []
    with flyte.group("preprocessing"):
        for i in range(data_size):
            preprocessed.append(process_item(i, "preprocess"))
        phase1_results = await asyncio.gather(*preprocessed)

    # Second phase: main processing
    processed = []
    with flyte.group("main-processing"):
        for result in phase1_results:
            processed.append(process_item(result, "transform"))
        phase2_results = await asyncio.gather(*processed)

    # Third phase: postprocessing
    postprocessed = []
    with flyte.group("postprocessing"):
        for result in phase2_results:
            postprocessed.append(process_item(result, "postprocess"))
        final_results = await asyncio.gather(*postprocessed)

    # Convert to string for consistent return type
    return f"multi_phase_results: {final_results}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/grouping-actions/grouping.py*

### Nested groups

Groups can be nested to create hierarchical organization:

```python
@env.task
async def hierarchical_example(raw_data: str) -> str:
    with flyte.group("data-preparation"):
        cleaned_data = await process_data(raw_data, "clean_data")
        split_data = await process_data(cleaned_data, "split_dataset")

    with flyte.group("hyperparameter-tuning"):
        best_params = await process_data(split_data, "tune_hyperparameters")

    with flyte.group("model-training"):
        model = await process_data(best_params, "train_final_model")
    return model
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/grouping-actions/grouping.py*

### Conditional grouping

Groups can be used with conditional logic:

```python
@env.task
async def conditional_processing(use_advanced_features: bool, input_data: str) -> str:
    base_result = await process_data(input_data, "basic_processing")

    if use_advanced_features:
        with flyte.group("advanced-features"):
            enhanced_result = await process_data(base_result, "advanced_processing")
            optimized_result = await process_data(enhanced_result, "optimize_result")
            return optimized_result
    else:
        with flyte.group("basic-features"):
            simple_result = await process_data(base_result, "simple_processing")
            return simple_result
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/grouping-actions/grouping.py*

## Key insights

Groups are primarily an organizational and UI visualization tool—they don't change how your tasks execute or aggregate their outputs, but they help organize related task invocations (actions) into collapsible folder-like structures for better workflow management and display. The aggregated status information (success/failure rates) is visible when hovering over group folders in the UI.

Groups make your Flyte workflows more maintainable and easier to understand, especially when working with complex workflows that involve multiple logical phases or large numbers of task executions. They serve as organizational "folders" in the UI's call stack tree, allowing you to collapse sections to reduce visual distraction while still seeing aggregated status information on hover.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/fanout ===

# Fanout

Flyte is designed to scale effortlessly, allowing you to run workflows with large fanouts.
When you need to execute many tasks in parallel—such as processing a large dataset or running hyperparameter sweeps—Flyte provides powerful patterns to implement these operations efficiently.

> [!NOTE]
> By default, fanouts in Union are limited to a maximum size.
> Adjustments to this maximum can be made by consulting with the Union team.
> Full documentation of this aspect of fanout is coming soon.

## Understanding fanout

A "fanout" pattern occurs when you spawn multiple tasks concurrently.
Each task runs in its own container and contributes an output that you later collect.
The most common way to implement this is using the [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) function.

In Flyte terminology, each individual task execution is called an "action"—this represents a specific invocation of a task with particular inputs. When you call a task multiple times in a loop, you create multiple actions.

## Example

We start by importing our required packages, defining our Flyte environment, and creating a simple task that fetches user data from a mock API.

```
import asyncio
from typing import List, Tuple

import flyte

env = flyte.TaskEnvironment("fanout_env")

@env.task
async def fetch_data(user_id: int) -> dict:
    """Simulate fetching user data from an API - good for parallel execution."""
    # Simulate network I/O delay
    await asyncio.sleep(0.1)
    return {
        "user_id": user_id,
        "name": f"User_{user_id}",
        "score": user_id * 10,
        "data": f"fetched_data_{user_id}"
    }
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/fanout/fanout.py*

### Parallel execution

Next we implement the most common fanout pattern, which is to collect task invocations and execute them in parallel using `asyncio.gather()`:

```
@env.task
async def parallel_data_fetching(user_ids: List[int]) -> List[dict]:
    """Fetch data for multiple users in parallel - ideal for I/O bound operations."""
    tasks = []

    # Collect all fetch tasks - these can run in parallel since they're independent
    for user_id in user_ids:
        tasks.append(fetch_data(user_id))

    # Execute all fetch operations in parallel
    results = await asyncio.gather(*tasks)
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/fanout/fanout.py*

### Running the example

To actually run our example, we create a main guard that initializes Flyte and runs our main driver task:

```
if __name__ == "__main__":
    flyte.init_from_config()
    user_ids = [1, 2, 3, 4, 5]
    r = flyte.run(parallel_data_fetching, user_ids)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/fanout/fanout.py*

## How Flyte handles concurrency and parallelism

In the example we use a standard `asyncio.gather()` pattern.
When this pattern is used in a normal Python environment, the tasks would execute **concurrently** (cooperatively sharing a single thread through the event loop), but not in true **parallel** (multiple CPU cores simultaneously).

However, **Flyte transforms this concurrency model into true parallelism**. When you use `asyncio.gather()` in a Flyte task:

1. **Flyte acts as a distributed event loop**: Instead of scheduling coroutines on a single machine, Flyte schedules each task action to run in its own container across the cluster
2. **Concurrent becomes parallel**: What would be cooperative multitasking in regular Python becomes true parallel execution across multiple machines
3. **Native Python patterns**: You use familiar `asyncio` patterns, but Flyte automatically distributes the work

This means that when you write:
```python
results = await asyncio.gather(fetch_data(1), fetch_data(2), fetch_data(3))
```

Instead of three coroutines sharing one CPU, you get three separate containers running simultaneously, each with its own CPU, memory, and resources. Flyte seamlessly bridges the gap between Python's concurrency model and distributed parallel computing, allowing massive scalability while maintaining the familiar async/await programming model.
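
To see the baseline behavior, you can run the same gather pattern in plain Python with no Flyte involved. This sketch (pure `asyncio`; `fetch` is a stand-in, not the task from the example) shows three 0.1-second coroutines completing in roughly 0.1 seconds of wall time, because they interleave on one event loop rather than running sequentially:

```python
import asyncio
import time

async def fetch(user_id: int) -> dict:
    # Stand-in for an I/O-bound call; yields control while "waiting".
    await asyncio.sleep(0.1)
    return {"user_id": user_id}

async def main() -> float:
    start = time.perf_counter()
    # Three coroutines interleave on one event loop.
    await asyncio.gather(fetch(1), fetch(2), fetch(3))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
# Concurrent: total wall time is close to one sleep, not three.
print(f"elapsed: {elapsed:.2f}s")
```

Inside a Flyte task, the same `asyncio.gather` call would instead dispatch three container actions running on separate machines.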

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/controlling-parallelism ===

# Controlling parallel execution

When you [fan out](./fanout) to many tasks, you often need to limit how many run at the same time.
Common reasons include rate-limited APIs, GPU quotas, database connection limits, or simply avoiding overwhelming a downstream service.

Flyte 2 provides two ways to control concurrency:
[`asyncio.Semaphore`](https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore) for fine-grained control,
and `flyte.map` with a built-in `concurrency` parameter for simpler cases.

## The problem: unbounded parallelism

A straightforward `asyncio.gather` launches every task at once.
If you are calling an external API that allows only a few concurrent requests, this can cause throttling or errors:

```
import asyncio

import flyte

env = flyte.TaskEnvironment("controlling_parallelism")

@env.task
async def call_llm_api(prompt: str) -> str:
    """Simulate calling a rate-limited LLM API."""
    # In a real workflow, this would call an external API.
    # The API might allow only a few concurrent requests.
    await asyncio.sleep(0.5)
    return f"Response to: {prompt}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

```
@env.task
async def process_all_at_once(prompts: list[str]) -> list[str]:
    """Send all requests in parallel with no concurrency limit.

    This can overwhelm a rate-limited API, causing errors or throttling.
    """
    results = await asyncio.gather(*[call_llm_api(p) for p in prompts])
    return list(results)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

With eight prompts, this fires eight concurrent API calls.
That works fine when there are no limits, but will fail when the API enforces a concurrency cap.

## Using asyncio.Semaphore

An `asyncio.Semaphore` acts as a gate: only a fixed number of tasks can pass through at a time.
The rest wait until a slot opens up.

```
@env.task
async def process_batch_with_semaphore(
    prompts: list[str],
    max_concurrent: int = 3,
) -> list[str]:
    """Process prompts in parallel, limiting concurrency with a semaphore.

    At most `max_concurrent` calls to the API run at any given time.
    The remaining tasks wait until a slot is available.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_call(prompt: str) -> str:
        async with semaphore:
            return await call_llm_api(prompt)

    results = await asyncio.gather(*[limited_call(p) for p in prompts])
    return list(results)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

The pattern is:

1. Create a semaphore with the desired limit.
2. Wrap each task call in an inner async function that acquires the semaphore before calling and releases it after.
3. Pass all wrapped calls to `asyncio.gather`.

All eight calls are submitted immediately, but at most three run in parallel at any given time.
As each one completes, the next waiting task starts.
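
The gating behavior is easy to observe in plain `asyncio`, without Flyte. This sketch (a standalone illustration; `run_limited` and its internals are not part of the example above) tracks the peak number of simultaneously running calls and shows it never exceeds the semaphore limit:

```python
import asyncio

async def run_limited(n_tasks: int, max_concurrent: int) -> int:
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def limited_call(i: int) -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real API call
            active -= 1

    await asyncio.gather(*[limited_call(i) for i in range(n_tasks)])
    return peak

# Eight calls, but the semaphore admits only three at a time.
peak = asyncio.run(run_limited(8, 3))
print(peak)
```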

> [!NOTE]
> The semaphore controls how many tasks execute concurrently on the Flyte cluster.
> Each task still runs in its own container with its own resources — the semaphore simply limits how many containers are active at a time.

## Using flyte.map with concurrency

For uniform work — applying the same task to a list of inputs — `flyte.map` with the `concurrency` parameter is simpler:

```
@env.task
async def process_batch_with_map(prompts: list[str]) -> list[str]:
    """Process prompts using flyte.map with a built-in concurrency limit.

    This is the simplest approach when every item goes through the same task.
    """
    results = list(flyte.map(call_llm_api, prompts, concurrency=3))
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

This achieves the same concurrency limit with less boilerplate.

## Running the example

```
if __name__ == "__main__":
    flyte.init_from_config()
    prompts = [
        "Summarize this text",
        "Translate to French",
        "Extract key points",
        "Generate a title",
        "Write a conclusion",
        "List the main topics",
        "Identify the tone",
        "Suggest improvements",
    ]
    r = flyte.run(process_batch_with_semaphore, prompts)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

## When to use each approach

Use **`flyte.map(concurrency=N)`** when:

- Every item goes through the same task.
- You want the simplest possible code.

Use **`asyncio.Semaphore`** when:

- You need different concurrency limits for different task types within the same workflow.
- You want to combine concurrency control with error handling (e.g., `asyncio.gather(*tasks, return_exceptions=True)`).
- You are calling multiple different tasks in one parallel batch.
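
For example, combining a semaphore with `return_exceptions=True` lets a batch run to completion even when some calls fail. This is a plain-`asyncio` sketch (in a real workflow, `call_api` would be a Flyte task like `call_llm_api` above), with one call deliberately failing:

```python
import asyncio

async def call_api(i: int) -> str:
    await asyncio.sleep(0.01)
    if i == 3:  # simulate one failing call
        raise ValueError(f"request {i} failed")
    return f"response {i}"

async def run_batch(n: int, max_concurrent: int) -> list:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(i: int):
        async with semaphore:
            return await call_api(i)

    # return_exceptions=True: failures come back as exception objects
    # in the results list instead of cancelling the rest of the batch.
    return await asyncio.gather(
        *[limited(i) for i in range(n)], return_exceptions=True
    )

results = asyncio.run(run_batch(5, 2))
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
```

You can then retry or log the failed items while keeping the successful results.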

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/human-in-the-loop ===

# Human-in-the-loop

Human-in-the-loop (HITL) workflows pause execution at a defined point, wait for a human to provide input or approval, and then continue based on that response. Common use cases include content moderation gates, model output review, anomaly confirmation, and manual approval steps before costly or irreversible operations.

The `flyteplugins-hitl` package provides an event-based API for this pattern. When an event is created, Flyte automatically serves a small FastAPI web app with a form where a human can submit input. The workflow then resumes with the submitted value.

```bash
pip install flyteplugins-hitl
```

Key characteristics:

- Supports `int`, `float`, `str`, and `bool` input types
- Crash-resilient: uses durable sleep so polling survives task restarts
- Configurable timeout and poll interval
- The web form is accessible from the task's report in the Flyte UI

## Setup

The task environment must declare `hitl.env` as a dependency. This makes the HITL web app available during task execution:

```
import flyte
import flyteplugins.hitl as hitl

# The task environment must declare hitl.env as a dependency.
# This makes the HITL web app available during task execution.
env = flyte.TaskEnvironment(
    name="hitl-workflow",
    image=flyte.Image.from_debian_base(name="hitl").with_pip_packages(
        "flyteplugins-hitl>=2.0.0",
        "fastapi",
        "uvicorn",
        "python-multipart",
    ),
    resources=flyte.Resources(cpu="1", memory="512Mi"),
    depends_on=[hitl.env],
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Automated task

An automated task runs first and produces a result that requires human review:

```
@env.task(report=True)
async def analyze_data(dataset: str) -> dict:
    """Automated task that produces a result requiring human review."""
    # Simulate analysis
    result = {
        "dataset": dataset,
        "row_count": 142857,
        "anomalies_detected": 3,
        "confidence": 0.87,
    }
    await flyte.report.replace.aio(
        f"Analysis complete: {result['anomalies_detected']} anomalies detected "
        f"(confidence: {result['confidence']:.0%})"
    )
    await flyte.report.flush.aio()
    return result
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Requesting human input

Use `hitl.new_event()` to pause and wait for a human response. The `prompt` is shown on the web form. The `data_type` controls what type the submitted value is converted to before being returned:

```
@env.task(report=True)
async def request_human_review(analysis: dict) -> bool:
    """Pause and ask a human whether to proceed with the flagged records."""
    event = await hitl.new_event.aio(
        "review_decision",
        data_type=bool,
        scope="run",
        prompt=(
            f"Analysis found {analysis['anomalies_detected']} anomalies "
            f"with {analysis['confidence']:.0%} confidence. "
            "Approve for downstream processing? (true/false)"
        ),
    )
    approved: bool = await event.wait.aio()
    return approved
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

When this task runs, Flyte:

1. Serves the HITL web app (if not already running)
2. Creates an event and writes a pending request to object storage
3. Displays a link to the web form in the task report
4. Polls for a response using durable sleep
5. Returns the submitted value once input is received

## Wiring it together

The main task orchestrates the automated step and the HITL gate:

```
@env.task(report=True)
async def main(dataset: str = "s3://my-bucket/data.parquet") -> str:
    analysis = await analyze_data(dataset=dataset)

    approved = await request_human_review(analysis=analysis)

    if approved:
        return "Processing approved — continuing pipeline."
    else:
        return "Processing rejected by reviewer — pipeline halted."

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
    print(r.outputs())
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Event options

`hitl.new_event()` accepts the following parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | `str` | — | Descriptive name shown in logs and the UI |
| `data_type` | `type` | — | Expected input type: `int`, `float`, `str`, or `bool` |
| `scope` | `str` | `"run"` | Scope of the event. Currently only `"run"` is supported |
| `prompt` | `str` | `"Please provide a value"` | Message shown on the web form |
| `timeout_seconds` | `int` | `3600` | Maximum time to wait before raising `TimeoutError` |
| `poll_interval_seconds` | `int` | `5` | How often to check for a response |

## Submitting input programmatically

In addition to the web form, input can be submitted via the event's JSON API endpoint. This is useful for automated testing or integration with external approval systems:

```bash
curl -X POST https://<hitl-app-endpoint>/submit/json \
  -H "Content-Type: application/json" \
  -d '{
    "request_id": "<request_id>",
    "response_path": "<response_path>",
    "value": "true",
    "data_type": "bool"
  }'
```

The `request_id` and `response_path` are shown in the task report alongside the form URL.
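
From Python, the same submission can be made with any HTTP client. The helper below is a hypothetical convenience (not part of `flyteplugins-hitl`) that only builds the JSON body the endpoint expects; you would then POST it with, e.g., `requests` or `httpx`:

```python
def build_hitl_payload(
    request_id: str, response_path: str, value, data_type: str
) -> dict:
    """Build the JSON body for the HITL /submit/json endpoint.

    The value is sent as a string; `data_type` ("int", "float", "str",
    or "bool") tells the server how to convert it.
    """
    if data_type not in ("int", "float", "str", "bool"):
        raise ValueError(f"unsupported data_type: {data_type}")
    return {
        "request_id": request_id,
        "response_path": response_path,
        # Booleans are lowercased to match the "true"/"false" form
        # shown in the curl example.
        "value": str(value).lower() if data_type == "bool" else str(value),
        "data_type": data_type,
    }

# Example with made-up identifiers; use the real values from the task report.
payload = build_hitl_payload("req-123", "s3://bucket/responses/req-123", True, "bool")
```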

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/other-features ===

# Other features

This section covers advanced programming patterns and techniques for working with Flyte tasks.

## Task Forwarding

When one task calls another task using the normal invocation syntax (e.g., `await inner_task(x)`), Flyte creates a durable action that's recorded in the UI with data passed through the metadata store. However, if you want to execute a task in the same Python VM without this overhead, use the `.forward()` method.

**When to use**: You want to avoid durability overhead and execute task logic directly in the current VM.

```python
import flyte

env = flyte.TaskEnvironment("my-env")

@env.task
async def inner_task(x: int) -> int:
    return x + 1

@env.task
async def outer_task(x: int) -> int:
    # Executes in same VM, no durable action created
    v = await inner_task.forward(x=10)

    # Creates a durable action, recorded in UI
    return await inner_task(v)
```

The `.forward()` method works with both sync and async tasks:

```python
@env.task
def sync_inner_task(x: int) -> int:
    return x + 1

@env.task
def sync_outer_task(x: int) -> int:
    # Direct execution, no remote call
    v = sync_inner_task.forward(x=10)
    return sync_inner_task(v)
```

## Passing Tasks and Functions as Arguments

You can pass both Flyte tasks and regular Python functions as arguments to other tasks. Flyte handles this through pickling, so the code appears as pickled data in the UI.

```python
import typing
import flyte

env = flyte.TaskEnvironment("udfs")

@env.task
async def add_one_udf(x: int) -> int:
    return x + 1

# Regular async function (not a task)
async def fn_add_two_udf(x: int) -> int:
    return x + 2

@env.task
async def run_udf(x: int, udf: typing.Callable[[int], typing.Awaitable[int]]) -> int:
    return await udf(x)

@env.task
async def main() -> list[int]:
    # Pass a Flyte task as an argument
    result_one = await run_udf(5, add_one_udf)

    # Pass a regular function as an argument
    result_two = await run_udf(5, fn_add_two_udf)

    return [result_one, result_two]
```

**Note**: Both tasks and regular functions are serialized via pickling when passed as arguments.

## Custom Action Names

By default, actions in the UI use the task's function name. You can provide custom, user-friendly names using the `short_name` parameter.

### Set at Task Definition

```python
import flyte

env = flyte.TaskEnvironment("friendly_names")

@env.task(short_name="my_task")
async def some_task() -> str:
    return "Hello, Flyte!"
```

### Override at Call Time

```python
@env.task(short_name="entrypoint")
async def main() -> str:
    # Uses the default short_name "my_task"
    s = await some_task()

    # Overrides to use "my_name" for this specific action
    return s + await some_task.override(short_name="my_name")()
```

This is useful when the same task is called multiple times with different contexts, making the UI more readable.

## Invoking Async Functions from Sync Tasks

When migrating from Flyte 1.x to 2.0, you may have legacy sync code that needs to call async functions. Use `nest_asyncio.apply()` to enable `asyncio.run()` within sync tasks.

```python
import asyncio
import nest_asyncio
import flyte

env = flyte.TaskEnvironment(
    "async_in_sync",
    image=flyte.Image.from_debian_base().with_pip_packages("nest_asyncio"),
)

# Apply at module level
nest_asyncio.apply()

async def async_function() -> str:
    await asyncio.sleep(1)
    return "done"

@env.task
def sync_task() -> str:
    # Now you can use asyncio.run() in a sync task
    return asyncio.run(async_function())
```

**Important**:
- Call `nest_asyncio.apply()` at the module level before defining tasks
- Add `nest_asyncio` to your image dependencies
- This is particularly useful during migration when you have mixed sync/async code

## Async and Sync Task Interoperability

When migrating from older sync-based code to async tasks, or when working with mixed codebases, you need to call sync tasks from async parent tasks. Flyte provides the `.aio` method on every task (even sync ones) to enable this.

### Calling Sync Tasks from Async Tasks

Every sync task automatically has an `.aio` property that returns an async-compatible version:

```python
import flyte

env = flyte.TaskEnvironment("mixed-tasks")

@env.task
def sync_task(x: int) -> str:
    """Legacy sync task"""
    return f"Processed {x}"

@env.task
async def async_task(x: int) -> str:
    """New async task that calls legacy sync task"""
    # Use .aio to call sync task from async context
    result = await sync_task.aio(x)
    return result
```

### Using with `flyte.map.aio()`

When you need to call sync tasks in parallel from an async context, use `flyte.map.aio()`:

```python
from typing import List
import flyte

env = flyte.TaskEnvironment("map-example")

@env.task
def sync_process(x: int) -> str:
    """Synchronous processing task"""
    return f"Task {x}"

@env.task
async def async_main(n: int) -> List[str]:
    """Async task that maps over sync task"""
    results = []

    # Map over sync task from async context
    async for result in flyte.map.aio(sync_process, range(n)):
        if isinstance(result, Exception):
            raise result
        results.append(result)

    return results
```

**Why this matters**: This pattern is powerful when migrating from Flyte 1.x or integrating legacy sync tasks with new async code. You don't need to rewrite all sync tasks at once—they can be called seamlessly from async contexts.

## Using AnyIO in Async Tasks

Flyte async tasks support `anyio` for structured concurrency as an alternative to `asyncio.gather()`.

```python
import anyio
import aioresult
import flyte

env = flyte.TaskEnvironment(
    "anyio_example",
    image=flyte.Image.from_debian_base().with_pip_packages("anyio", "aioresult"),
)

@env.task
async def process_item(x: int) -> int:
    return x * 2

@env.task
async def batch_process(items: list[int]) -> list[int]:
    captured_results = []

    async with anyio.create_task_group() as tg:
        # Start multiple tasks concurrently
        for item in items:
            captured_results.append(
                aioresult.ResultCapture.start_soon(tg, process_item, item)
            )

    # Extract results
    return [r.result() for r in captured_results]
```

**Note**: You can use anyio's task groups, timeouts, and other structured concurrency primitives within Flyte async tasks.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-programming/unit-testing ===

# Unit testing

Unit testing is essential for ensuring your Flyte tasks work correctly. Flyte 2.0 provides flexible testing approaches that allow you to test both your business logic and Flyte-specific features like type transformations and caching.

## Understanding Task Invocation

When working with functions decorated with `@env.task`, there are two ways to invoke them, each with different behavior:

### Direct Function Invocation

When you call a task directly like a regular Python function:

```python
result = my_task(x=10, y=20)
```

**Flyte features are NOT invoked**, including:
- Type transformations and serialization
- Caching
- Data validation

This behaves exactly like calling a regular Python function, making it ideal for testing your business logic.

### Using `flyte.run()`

When you invoke a task using `flyte.run()`:

```python
run = flyte.run(my_task, x=10, y=20)
result = run.outputs()
```

**Flyte features ARE invoked**, including:
- Type transformations and serialization
- Data validation
- Type checking (raises `flyte.errors` if types are not supported or restricted)

This allows you to test Flyte-specific behavior like serialization and caching.

## Testing Business Logic

For most unit tests, you want to verify your business logic works correctly. Use **direct function invocation** for this:

```python
import flyte

env = flyte.TaskEnvironment("my_env")

@env.task
def add(a: int, b: int) -> int:
    return a + b

def test_add():
    result = add(a=3, b=5)
    assert result == 8
```

### Testing Async Tasks

Async tasks work the same way with direct invocation:

```python
import pytest

@env.task
async def subtract(a: int, b: int) -> int:
    return a - b

@pytest.mark.asyncio
async def test_subtract():
    result = await subtract(a=10, b=4)
    assert result == 6
```

### Testing Nested Tasks

When tasks call other tasks, direct invocation continues to work without any Flyte overhead:

```python
@env.task
def nested(a: int, b: int) -> int:
    return add(a, b)  # Calls the add task directly

def test_nested():
    result = nested(3, 5)
    assert result == 8
```

## Testing Type Transformations and Serialization

When you need to test how Flyte handles data types, serialization, or caching, use `flyte.run()`:

```python
@pytest.mark.asyncio
async def test_add_with_flyte_run():
    run = flyte.run(add, 3, 5)
    assert run.outputs() == 8
```

### Testing Type Restrictions

Some types may not be supported or may be restricted. Use `flyte.run()` to test that these restrictions are enforced:

```python
from typing import Tuple
import flyte.errors

@env.task
def not_supported_types(x: Tuple[str, str]) -> str:
    return x[0]

@pytest.mark.asyncio
async def test_not_supported_types():
    # Direct invocation works fine
    result = not_supported_types(x=("a", "b"))
    assert result == "a"

    # flyte.run enforces type restrictions
    with pytest.raises(flyte.errors.RestrictedTypeError):
        flyte.run(not_supported_types, x=("a", "b"))
```

### Testing Nested Tasks with Serialization

You can also test nested task execution with Flyte's full machinery:

```python
@pytest.mark.asyncio
async def test_nested_with_run():
    run = flyte.run(nested, 3, 5)
    assert run.outputs() == 8
```

## Testing Traced Functions

Functions decorated with `@flyte.trace` can be tested similarly to tasks:

```python
@flyte.trace
async def traced_multiply(a: int, b: int) -> int:
    return a * b

@pytest.mark.asyncio
async def test_traced_multiply():
    result = await traced_multiply(a=6, b=7)
    assert result == 42
```

## Best Practices

1. **Test logic with direct invocation**: For most unit tests, call tasks directly to test your business logic without Flyte overhead.

2. **Test serialization with `flyte.run()`**: Use `flyte.run()` when you need to verify:
   - Type transformations work correctly
   - Data serialization/deserialization
   - Caching behavior
   - Type restrictions are enforced

3. **Use standard testing frameworks**: Flyte tasks work with pytest, unittest, and other Python testing frameworks.

4. **Test async tasks properly**: Use `@pytest.mark.asyncio` for async tasks and await their results.

5. **Mock external dependencies**: Use standard Python mocking techniques for external services, databases, etc.
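
For instance, a task that wraps an external service can be tested by injecting a `unittest.mock.Mock` in place of the real client. Here `PricingClient` and `price_with_tax` are hypothetical stand-ins, shown as plain functions since direct invocation bypasses Flyte anyway:

```python
from unittest.mock import Mock

class PricingClient:
    """Stand-in for a real external API client."""

    def fetch_price(self, symbol: str) -> float:
        raise RuntimeError("network access not available in tests")

def price_with_tax(client, symbol: str, tax_rate: float) -> float:
    """Business logic under test; in a workflow this would be an @env.task."""
    return client.fetch_price(symbol) * (1 + tax_rate)

# In the test, substitute a Mock for the real client so the test is
# deterministic and runs offline.
mock_client = Mock()
mock_client.fetch_price.return_value = 100.0

result = price_with_tax(mock_client, "ACME", 0.25)
mock_client.fetch_price.assert_called_once_with("ACME")
```

The same dependency-injection pattern works for databases, storage clients, and any other external system.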

## Quick Reference

| Test Scenario | Method | Example |
|--------------|--------|---------|
| Business logic (sync) | Direct call | `result = task(x=10)` |
| Business logic (async) | Direct await | `result = await task(x=10)` |
| Type transformations | `flyte.run()` | `r = flyte.run(task, x=10)` |
| Data serialization | `flyte.run()` | `r = flyte.run(task, x=10)` |
| Caching behavior | `flyte.run()` | `r = flyte.run(task, x=10)` |
| Type restrictions | `flyte.run()` + pytest.raises | `pytest.raises(flyte.errors.RestrictedTypeError)` |

## Example Test Suite

Here's a complete example showing different testing approaches:

```python
import pytest
import flyte
import flyte.errors

env = flyte.TaskEnvironment("test_env")

@env.task
def add(a: int, b: int) -> int:
    return a + b

@env.task
async def subtract(a: int, b: int) -> int:
    return a - b

# Test business logic directly
def test_add_logic():
    result = add(a=3, b=5)
    assert result == 8

@pytest.mark.asyncio
async def test_subtract_logic():
    result = await subtract(a=10, b=4)
    assert result == 6

# Test with Flyte serialization
@pytest.mark.asyncio
async def test_add_serialization():
    run = flyte.run(add, 3, 5)
    assert run.outputs() == 8

@pytest.mark.asyncio
async def test_subtract_serialization():
    run = flyte.run(subtract, a=10, b=4)
    assert run.outputs() == 6
```

## Future Improvements

The Flyte SDK team is actively working on improvements for advanced unit testing scenarios, particularly around initialization and setup for complex test cases. Additional utilities and patterns may be introduced in future releases to make unit testing even more streamlined.

