# DynamicBatcher

**Package:** `flyte.extras`

Batches records from many concurrent producers and runs them through
a single async processing function, maximizing resource utilization.

The batcher runs two internal loops:

1. **Aggregation loop** — drains the submission queue and assembles
   cost-budgeted batches, respecting `target_batch_cost`,
   `max_batch_size`, and `batch_timeout_s`.
2. **Processing loop** — pulls assembled batches and calls
   `process_fn`, resolving each record's `asyncio.Future`.
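The budgeting behavior of the aggregation loop can be sketched as a greedy split over per-record costs. This is an illustration only, not the SDK's internals; the `plan_batches` helper is hypothetical:

```python
def plan_batches(costs: list[int], target_batch_cost: int, max_batch_size: int) -> list[list[int]]:
    """Greedily group record indices into batches, dispatching a batch
    when adding the next record would exceed the cost budget or the
    hard size cap. A batch is never left empty."""
    batches: list[list[int]] = []
    current: list[int] = []
    current_cost = 0
    for i, cost in enumerate(costs):
        if current and (current_cost + cost > target_batch_cost
                        or len(current) >= max_batch_size):
            batches.append(current)
            current, current_cost = [], 0
        current.append(i)
        current_cost += cost
    if current:
        batches.append(current)
    return batches
```

Note that a single record whose cost exceeds `target_batch_cost` still forms a batch of one, which is why the budget is a soft target rather than a hard limit.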

Type Parameters:
    RecordT: The input record type produced by your tasks.
    ResultT: The per-record output type returned by `process_fn`.

## Parameters

```python
class DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None,
    target_batch_cost: int,
    max_batch_size: int,
    min_batch_size: int,
    batch_timeout_s: float,
    max_queue_size: int,
    prefetch_batches: int,
    default_cost: int,
)
```
| Parameter | Type | Description |
|-|-|-|
| `process_fn` | `ProcessFn[RecordT, ResultT]` | An `async def f(batch: list[RecordT]) -> list[ResultT]`. Must return results in the **same order** as the input batch. |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` | Optional `(RecordT) -> int` function. When provided, it is called to estimate the cost of each submitted record. Falls back to `record.estimate_cost()` if the record implements `CostEstimator`, then to `default_cost`. |
| `target_batch_cost` | `int` | Cost budget per batch.  The aggregator fills batches up to this limit before dispatching. |
| `max_batch_size` | `int` | Hard cap on records per batch regardless of cost budget. |
| `min_batch_size` | `int` | Minimum records before dispatching.  Ignored when the timeout fires or shutdown is in progress. |
| `batch_timeout_s` | `float` | Maximum seconds to wait for a full batch.  Lower values reduce idle time but may produce smaller batches. |
| `max_queue_size` | `int` | Bounded queue size.  When full, `submit` awaits (backpressure). |
| `prefetch_batches` | `int` | Number of pre-assembled batches to buffer between the aggregation and processing loops. |
| `default_cost` | `int` | Fallback cost when no estimator is available. |
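The cost-resolution fallback order described above can be sketched as follows. The `resolve_cost` helper is illustrative, not part of the API, and the `hasattr` check stands in for a duck-typed `CostEstimator` test:

```python
from typing import Callable, Optional

def resolve_cost(record, cost_estimator: Optional[Callable] = None, default_cost: int = 1) -> int:
    """Mirror the documented fallback order: cost_estimator first,
    then record.estimate_cost() if implemented, then default_cost."""
    if cost_estimator is not None:
        return cost_estimator(record)
    if hasattr(record, "estimate_cost"):
        return record.estimate_cost()
    return default_cost
```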

## Properties

| Property | Type | Description |
|-|-|-|
| `is_running` | `bool` | Whether the aggregation and processing loops are active. |
| `stats` | `BatchStats` | Current `BatchStats` snapshot. |

## Methods

| Method | Description |
|-|-|
| [`start()`](#start) | Start the aggregation and processing loops. |
| [`stop()`](#stop) | Graceful shutdown: process all enqueued work, then stop. |
| [`submit()`](#submit) | Submit a single record for batched processing. |
| [`submit_batch()`](#submit_batch) | Convenience: submit multiple records and return their futures. |

### start()

```python
def start()
```
Start the aggregation and processing loops.

**Raises**

| Exception | Description |
|-|-|
| `RuntimeError` | If the batcher is already running. |

### stop()

```python
def stop()
```
Graceful shutdown: process all enqueued work, then stop.

Blocks until every pending future is resolved.

### submit()

```python
async def submit(
    record: RecordT,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]
```
Submit a single record for batched processing.

Returns an `asyncio.Future` that resolves once the batch
containing this record has been processed.

| Parameter | Type | Description |
|-|-|-|
| `record` | `RecordT` | The input record. |
| `estimated_cost` | `int \| None` | Optional explicit cost.  When omitted the batcher tries `cost_estimator`, then `record.estimate_cost()`, then `default_cost`. |

**Returns**

A future whose result is the corresponding entry from the list
returned by `process_fn`.

**Raises**

| Exception | Description |
|-|-|
| `RuntimeError` | If the batcher is not running. |

> [!NOTE]
> If the internal queue is full this coroutine awaits until space
> is available, providing natural backpressure to fast producers.
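The processing loop's contract — one order-matched result per record, with a failure propagated to every future in the batch — can be sketched like this. The `process_and_resolve` helper is illustrative, not the SDK's code:

```python
import asyncio

async def process_and_resolve(batch: list, futures: list[asyncio.Future], process_fn) -> None:
    """Run process_fn on the whole batch, then resolve each record's
    future with its position-matched result. On failure, fail every
    future in the batch with the same exception."""
    try:
        results = await process_fn(batch)
        for fut, result in zip(futures, results):
            fut.set_result(result)
    except Exception as exc:
        for fut in futures:
            if not fut.done():
                fut.set_exception(exc)
```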

### submit_batch()

```python
def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]
```
Convenience: submit multiple records and return their futures.

| Parameter | Type | Description |
|-|-|-|
| `records` | `Sequence[RecordT]` | Iterable of input records. |
| `estimated_cost` | `Sequence[int] \| None` | Optional per-record cost estimates.  Length must match *records* when provided. |

**Returns:** List of futures, one per record.

