# Controlling parallel execution

When you [fan out](https://www.union.ai/docs/v2/union/user-guide/task-programming/controlling-parallelism/fanout) to many tasks, you often need to limit how many run at the same time.
Common reasons include rate-limited APIs, GPU quotas, database connection limits, and the need to avoid overwhelming a downstream service.

In Flyte 2, there are two ways to control concurrency:
[`asyncio.Semaphore`](https://docs.python.org/3/library/asyncio-sync.html#asyncio.Semaphore) for fine-grained control,
and `flyte.map` with a built-in `concurrency` parameter for simpler cases.

## The problem: unbounded parallelism

A straightforward `asyncio.gather` launches every task at once.
If you are calling an external API that allows only a few concurrent requests, this can cause throttling or errors:

```
import asyncio

import flyte

env = flyte.TaskEnvironment("controlling_parallelism")

@env.task
async def call_llm_api(prompt: str) -> str:
    """Simulate calling a rate-limited LLM API."""
    # In a real workflow, this would call an external API.
    # The API might allow only a few concurrent requests.
    await asyncio.sleep(0.5)
    return f"Response to: {prompt}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/controlling-parallelism/controlling_parallelism.py*

```
@env.task
async def process_all_at_once(prompts: list[str]) -> list[str]:
    """Send all requests in parallel with no concurrency limit.

    This can overwhelm a rate-limited API, causing errors or throttling.
    """
    results = await asyncio.gather(*[call_llm_api(p) for p in prompts])
    return list(results)
```

With eight prompts, this fires eight concurrent API calls.
That works fine when there are no limits, but it can trigger throttling or errors when the API enforces a concurrency cap.
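
The effect is easy to observe locally with plain `asyncio`, without a Flyte cluster. The sketch below is an illustration only: `fake_api` is a hypothetical stand-in for `call_llm_api` (an ordinary coroutine, not a Flyte task) that counts how many calls are in flight at once.

```python
import asyncio

in_flight = 0  # calls currently running
peak = 0       # highest value in_flight has reached


async def fake_api(prompt: str) -> str:
    """Hypothetical stand-in for call_llm_api; records concurrency."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        await asyncio.sleep(0.05)  # simulated network latency
        return f"Response to: {prompt}"
    finally:
        in_flight -= 1


async def main() -> tuple[list[str], int]:
    prompts = [f"prompt {i}" for i in range(8)]
    # No limit: every coroutine starts before any of them finishes.
    results = await asyncio.gather(*[fake_api(p) for p in prompts])
    return list(results), peak


results, observed_peak = asyncio.run(main())
print(observed_peak)  # 8
```

Because `asyncio.gather` schedules every coroutine before any of them finishes sleeping, the observed peak equals the number of prompts.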

## Using asyncio.Semaphore

An `asyncio.Semaphore` acts as a gate: only a fixed number of tasks can pass through at a time.
The rest wait until a slot opens up.

```
@env.task
async def process_batch_with_semaphore(
    prompts: list[str],
    max_concurrent: int = 3,
) -> list[str]:
    """Process prompts in parallel, limiting concurrency with a semaphore.

    At most `max_concurrent` calls to the API run at any given time.
    The remaining tasks wait until a slot is available.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_call(prompt: str) -> str:
        async with semaphore:
            return await call_llm_api(prompt)

    results = await asyncio.gather(*[limited_call(p) for p in prompts])
    return list(results)
```

The pattern is:

1. Create a semaphore with the desired limit.
2. Wrap each task call in an inner async function that acquires the semaphore before calling and releases it after.
3. Pass all wrapped calls to `asyncio.gather`.

All eight wrapped coroutines start immediately, but only three can hold the semaphore at any moment, so at most three `call_llm_api` calls are in flight at once.
As each call completes, it releases its slot and the next waiting coroutine proceeds.
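
The gating behavior can be checked locally with plain `asyncio`. This is a minimal sketch under stated assumptions, not the Flyte example itself: `fake_api` and `run_limited` are hypothetical names, and `fake_api` is an ordinary coroutine standing in for the `call_llm_api` task.

```python
import asyncio


async def run_limited(
    prompts: list[str], max_concurrent: int
) -> tuple[list[str], int]:
    """Run a stand-in call per prompt and report the peak concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_api(prompt: str) -> str:
        # Hypothetical stand-in for call_llm_api; records concurrency.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        try:
            await asyncio.sleep(0.05)
            return f"Response to: {prompt}"
        finally:
            in_flight -= 1

    async def limited_call(prompt: str) -> str:
        # Waits here until one of the max_concurrent slots is free.
        async with semaphore:
            return await fake_api(prompt)

    results = await asyncio.gather(*[limited_call(p) for p in prompts])
    return list(results), peak


prompts = [f"prompt {i}" for i in range(8)]
results, peak = asyncio.run(run_limited(prompts, max_concurrent=3))
print(peak)  # 3
```

`asyncio.gather` returns results in the order the coroutines were passed in, so the output list lines up with `prompts` even though the calls finish in waves.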

> [!NOTE]
> The semaphore lives in the parent task and limits how many child task calls that parent has in flight, which in turn caps how many of them execute concurrently on the Flyte cluster.
> Each task still runs in its own container with its own resources; the semaphore simply limits how many of those containers are active at a time.

## Using flyte.map with concurrency

For uniform work — applying the same task to a list of inputs — `flyte.map` with the `concurrency` parameter is simpler:

```
@env.task
async def process_batch_with_map(prompts: list[str]) -> list[str]:
    """Process prompts using flyte.map with a built-in concurrency limit.

    This is the simplest approach when every item goes through the same task.
    """
    results = list(flyte.map(call_llm_api, prompts, concurrency=3))
    return results
```

This achieves the same concurrency limit with less boilerplate.
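
A concurrency limit trades throughput for safety, and the cost can be estimated with simple arithmetic. The helper below is a hypothetical back-of-the-envelope sketch (`min_wall_time` is not part of Flyte). It assumes every item takes the same time and ignores scheduling and container start-up overhead, so treat the result as a lower bound.

```python
import math


def min_wall_time(n_items: int, concurrency: int, seconds_per_item: float) -> float:
    """Lower bound on wall time when items run in waves of `concurrency`."""
    if n_items < 0 or concurrency < 1:
        raise ValueError("n_items must be >= 0 and concurrency >= 1")
    waves = math.ceil(n_items / concurrency)  # number of sequential waves
    return waves * seconds_per_item


# Eight prompts at 0.5 s each, as in the example above.
print(min_wall_time(8, 3, 0.5))  # 1.5  (three waves: 3 + 3 + 2)
print(min_wall_time(8, 8, 0.5))  # 0.5  (one wave, effectively unbounded)
```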

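## Running the example

The script below submits `process_batch_with_semaphore` with eight prompts, using the default `max_concurrent` of 3, then prints the run name and URL and waits for the run to finish.
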
```
if __name__ == "__main__":
    flyte.init_from_config()
    prompts = [
        "Summarize this text",
        "Translate to French",
        "Extract key points",
        "Generate a title",
        "Write a conclusion",
        "List the main topics",
        "Identify the tone",
        "Suggest improvements",
    ]
    r = flyte.run(process_batch_with_semaphore, prompts)
    print(r.name)
    print(r.url)
    r.wait()
```

## When to use each approach

Use **`flyte.map(concurrency=N)`** when:

- Every item goes through the same task.
- You want the simplest possible code.

Use **`asyncio.Semaphore`** when:

- You need different concurrency limits for different task types within the same workflow.
- You want to combine concurrency control with error handling (e.g., `asyncio.gather(*tasks, return_exceptions=True)`).
- You are calling multiple different tasks in one parallel batch.
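
The semaphore cases listed above can be combined in one batch. The following is a minimal sketch in plain `asyncio`, assuming two hypothetical stand-in coroutines (`summarize` and `embed`) in place of real Flyte tasks. Each task type gets its own semaphore, and `return_exceptions=True` keeps one failure from cancelling the rest of the batch.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def summarize(text: str) -> str:
    """Hypothetical stand-in for a tightly rate-limited task."""
    await asyncio.sleep(0.01)
    if not text:
        raise ValueError("empty input")
    return f"summary of {text}"


async def embed(text: str) -> int:
    """Hypothetical stand-in for a task with a looser limit."""
    await asyncio.sleep(0.01)
    return len(text)


async def limited(
    semaphore: asyncio.Semaphore,
    fn: Callable[[str], Awaitable[Any]],
    arg: str,
) -> Any:
    # The slot is released even if fn raises.
    async with semaphore:
        return await fn(arg)


async def main() -> list[Any]:
    summarize_slots = asyncio.Semaphore(2)  # tight limit for one task type
    embed_slots = asyncio.Semaphore(4)      # looser limit for the other
    texts = ["alpha", "", "gamma"]
    calls = [limited(summarize_slots, summarize, t) for t in texts]
    calls += [limited(embed_slots, embed, t) for t in texts]
    # Exceptions are returned in place instead of being raised.
    return await asyncio.gather(*calls, return_exceptions=True)


outcomes = asyncio.run(main())
failures = [o for o in outcomes if isinstance(o, Exception)]
successes = [o for o in outcomes if not isinstance(o, Exception)]
print(len(successes), len(failures))  # 5 1
```

Because exceptions come back as ordinary list elements, the caller decides afterwards whether to retry, log, or drop the failed items.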

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/controlling-parallelism.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/controlling-parallelism/
