# Scale your runs
> This bundle contains all pages in the Scale your runs section.
> Source: https://www.union.ai/docs/v2/union/user-guide/run-scaling/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/run-scaling ===

# Scale your runs

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This guide helps you understand and optimize the performance of your Flyte workflows. Whether you're building latency-sensitive applications or high-throughput data pipelines, these docs will help you make the right architectural choices.

## Understanding Flyte execution

Before optimizing performance, it's important to understand how Flyte executes your workflows:

- **[Data flow](./data-flow)**: Learn how data moves between tasks, including inline vs. reference data types, caching mechanisms, and storage configuration.
- **[Life of a run](./life-of-a-run)**: Understand what happens when you invoke `flyte.run()`, from code analysis and image building to task execution and state management.

## Performance optimization

Once you understand the fundamentals, dive into performance tuning:

- **[Scale your workflows](./scale-your-workflows)**: A comprehensive guide to optimizing workflow performance, covering latency vs. throughput, task overhead analysis, batching strategies, reusable containers, and more.

## Key concepts for scaling

When scaling your workflows, keep these principles in mind:

1. **Task overhead matters**: The overhead of creating a task (uploading data, enqueuing, creating containers) should be much smaller than the task runtime.
2. **Batch for throughput**: For large-scale data processing, batch multiple items into single tasks to reduce overhead.
3. **Reusable containers**: Eliminate container startup overhead and enable concurrent execution with reusable containers.
4. **Traces for lightweight ops**: Use traces instead of tasks for lightweight operations that need checkpointing.
5. **Limit fanout**: Keep the total number of actions per run below 50k (target 10k-20k for best performance).
6. **Choose the right data types**: Use reference types (files, directories, DataFrames) for large data and inline types for small data.

For detailed guidance on each of these topics, see [Scale your workflows](./scale-your-workflows).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/run-scaling/data-flow ===

# Data flow

Understanding how data flows between tasks is critical for optimizing workflow performance in Flyte. Tasks take inputs and produce outputs, with data flowing seamlessly through your workflow using an efficient transport layer.

## Overview

Flyte tasks are run to completion. Each task takes inputs and produces exactly one output. Even if multiple instances run concurrently (such as in retries), only one output will be accepted. This deterministic data flow model provides several key benefits:

1. **Reduced boilerplate**: Automatic handling of files, DataFrames, directories, custom types, data classes, Pydantic models, and primitive types without manual serialization.
2. **Type safety**: Optional type annotations enable deeper type understanding, automatic UI form generation, and runtime type validation.
3. **Efficient transport**: Data is passed by reference (files, directories, DataFrames) or by value (primitives) based on type.
4. **Durable storage**: All data is stored durably and accessible through APIs and the UI.
5. **Caching support**: Efficient caching using shallow immutable references for referenced data.

## Data types and transport

Flyte handles different data types with different transport mechanisms:

### Passed by reference

These types are not copied but passed as references to storage locations:

- **Files**: `flyte.io.File`
- **Directories**: `flyte.io.Dir`
- **Dataframes**: `flyte.io.DataFrame`, `pd.DataFrame`, `pl.DataFrame`, etc.

Dataframes are automatically converted to Parquet format and read using Apache Arrow for zero-copy reads. Use `flyte.io.DataFrame` for lazy materialization to any supported type like pandas or polars. [Learn more about the Flyte Dataframe type](https://www.union.ai/docs/v2/union/user-guide/task-programming/dataframes)

### Passed by value (inline I/O)

Primitive and structured types are serialized and passed inline:

| Type Category | Examples | Serialization |
|--------------|----------|---------------|
| **Primitives** | `int`, `float`, `str`, `bool`, `None` | MessagePack |
| **Time types** | `datetime.datetime`, `datetime.date`, `datetime.timedelta` | MessagePack |
| **Collections** | `list`, `dict`, `tuple` | MessagePack |
| **Data structures** | data classes, Pydantic `BaseModel` | MessagePack |
| **Enums** | `enum.Enum` subclasses | MessagePack |
| **Unions** | `Union[T1, T2]`, `Optional[T]` | MessagePack |
| **Protobuf** | `google.protobuf.Message` | Binary |

Flyte uses efficient MessagePack serialization for most types, providing compact binary representation with strong type safety.

> [!NOTE]
> If type annotations are not used, or if `typing.Any` or unrecognized types are used, data will be pickled. By default, pickled objects smaller than 10KB are passed inline, while larger pickled objects are automatically passed as a file. Pickling allows for progressive typing but should be used carefully.
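
The 10KB pickle threshold can be reasoned about with a quick size check. A minimal illustrative sketch using the standard `pickle` module (this mirrors the documented default, not Flyte's internal check):

```python
import pickle

# Documented default: pickled objects under 10KB travel inline,
# larger ones are automatically passed as a file.
INLINE_PICKLE_LIMIT = 10 * 1024

def would_pass_inline(obj) -> bool:
    """Return True if the pickled object fits under the inline threshold."""
    return len(pickle.dumps(obj)) < INLINE_PICKLE_LIMIT

small = {"id": 1, "name": "item"}
large = list(range(100_000))

print(would_pass_inline(small))  # a small dict pickles to a few dozen bytes
print(would_pass_inline(large))  # 100k ints pickle to well over 10KB
```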

## Task execution and data flow

### Input download

When a task starts:

1. **Inline inputs download**: The task downloads inline inputs from the configured Flyte object store.
2. **Size limits**: By default, inline inputs are limited to 10MB, but this can be adjusted using `flyte.TaskEnvironment`'s `max_inline_io` parameter.
3. **Memory consideration**: Inline data is materialized in memory, so adjust your task resources accordingly.
4. **Reference materialization**: Reference data (files, directories) is passed using special types in `flyte.io`. Dataframes are automatically materialized if using `pd.DataFrame`. Use `flyte.io.DataFrame` to avoid automatic materialization.
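
As a sketch of adjusting the inline limit (only `max_inline_io` is named on this page; the value format and the other parameters shown are assumptions that may vary by SDK version):

```python
import flyte

# Sketch only: raise the inline I/O limit for this environment.
env = flyte.TaskEnvironment(
    name="large-inline-io",
    max_inline_io=50 * 1024 * 1024,  # assumed byte value: raise the 10MB default to 50MB
    resources=flyte.Resources(memory="2Gi"),  # inline data is materialized in memory
)
```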

### Output upload

When a task returns data:

1. **Inline data**: Uploaded to the Flyte object store configured at the organization, project, or domain level.
2. **Reference data**: Stored in the same metadata store by default, or configured using `flyte.with_runcontext(raw_data_storage=...)`.
3. **Separate prefixes**: Each task creates one output per retry attempt in separate prefixes, making data incorruptible by design.

## Task-to-task data flow

When a task invokes downstream tasks:

1. **Input recording**: The input to the downstream task is recorded to the object store.
2. **Reference upload**: All referenced objects are uploaded (if not already present).
3. **Task invocation**: The downstream task is invoked on the remote server.
4. **Parallel execution**: When multiple tasks are invoked in parallel using `flyte.map` or `asyncio`, inputs are written in parallel.
5. **Storage layer**: Data writing uses the `flyte.storage` layer, backed by the Rust-based `object-store` crate and optionally `fsspec` plugins.
6. **Output download**: Once the downstream task completes, inline outputs are downloaded and returned to the calling task.
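
The parallel fan-out in step 4 follows the standard `asyncio` pattern. A Flyte-free sketch of the same shape, where the hypothetical `call_child` coroutine stands in for invoking a downstream task:

```python
import asyncio

async def call_child(item: int) -> int:
    # Stand-in for a downstream task call: in Flyte, awaiting a task
    # writes its inputs to the object store and enqueues it.
    await asyncio.sleep(0)  # simulate the async I/O boundary
    return item * 2

async def parent(items: list[int]) -> list[int]:
    # Launching all children with gather lets their input writes
    # proceed in parallel rather than one at a time.
    return await asyncio.gather(*[call_child(i) for i in items])

print(asyncio.run(parent([1, 2, 3])))  # [2, 4, 6]
```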

## Caching and data hashing

Understanding how Flyte caches data is essential for performance optimization.

### Cache key computation

A cache hit occurs when the following components match:

- **Task name**: The fully-qualified task name
- **Computed input hash**: Hash of all inputs (excluding `ignored_inputs`)
- **Task interface hash**: Hash of input and output types
- **Task config hash**: Hash of task configuration
- **Cache version**: User-specified or automatically computed

### Inline data caching

All inline data is cached using a consistent hashing system. The cache key is derived from the data content.

### Reference data hashing

Reference data (files, directories) is hashed shallowly by default using the hash of the storage location. You can customize hashing:

- Use `flyte.io.File.new_remote()` or `flyte.io.File.from_existing_remote()` with custom hash functions or values.
- Provide explicit hash values for deep content hashing if needed.
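
The shallow-vs-deep distinction can be illustrated without the Flyte API. A shallow hash keys on the storage location, while a deep hash keys on content (illustrative only; Flyte's actual hash functions are internal):

```python
import hashlib

def shallow_hash(storage_uri: str) -> str:
    # Shallow: two references to the same URI hash the same,
    # even if the object behind the URI was overwritten.
    return hashlib.sha256(storage_uri.encode()).hexdigest()

def deep_hash(content: bytes) -> str:
    # Deep: the hash tracks the bytes, so rewritten content busts the cache.
    return hashlib.sha256(content).hexdigest()

uri = "s3://bucket/data/input.csv"
assert shallow_hash(uri) == shallow_hash(uri)  # location unchanged, cache hit
assert deep_hash(b"v1") != deep_hash(b"v2")    # content changed, cache miss
```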

### Cache control

Control caching behavior using `flyte.with_runcontext`:

- **Scope**: Set `cache_lookup_scope` to `"global"` or `"project/domain"`.
- **Disable cache**: Set `overwrite_cache=True` to force re-execution.
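
Both knobs as a sketch (`my_task` and `data` are placeholders; parameter names are as given above):

```python
import flyte

# Force re-execution, ignoring any cached results for this run.
run = flyte.with_runcontext(overwrite_cache=True).run(my_task, input_data=data)

# Restrict cache lookups to the current project/domain.
run = flyte.with_runcontext(cache_lookup_scope="project/domain").run(my_task, input_data=data)
```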

For more details on caching configuration, see [Caching](https://www.union.ai/docs/v2/union/user-guide/task-configuration/caching).

## Traces and data flow

When using [traces](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces), the data flow behavior is different:

1. **Full execution first**: The trace is fully executed before inputs and outputs are recorded.
2. **Checkpoint behavior**: Recording happens like a checkpoint at the end of trace execution.
3. **Streaming iterators**: The entire output is buffered and recorded after the stream completes. Buffering is pass-through, so the caller can consume the stream while it is being buffered.
4. **Chained traces**: When traces are chained, all of them are recorded after the last one finishes being consumed.
5. **Same process with `asyncio`**: Traces run within the same Python process and support `asyncio` parallelism, so failures can be retried, effectively re-running the trace.
6. **Lightweight overhead**: Traces only have the overhead of data storage (no task orchestration overhead).

> [!NOTE]
> Traces are not a substitute for tasks if you need caching. Tasks provide full caching capabilities, while traces provide lightweight checkpointing with storage overhead. However, traces support concurrent execution using `asyncio` patterns within a single task.

## Object stores and latency considerations

By default, Flyte uses object stores like S3, GCS, Azure Storage, and R2 as metadata stores. These have high latency for smaller objects, so:

- **Minimum task duration**: Tasks should take at least a second to run to amortize storage overhead.
- **Future improvements**: High-performance metastores like Redis and PostgreSQL may be supported in the future. Contact the Union team if you're interested.

## Configuring data storage

### Organization and project level

Object stores are configured at the organization level or per project/domain. Documentation for this configuration is coming soon.

### Per-run configuration

Configure raw data storage on a per-run basis using `flyte.with_runcontext`:

```python
run = flyte.with_runcontext(
    raw_data_storage="s3://my-bucket/custom-path"
).run(my_task, input_data=data)
```

This allows you to control where reference data (files, directories, DataFrames) is stored for specific runs.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/run-scaling/life-of-a-run ===

# Life of a run

Understanding what happens when you invoke `flyte.run()` is crucial for optimizing workflow performance and debugging issues. This guide walks through each phase of task execution from submission to completion.

## Overview

When you execute `flyte.run()`, the system goes through several phases:

1. **Code analysis and preparation**: Discover environments and images
2. **Image building**: Build container images if changes are detected
3. **Code bundling**: Package your Python code
4. **Upload**: Transfer the code bundle to object storage
5. **Run creation**: Submit the run to the backend
6. **Task execution**: Execute the task in the data plane
7. **State management**: Track and persist execution state

## Phase 1: Code analysis and preparation

When `flyte.run()` is invoked:

1. **Environment discovery**: Flyte analyzes your code and finds all relevant `flyte.TaskEnvironment` instances by walking the `depends_on` hierarchy.
2. **Image identification**: Discovers unique `flyte.Image` instances used across all environments.
3. **Image building**: Starts the image building process. Images are only built if a change is detected.

> [!NOTE]
> If you invoke `flyte.run()` multiple times within the same Python process without changing code (such as in a notebook or script), the code bundling and image building steps are done only once. This can dramatically speed up iteration.
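
For example, a parameter sweep driven from one interpreter pays the bundling and image-build cost only once (a sketch; `flyte.init_from_config()` and `run.url` are assumed conveniences, and `my_task` is a placeholder):

```python
import flyte

flyte.init_from_config()  # assumed: connect using your local config

# The first call analyzes code, builds images if needed, and bundles code;
# later calls in the same process reuse all of that work.
for lr in [0.1, 0.01, 0.001]:
    run = flyte.run(my_task, learning_rate=lr)
    print(run.url)  # assumed attribute linking to the run in the UI
```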

## Phase 2: Image building

Container images provide the runtime environment for your tasks:

- **Change detection**: Images are only rebuilt if changes are detected in dependencies or configuration.
- **Caching**: Previously built images are reused when possible.
- **Parallel builds**: Multiple images can be built concurrently.

For more details on container images, see [Container Images](https://www.union.ai/docs/v2/union/user-guide/task-configuration/container-images).

## Phase 3: Code bundling

After images are built, your project files are bundled:

### Default: `copy_style="loaded_modules"`

By default, all Python modules referenced by the invoked tasks through module-level import statements are automatically copied. This provides a good balance between completeness and efficiency.

### Alternative: `copy_style="none"`

Skip bundling by setting `copy_style="none"` in `flyte.with_runcontext()` and adding all code into `flyte.Image`:

```python
# Add code to image
image = flyte.Image().with_source_code("/path/to/code")

# Or use Dockerfile
image = flyte.Image.from_dockerfile("Dockerfile")

# Skip bundling
run = flyte.with_runcontext(copy_style="none").run(my_task, input_data=data)
```

For more details on code packaging, see [Packaging](https://www.union.ai/docs/v2/union/user-guide/task-deployment/packaging).

## Phase 4: Upload code bundle

Once the code bundle is created:

1. **Request signed URL**: The SDK sends the bundle checksum and target path to the Control Plane.
2. **Control Plane obtains URL**: The Control Plane calls the Data Plane to obtain a signed URL for that checksum and path.
3. **Direct upload**: The signed URL is returned to the SDK, which uploads the code bundle directly to the object store.

## Phase 5: Run creation and queuing

The `CreateRun` API is invoked:

1. **Copy inputs**: Input data is copied to the object store.
2. **En-queue a run**: The run is queued into the Union Control Plane.
3. **Hand off to executor**: Union Control Plane hands the task to the Executor Service in your data plane.
4. **Create action**: The parent task action (called `a0`) is created.

## Phase 6: Task execution in data plane

### Container startup

1. **Container starts**: The task container starts in your data plane.
2. **Download code bundle**: The Flyte runtime downloads the code bundle from object storage.
3. **Inflate task**: The task is inflated from the code bundle.
4. **Download inputs**: Inline inputs are downloaded from the object store.
5. **Execute task**: The task is executed with context and inputs.

### Invoking downstream tasks

If the task invokes other tasks:

1. **Controller thread**: A controller thread starts to communicate with the backend Queue Service.
2. **Monitor status**: The controller monitors the status of downstream actions.
3. **Crash recovery**: If the task crashes, the action identifier is deterministic, allowing the task to resurrect its state from Union Control Plane.
4. **Replay**: The controller efficiently replays state (even at large scale) to find missing completions and resume monitoring.

### Execution flow diagram

```mermaid
sequenceDiagram
    participant Client as SDK/Client
    participant Control as Control Plane<br/>(Queue Service)
    participant Data as Data Plane<br/>(Executor)
    participant ObjStore as Object Store
    participant Container as Task Container

    Client->>Client: Analyze code & discover environments
    Client->>Client: Build images (if changed)
    Client->>Client: Bundle code
    Client->>Control: Request signed URL (checksum, path)
    Control->>Data: Get signed URL for bundle
    Data-->>Control: Signed URL
    Control-->>Client: Signed URL
    Client->>ObjStore: Upload code bundle (signed URL)
    Client->>Control: CreateRun API with inputs
    Control->>Data: Copy inputs
    Data->>ObjStore: Write inputs
    Control->>Data: Queue task (create action a0)
    Data->>Container: Start container
    Container->>Data: Request code bundle
    Data->>ObjStore: Read code bundle
    ObjStore-->>Data: Code bundle
    Data-->>Container: Code bundle
    Container->>Container: Inflate task
    Container->>Data: Request inputs
    Data->>ObjStore: Read inputs
    ObjStore-->>Data: Inputs
    Data-->>Container: Inputs
    Container->>Container: Execute task

    alt Invokes downstream tasks
        Container->>Container: Start controller thread
        Container->>Control: Submit downstream tasks
        Control->>Data: Queue downstream actions
        Container->>Control: Monitor downstream status
        Control-->>Container: Status updates
    end

    Container->>Data: Upload outputs
    Data->>ObjStore: Write outputs
    Container->>Control: Complete
    Control-->>Client: Run complete
```

## Action identifiers and crash recovery

Flyte uses deterministic action identifiers to enable robust crash recovery:

- **Consistent identifiers**: Action identifiers are consistently computed based on task and invocation context.
- **Re-run identical**: In any re-run, the action identifier is identical for the same invocation.
- **Multiple invocations**: Multiple invocations of the same task receive unique identifiers.
- **Efficient resurrection**: On crash, the `a0` action resurrects its state from Union Control Plane efficiently, even at large scale.
- **Replay and resume**: The controller replays execution until it finds missing completions and starts watching them.
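
An illustrative model of why determinism enables replay (this is not Flyte's actual identifier scheme): deriving the identifier from the run, task name, and a per-invocation counter means a re-run reproduces the same identifiers in the same order.

```python
import hashlib

def action_id(run_id: str, task_name: str, invocation_index: int) -> str:
    # Same run, task, and invocation position => same identifier, so a
    # recovering parent can re-derive which actions it already launched.
    key = f"{run_id}/{task_name}/{invocation_index}"
    return hashlib.sha256(key.encode()).hexdigest()[:12]

first_run = [action_id("r1", "process_batch", i) for i in range(3)]
replay = [action_id("r1", "process_batch", i) for i in range(3)]
assert first_run == replay          # re-runs produce identical identifiers
assert len(set(first_run)) == 3     # distinct invocations get unique ids
```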

## Downstream task execution

When downstream tasks are invoked:

1. **Action creation**: Downstream actions are created with unique identifiers.
2. **Queue assignment**: Actions are handed to an executor, selected via a named queue or drawn from the general pool.
3. **Parallel execution**: Multiple downstream tasks can execute in parallel.
4. **Result aggregation**: Results are aggregated and returned to the parent task.

## Reusable containers

When using [reusable containers](https://www.union.ai/docs/v2/union/user-guide/task-configuration/reusable-containers), the execution model changes:

1. **Environment spin-up**: The container environment is first spun up with configured replicas.
2. **Task allocation**: Tasks are allocated to available replicas in the environment.
3. **Scaling**: If all replicas are busy, new replicas are spun up (up to the configured maximum), or tasks are backlogged in queues.
4. **Container reuse**: The same container handles multiple task executions, reducing startup overhead.
5. **Lifecycle management**: Containers are managed according to `ReusePolicy` settings (`idle_ttl`, `scaledown_ttl`, etc.).

### Reusable container execution flow

```mermaid
sequenceDiagram
    participant Control as Queue Service
    participant Executor as Executor Service
    participant Pool as Container Pool
    participant Replica as Container Replica

    Control->>Executor: Submit task

    alt Reusable containers enabled
        Executor->>Pool: Request available replica

        alt Replica available
            Pool->>Replica: Allocate task
            Replica->>Replica: Execute task
            Replica->>Pool: Task complete (ready for next)
        else No replica available
            alt Can scale up
                Executor->>Pool: Create new replica
                Pool->>Replica: Spin up new container
                Replica->>Replica: Execute task
                Replica->>Pool: Task complete
            else At max replicas
                Executor->>Pool: Queue task
                Pool-->>Executor: Wait for available replica
                Pool->>Replica: Allocate when available
                Replica->>Replica: Execute task
                Replica->>Pool: Task complete
            end
        end
    else No reusable containers
        Executor->>Replica: Create new container
        Replica->>Replica: Execute task
        Replica->>Executor: Complete & terminate
    end

    Replica-->>Control: Return results
```

## State replication and visualization

### Queue Service to Run Service

1. **Reliable replication**: Queue Service reliably replicates execution state back to Run Service.
2. **Eventual consistency**: The Run Service may be slightly behind the actual execution state.
3. **Visualization**: Run Service paints the entire run onto the UI.

### UI limitations

- **Current limit**: The UI is currently limited to displaying 50k actions per run.
- **Future improvements**: This limit will be increased in future releases. Contact the Union team if you need higher limits.

## Optimization opportunities

Understanding the life of a run reveals several optimization opportunities:

1. **Reuse Python process**: Run `flyte.run()` multiple times in the same process to avoid re-bundling code.
2. **Skip bundling**: Use `copy_style="none"` and bake code into images for faster startup.
3. **Reusable containers**: Use reusable containers to eliminate container startup overhead.
4. **Parallel execution**: Invoke multiple downstream tasks concurrently using `flyte.map()` or `asyncio`.
5. **Efficient data flow**: Minimize data transfer by using reference types (files, directories) instead of inline data.
6. **Caching**: Enable task caching to avoid redundant computation.

For detailed performance tuning guidance, see [Scale your workflows](./scale-your-workflows).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/run-scaling/scale-your-workflows ===

# Scale your workflows

Performance optimization in Flyte involves understanding the interplay between task execution overhead, data transfer, and concurrency. This guide helps you identify bottlenecks and choose the right patterns for your workload.

## Understanding performance dimensions

Performance optimization focuses on two key dimensions:

### Latency

**Goal**: Minimize end-to-end execution time for individual workflows.

**Characteristics**:
- Fast individual actions (milliseconds to seconds)
- Total action count typically less than 1,000
- Critical for interactive applications and real-time processing
- Multi-step inference that reuses a model or data in memory (use reusable containers with [async-lru](https://pypi.org/project/async-lru/) caching)

**Recommended approach**:
- Use tasks for orchestration and parallelism
- Use [traces](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces) for fine-grained checkpointing
- Model parallelism using `asyncio`, joining concurrent work with methods like `asyncio.as_completed` or `asyncio.gather`
- Leverage [reusable containers](https://www.union.ai/docs/v2/union/user-guide/task-configuration/reusable-containers) with concurrency to eliminate startup overhead and optimize resource utilization
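
The `asyncio`-joining advice above, as a runnable Flyte-free sketch: `as_completed` hands back results as they finish, which keeps a latency-sensitive pipeline moving while slower calls are still in flight (model names and delays here are made up):

```python
import asyncio

async def infer(model: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a model call
    return model

async def fastest_first() -> list[str]:
    calls = [infer("slow-model", 0.02), infer("fast-model", 0.0)]
    results = []
    for finished in asyncio.as_completed(calls):
        results.append(await finished)  # results arrive in completion order
    return results

print(asyncio.run(fastest_first()))  # ['fast-model', 'slow-model']
```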

### Throughput

**Goal**: Maximize the number of items processed per unit time.

**Characteristics**:
- Processing large datasets (millions of items)
- High total action count (10k to 50k actions)
- Batch processing, large-scale batch inference and ETL workflows

**Recommended approach**:
- Batch workloads to reduce overhead
- Limit fanout to manage system load
- Use reusable containers with concurrency for maximum utilization
- Balance task granularity with overhead

## Task execution overhead

Understanding task overhead is critical for performance optimization. When you invoke a task, several operations occur:

| Operation | Symbol | Description |
|-----------|--------|-------------|
| **Upload data** | `u` | Time to upload input data to object store |
| **Download data** | `d` | Time to download input data from object store |
| **Enqueue task** | `e` | Time to enqueue task in Queue Service |
| **Create instance** | `t` | Time to create task container instance |

**Total overhead per task**: `2u + 2d + e + t`

This overhead includes:
- Uploading inputs from the parent task (`u`)
- Downloading inputs in the child task (`d`)
- Uploading outputs from the child task (`u`)
- Downloading outputs in the parent task (`d`)
- Enqueuing the task (`e`)
- Creating the container instance (`t`)

### The overhead principle

For efficient execution, task overhead should be much smaller than task runtime:

```
Total overhead (2u + 2d + e + t) << Task runtime
```

If task runtime is comparable to or less than overhead, consider:
1. **Batching**: Combine multiple work items into a single task
2. **Traces**: Use traces instead of tasks for lightweight operations
3. **Reusable containers**: Eliminate container creation overhead (`t`)
4. **Local execution**: Run lightweight operations within the parent task

## System architecture and data flow

To optimize performance, understand how tasks flow through the system:

1. **Control plane to data plane**: Tasks flow from the control plane (Run Service, Queue Service) to the data plane (Executor Service).
2. **Data movement**: Data moves between tasks through object storage. See [Data flow](./data-flow) for details.
3. **State replication**: Queue Service reliably replicates state back to Run Service for visualization. The Run Service may be slightly behind actual execution.

For a detailed walkthrough of task execution, see [Life of a run](./life-of-a-run).

## Optimization strategies

### 1. Use reusable containers for concurrency

[Reusable containers](https://www.union.ai/docs/v2/union/user-guide/task-configuration/reusable-containers) eliminate the container creation overhead (`t`) and enable concurrent task execution:

```python
import flyte
from datetime import timedelta

# Define reusable environment
env = flyte.TaskEnvironment(
    name="high-throughput",
    reuse_policy=flyte.ReusePolicy(
        replicas=(2, 10),           # Auto-scale from 2 to 10 replicas
        concurrency=5,              # 5 tasks per replica = 50 max concurrent
        scaledown_ttl=timedelta(minutes=10),
        idle_ttl=timedelta(hours=1)
    )
)

@env.task
async def process_item(item: dict) -> dict:
    # Process individual item
    return {"processed": item["id"]}
```

**Benefits**:
- Eliminates container startup overhead (`t ≈ 0`)
- Supports concurrent execution (multiple tasks per container)
- Auto-scales based on demand
- Reuses Python environment and loaded dependencies

**Limitations**:
- Concurrency is limited by CPU and I/O resources in the container
- Memory requirements scale with total working set size
- Best for I/O-bound tasks or async operations

### 2. Batch workloads to reduce overhead

For high-throughput processing, batch multiple items into a single task:

```python
import asyncio

@env.task
async def process_batch(items: list[dict]) -> list[dict]:
    """Process a batch of items in a single task."""
    results = []
    for item in items:
        result = await process_single_item(item)  # your per-item logic
        results.append(result)
    return results

@env.task
async def process_large_dataset(dataset: list[dict]) -> list[dict]:
    """Process 1M items with batching."""
    batch_size = 1000  # Adjust based on overhead calculation
    batches = [dataset[i:i + batch_size] for i in range(0, len(dataset), batch_size)]

    # Process batches in parallel (1000 tasks instead of 1M)
    results = await asyncio.gather(*[process_batch(batch) for batch in batches])

    # Flatten results
    return [item for batch_result in results for item in batch_result]
```

**Benefits**:
- Reduces total number of tasks (e.g., 1000 tasks instead of 1M)
- Amortizes overhead across multiple items
- Lower load on Queue Service and object storage

**Choosing batch size**:
1. Calculate overhead: `overhead = 2u + 2d + e + t`
2. Target task runtime: `runtime > 10 × overhead` (rule of thumb)
3. Adjust batch size to achieve target runtime
4. Consider memory constraints (larger batches require more memory)
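
Plugging hypothetical numbers into the rule of thumb (all timings here are made up for illustration):

```python
# Hypothetical per-task overhead components, in seconds.
u, d, e, t = 0.5, 0.5, 0.3, 2.0       # upload, download, enqueue, container create
overhead = 2 * u + 2 * d + e + t      # ~4.3s total per task

per_item_seconds = 0.05               # assumed cost to process one item
target_runtime = 10 * overhead        # rule of thumb: runtime > 10x overhead

batch_size = round(target_runtime / per_item_seconds)
print(batch_size)  # 860 items per task
```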

### 3. Use traces for lightweight operations

[Traces](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces) provide fine-grained checkpointing with minimal overhead:

```python
@flyte.trace
async def fetch_data(url: str) -> dict:
    """Traced function for API call."""
    response = await http_client.get(url)
    return response.json()

@flyte.trace
async def transform_data(data: dict) -> dict:
    """Traced function for transformation."""
    return {"transformed": data}

@env.task
async def process_workflow(urls: list[str]) -> list[dict]:
    """Orchestrate using traces instead of tasks."""
    results = []
    for url in urls:
        data = await fetch_data(url)
        transformed = await transform_data(data)
        results.append(transformed)
    return results
```

**Benefits**:
- Only storage overhead (no task orchestration overhead)
- Runs in the same Python process with asyncio parallelism
- Provides checkpointing and resumption
- Visible in execution logs and UI

**Trade-offs**:
- No caching (use tasks for cacheable operations)
- Shares resources with the parent task (CPU, memory)
- Storage writes may still be slow due to object store latency

**When to use traces**:
- API calls and external service interactions
- Deterministic transformations that need checkpointing
- Operations taking more than 1 second (to amortize storage overhead)

### 4. Limit fanout for system stability

The UI and system have limits on the number of actions per run:

- **Current limit**: 50k actions per run
- **Future**: Higher limits will be supported (contact the Union team if needed)

**Example: Control fanout with batching**

```python
@env.task
async def process_million_items(items: list[dict]) -> list[dict]:
    """Process 1M items with controlled fanout."""
    # Target ~10k tasks: batches of 100 items keep fanout well under the 50k limit
    batch_size = 100

    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    # Use flyte.map for parallel execution (process_batch is defined in the batching example above)
    results = await flyte.map(process_batch, batches)

    return [item for batch in results for item in batch]
```

### 5. Optimize data transfer

Minimize data transfer overhead by choosing appropriate data types:

**Use reference types for large data**:
```python
from flyte.io import File, Directory, DataFrame

@env.task
async def process_large_file(input_file: File) -> File:
    """Files passed by reference, not copied."""
    # Download only when needed
    local_path = input_file.download()

    # Process file
    result_path = process(local_path)

    # Upload result
    return File.new_remote(result_path)
```

**Use inline types for small data**:
```python
@env.task
async def process_metadata(metadata: dict) -> dict:
    """Small dicts passed inline efficiently."""
    return {"processed": metadata}
```

**Guideline**:
- **< 10 MB**: Use inline types (primitives, small dicts, lists)
- **> 10 MB**: Use reference types (File, Directory, DataFrame)
- **Adjust**: Use `max_inline_io` in `TaskEnvironment` to change the threshold

See [Data flow](./data-flow) for details on data types and transport.

### 6. Leverage caching

Enable [caching](https://www.union.ai/docs/v2/union/user-guide/task-configuration/caching) to avoid redundant computation:

```python
@env.task(cache="auto")
async def expensive_computation(input_data: dict) -> dict:
    """Automatically cached based on inputs."""
    # Expensive operation
    return result
```

**Benefits**:
- Skips re-execution for identical inputs
- Reduces overall workflow runtime
- Preserves resources for new computations

**When to use**:
- Deterministic tasks (same inputs → same outputs)
- Expensive computations (model training, large data processing)
- Stable intermediate results

### 7. Parallelize with `flyte.map`

Use [`flyte.map`](https://www.union.ai/docs/v2/union/user-guide/task-programming/fanout) for data-parallel workloads:

```python
@env.task
async def process_item(item: dict) -> dict:
    return {"processed": item}

@env.task
async def parallel_processing(items: list[dict]) -> list[dict]:
    """Process items in parallel using map."""
    results = await flyte.map(process_item, items)
    return results
```

**Benefits**:
- Automatic parallelization
- Dynamic scaling based on available resources
- Built-in error handling and retries

**Best practices**:
- Combine with batching to control fanout
- Use with reusable containers for maximum throughput
- Consider memory and resource limits

## Performance tuning workflow

Follow this workflow to optimize your Flyte workflows:

1. **Profile**: Measure task execution times and identify bottlenecks.
2. **Calculate overhead**: Estimate `2u + 2d + e + t` for your tasks.
3. **Compare**: Check if `task runtime >> overhead`. If not, optimize.
4. **Batch**: Increase batch size to amortize overhead.
5. **Reusable containers**: Enable reusable containers to eliminate `t`.
6. **Traces**: Use traces for lightweight operations within tasks.
7. **Cache**: Enable caching for deterministic, expensive tasks.
8. **Limit fanout**: Keep total actions below 50k (target 10k-20k).
9. **Monitor**: Use the UI to monitor execution and identify issues.
10. **Iterate**: Continuously refine based on performance metrics.
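Steps 2 and 3 can be sketched numerically. The timings below are illustrative assumptions, not measurements; `u`, `d`, `e`, and `t` are the upload, download, enqueue, and container-start times from the overhead formula:

```python
def overhead_seconds(u: float, d: float, e: float, t: float) -> float:
    """Per-task overhead model from step 2: two uploads, two downloads,
    enqueue time, and container start time."""
    return 2 * u + 2 * d + e + t

def runtime_dominates(task_runtime_s: float, overhead_s: float, factor: float = 10.0) -> bool:
    """Step 3 check: fan out only when runtime exceeds overhead by a healthy margin."""
    return task_runtime_s >= factor * overhead_s

# Assumed timings: 0.5 s per transfer, 0.2 s enqueue, 3 s container start
overhead = overhead_seconds(0.5, 0.5, 0.2, 3.0)  # 5.2 s per task
```

With ~5 s of per-task overhead, a 2-minute task is worth running as its own task, while a 100 ms task is not; the latter should be batched.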

## Real-world example: PyIceberg batch processing

For a comprehensive example of efficient data processing with Flyte, see the [PyIceberg parallel batch aggregation example](https://github.com/flyteorg/flyte-sdk/blob/main/examples/data_processing/pyiceberg_example.py). This example demonstrates:

- **Zero-copy data passing**: Pass file paths instead of data between tasks
- **Reusable containers with concurrency**: Maximize CPU utilization across workers
- **Parallel file processing**: Use `asyncio.gather()` to process multiple files concurrently
- **Efficient batching**: Distribute parquet files across worker tasks

Key pattern from the example:

```python
# Instead of loading entire table, get file paths
file_paths = [task.file.file_path for task in table.scan().plan_files()]

# Distribute files across partitions (zero-copy!)
partition_files = distribute_files(file_paths, num_partitions)

# Process partitions in parallel
results = await asyncio.gather(*[
    aggregate_partition(files, partition_id)
    for partition_id, files in enumerate(partition_files)
])
```

This approach achieves true parallel file processing without loading the entire dataset into memory.
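The `distribute_files` helper in the snippet is defined in the linked example; one minimal way to implement the same idea is a strided split (a sketch, not necessarily the example's exact implementation):

```python
def distribute_files(file_paths: list[str], num_partitions: int) -> list[list[str]]:
    """Strided (round-robin) split: partition i receives files
    i, i + num_partitions, i + 2 * num_partitions, ..."""
    return [file_paths[i::num_partitions] for i in range(num_partitions)]
```

Because only path strings are distributed, each worker downloads and reads just its own parquet files.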

## Example: Optimizing a data pipeline

### Before optimization

```python
@env.task
async def process_item(item: dict) -> dict:
    # Very fast operation (~100ms)
    return {"processed": item["id"]}

@env.task
async def process_dataset(items: list[dict]) -> list[dict]:
    # Create 1M tasks
    results = await asyncio.gather(*[process_item(item) for item in items])
    return results
```

**Issues**:
- 1M tasks created (exceeds UI limit)
- Task overhead >> task runtime (100ms task, seconds of overhead)
- High load on Queue Service and object storage

### After optimization

```python
from datetime import timedelta

import flyte

# Use reusable containers
env = flyte.TaskEnvironment(
    name="optimized-pipeline",
    reusable=flyte.ReusePolicy(
        replicas=(5, 20),
        concurrency=10,
        scaledown_ttl=timedelta(minutes=10),
        idle_ttl=timedelta(hours=1)
    )
)

@env.task
async def process_batch(items: list[dict]) -> list[dict]:
    # Process batch of items
    return [{"processed": item["id"]} for item in items]

@env.task
async def process_dataset(items: list[dict]) -> list[dict]:
    # Create 1000 tasks (batch size 1000)
    batch_size = 1000
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

    results = await flyte.map(process_batch, batches)
    return [item for batch in results for item in batch]
```

**Improvements**:
- 1000 tasks instead of 1M (within limits)
- Batch runtime ~100 seconds (100ms × 1000 items)
- Reusable containers eliminate startup overhead
- Concurrency enables high throughput (up to 20 replicas × 10 tasks each = 200 concurrent tasks)
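The sizing in this optimized version works out as follows (back-of-envelope arithmetic using the numbers above; overhead is ignored):

```python
items_total = 1_000_000
batch_size = 1_000
per_item_s = 0.1  # ~100 ms per item

num_tasks = -(-items_total // batch_size)      # ceil division: 1000 tasks
batch_runtime_s = batch_size * per_item_s      # ~100 s per batch
max_concurrent = 20 * 10                       # 20 replicas x 10 concurrency = 200
waves = -(-num_tasks // max_concurrent)        # 5 waves of concurrent batches
ideal_wall_clock_s = waves * batch_runtime_s   # ~500 s, ignoring overhead
```

Compare this to 1M individual tasks, where several seconds of per-task overhead would dwarf the 100 ms of useful work in each.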

## When to contact the Union team

Reach out to the Union team if you:

- Need more than 50k actions per run
- Want to use high-performance metastores (Redis, PostgreSQL) instead of object stores
- Have specific performance requirements or constraints
- Need help profiling and optimizing your workflows

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/run-scaling/batch-inference ===

# Maximize GPU utilization for batch inference

GPUs are expensive. When running batch inference, the single biggest cost driver is **idle GPU time** — cycles where the GPU sits waiting with nothing to do. Understanding why this happens and how to fix it is the key to cost-effective batch inference.

## Why GPU utilization drops

A typical inference task does three things:

1. **Load data** — read from storage, deserialize, preprocess (CPU/IO-bound)
2. **Run inference** — forward pass through the model (GPU-bound)
3. **Post-process** — format results, write outputs (CPU/IO-bound)

When these steps run sequentially, the GPU is idle during steps 1 and 3. For many workloads, data loading and preprocessing dominate wall-clock time, leaving the GPU busy for only a fraction of the total:

```mermaid
gantt
    title Sequential execution — GPU idle during CPU/IO work
    dateFormat X
    axisFormat %s

    section Task 1
    Load data (CPU/IO)     :a1, 0, 3
    Inference (GPU)        :a2, after a1, 2
    Post-process (CPU/IO)  :a3, after a2, 1

    section Task 2
    Load data (CPU/IO)     :b1, after a3, 3
    Inference (GPU)        :b2, after b1, 2
    Post-process (CPU/IO)  :b3, after b2, 1

    section GPU
    Idle                   :crit, g1, 0, 3
    Busy                   :active, g2, 3, 5
    Idle                   :crit, g3, 5, 9
    Busy                   :active, g4, 9, 11
    Idle                   :crit, g5, 11, 12
```

In this example, the GPU is busy for only 4 out of 12 time units — **33% utilization**. The rest is wasted waiting for CPU and IO operations.

## Serving vs in-process batch inference

There are two common approaches to batch inference: sending requests to a **hosted model server** (serving), or running the model **in-process** alongside data loading. Each has distinct trade-offs:

| | Hosted serving | In-process (Flyte) |
|---|---|---|
| **Architecture** | Separate inference server (e.g. Triton, vLLM server, TGI) accessed over the network | Model loaded directly in the task process, inference via `DynamicBatcher` |
| **Data transfer** | Every request serialized over the network; large payloads add latency | Zero-copy — data stays in-process, no serialization overhead |
| **Backpressure** | Hard to implement; push-based architecture can overwhelm the server or drop requests | Two levels: `DynamicBatcher` queue blocks producers when full, and Flyte's task scheduling automatically queues new inference tasks when replicas are busy — backpressure propagates end-to-end without any extra code |
| **Utilization** | Servers are often over-provisioned to maintain availability, leading to low average utilization | Batcher continuously fills the GPU with work from concurrent producers |
| **Multi-model** | Each model needs its own serving deployment, load balancer, and scaling config | Multiple models can time-share the same GPU — when one model finishes, the next is loaded automatically via reusable containers, no container orchestration required |
| **Scaling** | Requires separate infrastructure for the serving layer (load balancers, autoscalers, health checks) | Scales with Flyte — replicas auto-scale based on demand |
| **Cost** | Pay for always-on serving infrastructure even during low-traffic periods | Pay only for the duration of the batch job |
| **Fault tolerance** | Need retries, circuit breakers, and timeout handling for network failures | Failures are local; Flyte handles retries and recovery at the task level |
| **Best for** | Real-time / low-latency serving with unpredictable request patterns | Large-scale batch processing with known datasets |

For batch workloads, in-process inference eliminates the network overhead and infrastructure complexity of a serving layer while achieving higher GPU utilization through intelligent batching.

## Solution: `DynamicBatcher`

`DynamicBatcher` from `flyte.extras` solves the utilization problem by **separating data loading from inference** and running them concurrently. Multiple async producers load and preprocess data while a single consumer feeds the GPU in optimally sized batches:

```mermaid
flowchart LR
    subgraph producers ["Concurrent producers (CPU/IO)"]
        P1["Stream 1: load + preprocess"]
        P2["Stream 2: load + preprocess"]
        P3["Stream N: load + preprocess"]
    end

    subgraph batcher ["DynamicBatcher"]
        Q["Queue with backpressure"]
        A["Aggregation loop<br/>(assembles cost-budgeted batches)"]
        Q --> A
    end

    subgraph consumer ["Processing loop (GPU)"]
        G["process_fn / inference_fn<br/>(batched forward pass)"]
    end

    P1 --> Q
    P2 --> Q
    P3 --> Q
    A --> G
```

The batcher runs two internal loops:

1. **Aggregation loop** — drains the submission queue and assembles batches that respect a cost budget (`target_batch_cost`), a maximum size (`max_batch_size`), and a timeout (`batch_timeout_s`). This ensures the GPU always receives optimally sized batches.
2. **Processing loop** — pulls assembled batches and calls your processing function, resolving each record's future with its result.

This pipelining means the GPU is processing batch N while data for batch N+1 is being loaded and assembled — **eliminating idle time**.
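Using the stage timings from the Gantt chart above (load 3, inference 2, post-process 1 time units), the effect of pipelining can be estimated with simple stage analysis. This is a back-of-envelope model, not a measurement:

```python
load, infer, post = 3, 2, 1

# Sequential: every item pays all three stages; GPU is busy only during inference.
sequential_util = infer / (load + infer + post)        # 2/6, about 33%

# Pipelined with one producer: throughput is limited by the slowest stage.
single_producer_util = infer / max(load, infer, post)  # 2/3, about 67%

# With N concurrent producers, per-item load time is amortized across producers;
# once load / N <= infer, the GPU is the bottleneck and utilization approaches 100%.
producers_needed = -(-load // infer)                   # ceil(3/2) = 2
```

This is why the batcher pairs a single GPU consumer with many concurrent CPU/IO producers: enough producers shift the bottleneck onto the GPU itself.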

### Basic usage

```python
import asyncio

from flyte.extras import DynamicBatcher

async def process(batch: list[dict]) -> list[str]:
    """Your batch processing function. Must return results in the same order as the input."""
    return [heavy_computation(item) for item in batch]

async with DynamicBatcher(
    process_fn=process,
    target_batch_cost=1000,   # cost budget per batch
    max_batch_size=64,        # hard cap on records per batch
    batch_timeout_s=0.05,     # max wait time before dispatching a partial batch
    max_queue_size=5_000,     # queue size for backpressure
) as batcher:
    futures = []
    for record in my_records:
        future = await batcher.submit(record, estimated_cost=10)
        futures.append(future)
    results = await asyncio.gather(*futures)
```

Each call to `submit()` is non-blocking — it enqueues the record and immediately returns a `Future`. When the queue is full, `submit()` awaits until space is available, providing natural backpressure to prevent producers from overwhelming the GPU.
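The blocking behavior of a full queue can be illustrated with a plain `asyncio.Queue`. This is a toy model of the backpressure mechanism, not the batcher's actual implementation:

```python
import asyncio

async def demo_backpressure() -> list[int]:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)  # tiny buffer

    async def producer() -> None:
        for i in range(4):
            await queue.put(i)  # suspends here once two items are buffered

    async def consumer() -> list[int]:
        out = []
        for _ in range(4):
            await asyncio.sleep(0.01)  # simulate slow batch processing
            out.append(await queue.get())
        return out

    _, results = await asyncio.gather(producer(), consumer())
    return results

# asyncio.run(demo_backpressure()) yields [0, 1, 2, 3]; the producer never
# gets more than two items ahead of the consumer.
```

In the real batcher, the same mechanism keeps fast producers from buffering unbounded work ahead of the GPU.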

### Cost estimation

The batcher uses cost estimates to decide how many records to group into each batch. You can provide costs in several ways (checked in order of precedence):

1. **Explicit** — pass `estimated_cost` to `submit()`
2. **Estimator function** — pass `cost_estimator` to the constructor
3. **Protocol** — implement `estimate_cost()` on your record type
4. **Default** — falls back to `default_cost` (default: 1)
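The order of precedence can be sketched as a resolution function (illustrative logic only, not the library's actual code):

```python
def resolve_cost(record, explicit_cost=None, cost_estimator=None, default_cost=1):
    """Resolve a record's batching cost in the documented order of precedence."""
    if explicit_cost is not None:                 # 1. passed to submit()
        return explicit_cost
    if cost_estimator is not None:                # 2. constructor-level estimator
        return cost_estimator(record)
    if hasattr(record, "estimate_cost"):          # 3. CostEstimator protocol
        return record.estimate_cost()
    return default_cost                           # 4. fallback
```

Explicit per-record costs win, so you can override an estimator for known outliers without changing the batcher's configuration.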

## `TokenBatcher` for LLM inference

For LLM workloads, `TokenBatcher` is a convenience subclass that uses token-aware parameter names:

```python
from dataclasses import dataclass
from flyte.extras import TokenBatcher

@dataclass
class Prompt:
    text: str

    def estimate_tokens(self) -> int:
        """Rough token estimate (~4 chars per token)."""
        return len(self.text) // 4 + 1

async def inference(batch: list[Prompt]) -> list[str]:
    """Run batched inference through your model."""
    texts = [p.text for p in batch]
    outputs = model.generate(texts, sampling_params)
    return [o.outputs[0].text for o in outputs]

async with TokenBatcher(
    inference_fn=inference,
    target_batch_tokens=32_000,  # token budget per batch
    max_batch_size=256,
) as batcher:
    future = await batcher.submit(Prompt(text="What is 2+2?"))
    result = await future
```

`TokenBatcher` checks the `TokenEstimator` protocol (`estimate_tokens()`) in addition to `CostEstimator` (`estimate_cost()`), making it natural to work with prompt types.

## Combining with reusable containers

`DynamicBatcher` on its own improves utilization within a single task. When combined with [reusable containers](https://www.union.ai/docs/v2/union/user-guide/task-configuration/reusable-containers), it becomes significantly more powerful:

- **Amortized model loading** — the model is loaded once per container and reused across many task invocations, avoiding repeated download and initialization costs
- **Cross-task batching** — with `ReusePolicy(concurrency=N)`, multiple task invocations run concurrently on the same replica, all feeding records into the **same shared batcher**. This means the GPU always has a full queue of work.
- **Automatic scaling** — replicas scale between min and max based on demand, and each replica maintains its own model + batcher

```mermaid
flowchart TB
    D["Driver task<br/>fans out chunks"] --> |chunk 1| R1
    D --> |chunk 2| R1
    D --> |chunk 3| R2
    D --> |chunk ...| R1
    D --> |chunk N| R2

    subgraph R1 ["GPU Replica 1"]
        direction TB
        M1["Model (loaded once via alru_cache)"]
        B1["Shared TokenBatcher"]
        T1a["infer_batch call 1"] --> B1
        T1b["infer_batch call 2"] --> B1
        T1c["infer_batch call ..."] --> B1
        B1 --> M1
    end

    subgraph R2 ["GPU Replica 2"]
        direction TB
        M2["Model (loaded once via alru_cache)"]
        B2["Shared TokenBatcher"]
        T2a["infer_batch call 1"] --> B2
        T2b["infer_batch call 2"] --> B2
        T2c["infer_batch call ..."] --> B2
        B2 --> M2
    end
```

The key technique is using `@alru_cache` to create **process-level singletons** — the model and batcher are initialized on the first task invocation and reused by all subsequent invocations on that replica.

### Example: batch LLM inference with vLLM

This example loads math problems from HuggingFace's gsm8k dataset and solves them using batched vLLM inference across GPU replicas.

#### 1. Define the environment

```python
import flyte
from flyte.extras import TokenBatcher

image = (
    flyte.Image.from_debian_base()
    .with_pip_packages("vllm", "hf-transfer", "unionai-reuse")
    .with_env_vars({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
)

gpu_env = flyte.TaskEnvironment(
    name="gpu_worker",
    resources=flyte.Resources(cpu=4, memory="16Gi", gpu="A10G:1"),
    image=image,
    reusable=flyte.ReusePolicy(
        replicas=2,        # 2 GPU replicas
        concurrency=10,    # 10 concurrent tasks per replica
    ),
)

driver_env = flyte.TaskEnvironment(
    name="driver",
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    image=image,
    depends_on=[gpu_env],
)
```

With `replicas=2` and `concurrency=10`, up to 20 `infer_batch` calls run simultaneously across 2 GPUs, all sharing their replica's model and batcher.

#### 2. Create process-level singletons

```python
from async_lru import alru_cache
from dataclasses import dataclass

@dataclass
class Prompt:
    task_id: str
    index: int
    text: str

    def estimate_tokens(self) -> int:
        return len(self.text) // 4 + 1

@alru_cache(maxsize=1)
async def get_inference_fn():
    """Load the model once per container lifetime."""
    from vllm import LLM, SamplingParams
    llm = LLM(model="Qwen/Qwen2.5-7B-Instruct", gpu_memory_utilization=0.9, max_model_len=4096)
    params = SamplingParams(temperature=0.7, max_tokens=512)

    async def inference(batch: list[Prompt]) -> list[str]:
        texts = [p.text for p in batch]
        outputs = llm.generate(texts, params)
        return [o.outputs[0].text for o in outputs]

    return inference

@alru_cache(maxsize=1)
async def get_batcher() -> TokenBatcher[Prompt, str]:
    """Create a single batcher per container — shared across all concurrent tasks."""
    inference_fn = await get_inference_fn()
    batcher = TokenBatcher[Prompt, str](
        inference_fn=inference_fn,
        target_batch_tokens=32_000,
        max_batch_size=256,
        batch_timeout_s=0.05,
        max_queue_size=5_000,
    )
    await batcher.start()
    return batcher
```

#### 3. Define the GPU worker task

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

@gpu_env.task
async def infer_batch(prompts: list[str], task_id: str) -> list[str]:
    """Submit prompts to the shared batcher and return completions."""
    batcher = await get_batcher()

    futures: list[asyncio.Future[str]] = []
    for idx, text in enumerate(prompts):
        record = Prompt(task_id=task_id, index=idx, text=text)
        future = await batcher.submit(record)
        futures.append(future)

    results = await asyncio.gather(*futures)
    logger.info(
        "[%s] completed %d records | utilization: %.1f%% | batches: %d",
        task_id,
        len(results),
        batcher.stats.utilization * 100,
        batcher.stats.total_batches,
    )
    return list(results)
```

Every concurrent `infer_batch` call on the same replica feeds into the same batcher. The batcher continuously assembles token-budgeted batches from all concurrent callers, keeping the GPU saturated.

#### 4. Define the driver task

```python
@driver_env.task
async def main(num_questions: int = 500, chunk_size: int = 50) -> dict[str, list[str]]:
    """Fetch questions and fan out across GPU replicas."""
    questions = await fetch_questions(num_questions)  # gsm8k loader (helper not shown)

    chunks = [questions[i:i + chunk_size] for i in range(0, len(questions), chunk_size)]
    task_ids = [f"chunk_{i:03d}" for i in range(len(chunks))]

    all_results = await asyncio.gather(
        *(infer_batch(chunk, tid) for chunk, tid in zip(chunks, task_ids))
    )
    return dict(zip(task_ids, all_results))
```

## Monitoring utilization

`DynamicBatcher` exposes a `stats` property with real-time metrics:

```python
stats = batcher.stats
print(f"Utilization: {stats.utilization:.1%}")         # fraction of time spent processing
print(f"Records processed: {stats.total_completed}")
print(f"Batches dispatched: {stats.total_batches}")
print(f"Avg batch size: {stats.avg_batch_size:.1f}")
print(f"Busy time: {stats.busy_time_s:.1f}s")
print(f"Idle time: {stats.idle_time_s:.1f}s")
```

| Metric | Description |
|---|---|
| `utilization` | Fraction of wall-clock time spent inside `process_fn` (0.0–1.0). Target: > 0.9. |
| `total_submitted` | Total records submitted via `submit()` |
| `total_completed` | Total records whose futures have been resolved |
| `total_batches` | Number of batches dispatched to `process_fn` |
| `avg_batch_size` | Running average records per batch |
| `avg_batch_cost` | Running average cost per batch |
| `busy_time_s` | Cumulative seconds spent inside `process_fn` |
| `idle_time_s` | Cumulative seconds the processing loop waited for batches |

If utilization is low, consider:
- **Increasing concurrency** — more concurrent producers means the batcher has more records to assemble into batches
- **Reducing `batch_timeout_s`** — dispatch partial batches faster instead of waiting
- **Increasing `max_queue_size`** — allow more records to be buffered ahead of the GPU
- **Adding more data streams** — ensure the GPU always has work queued up
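As a sanity check on these numbers, `utilization` should agree with the two timing counters, assuming it is computed as busy time over total processing-loop time:

```python
def utilization_from_counters(busy_time_s: float, idle_time_s: float) -> float:
    """Fraction of processing-loop wall-clock time spent inside process_fn."""
    total = busy_time_s + idle_time_s
    return busy_time_s / total if total else 0.0
```

For example, 9 s busy and 1 s idle corresponds to 0.9 utilization, right at the recommended target.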

