# From Flyte 1 to 2
> This bundle contains all pages in the From Flyte 1 to 2 section.
> Source: https://www.union.ai/docs/v2/union/user-guide/flyte-2/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/flyte-2 ===

# From Flyte 1 to 2

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Flyte 2 represents a fundamental shift in how Flyte workflows are written and executed.

## Pure Python execution

Write workflows in pure Python, enabling a more natural development experience and removing the constraints of a
domain-specific language (DSL).

### Sync Python

```
import flyte

env = flyte.TaskEnvironment("sync_example_env")

@env.task
def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@env.task
def main(name: str) -> str:
    for i in range(10):
        hello_world(name)
    return "Done"

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, name="World")
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/sync_example.py*

### Async Python

```
import asyncio
import flyte

env = flyte.TaskEnvironment("async_example_env")

@env.task
async def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@env.task
async def main(name: str) -> str:
    results = []
    for i in range(10):
        results.append(hello_world(name))
    await asyncio.gather(*results)
    return "Done"

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, name="World")
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/async_example.py*

As you can see in the hello world example, workflows can be constructed at runtime, allowing for more flexible and
adaptive behavior. Flyte 2 supports:

- Python's asynchronous programming model to express parallelism.
- Python's native error handling with `try-except` to overridden configurations, like resource requests.
- Predefined static workflows when compile-time safety is critical.

## Simplified API

The new API is more intuitive, with fewer abstractions to learn and a focus on simplicity.

| Use case                      | Flyte 1                     | Flyte 2                                 |
| ----------------------------- | --------------------------- | --------------------------------------- |
| Environment management        | `N/A`                       | `TaskEnvironment`                       |
| Perform basic computation     | `@task`                     | `@env.task`                             |
| Combine tasks into a workflow | `@workflow`                 | `@env.task`                             |
| Create dynamic workflows      | `@dynamic`                  | `@env.task`                             |
| Fanout parallelism            | `flytekit.map`              | Python `for` loop with `asyncio.gather` |
| Conditional execution         | `flytekit.conditional`      | Python `if-elif-else`                   |
| Catching workflow failures    | `@workflow(on_failure=...)` | Python `try-except`                     |

There is no `@workflow` decorator. Instead, "workflows" are authored through a pattern of tasks calling tasks.
Tasks are defined within environments, which encapsulate the context and resources needed for execution.

## Fine-grained reproducibility and recoverability

As in Flyte 1, Flyte 2 supports caching at the task level (via `@env.task(cache=...)`), but it further enables recovery at the finer-grained, sub-task level through a feature called tracing (via `@flyte.trace`).

```
import flyte

env = flyte.TaskEnvironment(name="trace_example_env")

@flyte.trace
async def call_llm(prompt: str) -> str:
    return "Initial response from LLM"

@env.task
async def finalize_output(output: str) -> str:
    return "Finalized output"

@env.task(cache=flyte.Cache(behavior="auto"))
async def main(prompt: str) -> str:
    output = await call_llm(prompt)
    output = await finalize_output(output)
    return output

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, prompt="Prompt to LLM")
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/trace.py*

Here `call_llm` runs in the same container as `main` and acts as an automated checkpoint with full observability in the UI.
If the task fails due to a system error (e.g., node preemption or infrastructure failure), Flyte can recover and replay from the
last successful trace rather than restarting from the beginning.

Note that tracing is distinct from caching: traces are recovered only if there is a system failure
whereas with cached outputs are persisted for reuse across separate runs.

## Improved remote functionality

Flyte 2 provides full management of the workflow lifecycle through a standardized API through the CLI and the Python SDK.

| Use case      | CLI                | Python SDK          |
| ------------- | ------------------ | ------------------- |
| Run a task    | `flyte run ...`    | `flyte.run(...)`    |
| Deploy a task | `flyte deploy ...` | `flyte.deploy(...)` |

You can also fetch and run remote (previously deployed) tasks within the course of a running workflow.

```
import flyte
from flyte import remote

env_1 = flyte.TaskEnvironment(name="env_1")
env_2 = flyte.TaskEnvironment(name="env_2")
env_1.add_dependency(env_2)

@env_2.task
async def remote_task(x: str) -> str:
    return "Remote task processed: " + x

@env_1.task
async def main() -> str:
    remote_task_ref = remote.Task.get("env_2.remote_task", auto_version="latest")
    r = await remote_task_ref(x="Hello")
    return "main called remote and recieved: " + r

if __name__ == "__main__":
    flyte.init_from_config()
    d = flyte.deploy(env_1)
    print(d[0].summary_repr())
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/remote.py*

## Native Notebook support

Author and run workflows and fetch workflow metadata (I/O and logs) directly from Jupyter notebooks.

![Native Notebook](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/notebook.png)

## High performance engine

When running on a Union.ai backend, Flyte 2 enables you to schedule tasks in milliseconds with reusable containers, which massively increases the throughput of containerized tasks.

```
# Currently required to enable resuable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.10")

env = flyte.TaskEnvironment(
    name="reusable-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(replicas=2, concurrency=1), # Specify reuse policy
    image=reusable_image  # Use the container image augmented with the unionai-reuse library.
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/reuse.py*

Coupled with multi-cluster, multi-cloud, and multi-region support, Flyte 2 on Union.ai can scale to handle even the most demanding workflows.

## Enhanced UI

The Union.ai backend also offers a new UI with a streamlined and user-friendly experience for authoring and managing workflows.

![New UI](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/user-guide/v2ui.png)

This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other important information.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/flyte-2/pure-python ===

# Pure Python

Flyte 2 introduces a new way of writing workflows that is based on pure Python, removing the constraints of a domain-specific language (DSL) and enabling full use of Python's capabilities.

## From `@workflow` DSL to pure Python

| Flyte 1 | Flyte 2 |
| --- | --- |
| `@workflow`-decorated functions are constrained to a subset of Python for defining a static directed acyclic graph (DAG) of tasks. | **No more `@workflow` decorator**: Everything is a `@env.task`, so your top-level “workflow” is simply a task that calls other tasks. |
| `@task`-decorated functions could leverage the full power of Python, but only within individual container executions. | `@env.task`s can call other `@env.task`s and be used to construct workflows with dynamic structures using loops, conditionals, try/except, and any Python construct anywhere. |
| Workflows were compiled into static DAGs at registration time, with tasks as the nodes and the DSL defining the structure. | Workflows are simply tasks that call other tasks. Compile-time safety will be available in the future as `compiled_task`. |

### Flyte 1

```python
import flytekit

image = flytekit.ImageSpec(
    name="hello-world-image",
    packages=["requests"],
)

@flytekit.task(container_image=image)
def mean(data: list[float]) -> float:
    return sum(list) / len(list)

@flytekit.workflow
def main(data: list[float]) -> float:
    output = mean(data)

    # ❌ performing trivial operations in a workflow is not allowed
    # output = output / 100

    # ❌ if/else is not allowed
    # if output < 0:
    #     raise ValueError("Output cannot be negative")

    return output
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/pure-python/flyte_1.py*

### Flyte 2

```
import flyte

env = flyte.TaskEnvironment(
    "hello_world",
    image=flyte.Image.from_debian_base().with_pip_packages("requests"),
)

@env.task
def mean(data: list[float]) -> float:
    return sum(data) / len(data)

@env.task
def main(data: list[float]) -> float:
    output = mean(data)

    # ✅ performing trivial operations in a workflow is allowed
    output = output / 100

    # ✅ if/else is allowed
    if output < 0:
        raise ValueError("Output cannot be negative")

    return output
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/pure-python/flyte_2.py*

These fundamental changes bring several transformative benefits:

- **Flexibility**: Harness the complete Python language for workflow definition, including all control flow constructs previously forbidden in workflows.
- **Dynamic workflows**: Create workflows that adapt to runtime conditions, handle variable data structures, and make decisions based on intermediate results.
- **Natural error handling**: Use standard Python `try`/`except` patterns throughout your workflows, making them more robust and easier to debug.
- **Intuitive composability**: Build complex workflows by naturally composing Python functions, following familiar patterns that any Python developer understands.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/flyte-2/async ===

# Asynchronous model

## Why we need an async model

The shift to an asynchronous model in Flyte 2 is driven by the need for more efficient and flexible workflow execution.

We believe, in particular, that with the rise of the agentic AI pattern, asynchronous programming has become an essential part of AI/ML engineering and data science toolkit.

With Flyte 2, the entire framework is now written with async constructs, allowing for:

- Seamless overlapping of I/O and independent external operations.
- Composing multiple tasks and external tool invocations within the same Python process.
- Native support of streaming operations for data, observability and downstream invocations.

It is also a natural fit for the expression parallelism in workflows.

### Understanding concurrency vs. parallelism

**Concurrency** means running multiple tasks at once. This can be achieved by interleaving execution on a single thread (switching between tasks when one is waiting) or by true **parallelism**—executing tasks truly simultaneously across multiple cores or machines. Parallelism is a form of concurrency, but concurrency doesn't require parallelism.

### Python's async evolution

Python's asynchronous programming capabilities have evolved significantly:

- **The GIL challenge**: Python's Global Interpreter Lock (GIL) traditionally prevented true parallelism for CPU-bound tasks, limiting threading effectiveness to I/O-bound operations.
- **Traditional solutions**:
  - `multiprocessing`: Created separate processes to sidestep the GIL, effective but resource-intensive
  - `threading`: Useful for I/O-bound tasks where the GIL could be released during external operations
- **The async revolution**: The `asyncio` library introduced cooperative multitasking within a single thread, using an event loop to manage multiple tasks efficiently.

### Parallelism in Flyte 1 vs Flyte 2

| | Flyte 1 | Flyte 2 |
| --- | --- | --- |
| Parallelism | The workflow DSL automatically parallelized tasks that weren't dependent on each other. The `map` operator allowed running a task multiple times in parallel with different inputs. | Leverages Python's `asyncio` as the primary mechanism for expressing parallelism, but with a crucial difference: **the Flyte orchestrator acts as the event loop**, managing task execution across distributed infrastructure. |

### Core async concepts

- **`async def`**: Declares a function as a coroutine. When called, it returns a coroutine object managed by the event loop rather than executing immediately.
- **`await`**: Pauses coroutine execution and passes control back to the event loop.
  In standard Python, this enables other tasks to run while waiting for I/O operations.
  In Flyte 2, it signals where tasks can be executed in parallel.
- **`asyncio.gather`**: The primary tool for concurrent execution.
  In standard Python, it schedules multiple awaitable objects to run concurrently within a single event loop.
  In Flyte 2, it signals to the orchestrator that these tasks can be distributed across separate compute resources.

#### A practical example

Consider this pattern for parallel data processing:

```
import asyncio
import flyte

env = flyte.TaskEnvironment("data_pipeline")

@env.task
async def process_chunk(chunk_id: int, data: str) -> str:
    # This could be any computational work - CPU or I/O bound
    await asyncio.sleep(1)  # Simulating work
    return f"Processed chunk {chunk_id}: {data}"

@env.task
async def parallel_pipeline(data_chunks: list[str]) -> list[str]:
    # Create coroutines for all chunks
    tasks = []
    for i, chunk in enumerate(data_chunks):
        tasks.append(process_chunk(i, chunk))

    # Execute all chunks in parallel
    results = await asyncio.gather(*tasks)
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/async/async.py*

In standard Python, this would provide concurrency benefits primarily for I/O-bound operations.
In Flyte 2, the orchestrator schedules each `process_chunk` task on separate Kubernetes pods or configured plugins, achieving true parallelism for any type of work.

### True parallelism for all workloads

This is where Flyte 2's approach becomes revolutionary: **async syntax is not just for I/O-bound operations**.
The `async`/`await` syntax becomes a powerful way to declare your workflow's parallel structure for any type of computation.

When Flyte's orchestrator encounters `await asyncio.gather(...)`, it understands that these tasks are independent and can be executed simultaneously across different compute resources.
This means you achieve true parallelism for:

- **CPU-bound computations**: Heavy mathematical operations, model training, data transformations
- **I/O-bound operations**: Database queries, API calls, file operations
- **Mixed workloads**: Any combination of computational and I/O tasks

The Flyte platform handles the complex orchestration while you express parallelism using intuitive `async` syntax.

## Calling sync tasks from async tasks

### Synchronous task support

Since many existing codebases use synchronous functions, Flyte 2 provides synchronous support. Under the hood, Flyte automatically "asyncifies" synchronous functions, wrapping them to participate seamlessly in the async execution model.
You don't need to rewrite existing code, just leverage the `.aio()` method when calling sync tasks from async contexts:

```
@env.task
def legacy_computation(x: int) -> int:
    # Existing synchronous function works unchanged
    return x * x + 2 * x + 1

@env.task
async def modern_workflow(numbers: list[int]) -> list[int]:
    # Call sync tasks from async context using .aio()
    tasks = []
    for num in numbers:
        tasks.append(legacy_computation.aio(num))

    results = await asyncio.gather(*tasks)
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/async/async.py*

### The `flyte.map` function: Familiar patterns

For scenarios that previously used Flyte 1's `map` operation, Flyte 2 provides `flyte.map` as a direct replacement.
The new `flyte.map` can be used either in synchronous or asynchronous contexts, allowing you to express parallelism without changing your existing patterns.

### Sync Map

```
@env.task
def sync_map_example(n: int) -> list[str]:
    # Synchronous version for easier migration
    results = []
    for result in flyte.map(process_item, range(n)):
        if isinstance(result, Exception):
            raise result
        results.append(result)
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/async/async.py*

### Async Map

```
@env.task
async def async_map_example(n: int) -> list[str]:
    # Async version using flyte.map - exact pattern from SDK examples
    results = []
    async for result in flyte.map.aio(process_item, range(n), return_exceptions=True):
        if isinstance(result, Exception):
            raise result
        results.append(result)
    return results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/flyte-2/async/async.py*

The `flyte.map` function provides:

- **Dual interfaces**: `flyte.map.aio()` for async contexts, `flyte.map()` for sync contexts.
- **Built-in error handling**: `return_exceptions` parameter for graceful failure handling. This matches the `asyncio.gather` interface,
  allowing you to decide how to handle errors.
  If you are coming from Flyte 1, it allows you to replace `min_success_ratio` in a more flexible way.
- **Automatic UI grouping**: Creates logical groups for better workflow visualization.
- **Concurrency control**: Optional limits for resource management.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/flyte-2/migration ===

# Migration from Flyte 1 to Flyte 2

> **📝 Note**
>
> For comprehensive migration reference with detailed API mappings, parameter tables, and complete examples, see [Migration from Flyte 1](https://www.union.ai/docs/v2/union/user-guide/api-reference/migration/_index) in the Reference section.
> An LLM-optimized bundle of the full migration reference is available at [`section.md`](https://www.union.ai/docs/v2/union/user-guide/api-reference/migration/section.md).

You can migrate from Flyte 1 to Flyte 2 by following the steps below:

### 1. Move task configuration to a `TaskEnvironment` object

Instead of configuring the image, hardware resources, and so forth, directly in the task decorator. You configure it in `TaskEnvironment` object. For example:

```python
env = flyte.TaskEnvironment(name="my_task_env")
```

### 2. Replace workflow decorators

Then, you replace the `@workflow` and `@task` decorators with `@env.task` decorators.

### Flyte 1

Here's a simple hello world example with fanout.

```python
import flytekit

@flytekit.task
def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@flytekit.workflow
def main(names: list[str]) -> list[str]:
    return flytekit.map(hello_world)(names)
```

### Flyte 2 Sync

Change all the decorators to `@env.task` and swap out `flytekit.map` with `flyte.map`.
Notice that `flyte.map` is a drop-in replacement for Python's built-in `map` function.

```diff
-@flytekit.task
+@env.task
def hello_world(name: str) -> str:
    return f"Hello, {name}!"

-@flytekit.workflow
+@env.task
def main(names: list[str]) -> list[str]:
    return flyte.map(hello_world, names)
```

> **📝 Note**
>
> Note that the reason our task decorator uses `env` is simply because that is the variable to which we assigned the `TaskEnvironment` above.

### Flyte 2 Async

To take advantage of full concurrency (not just parallelism), use Python async
syntax and the `asyncio` standard library to implement fa-out.

```diff
+import asyncio

@env.task
-def hello_world(name: str) -> str:
+async def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@env.task
-def main(names: list[str]) -> list[str]:
+async def main(names: list[str]) -> list[str]:
-    return flyte.map(hello_world, names)
+    return await asyncio.gather(*[hello_world(name) for name in names])
```

> **📝 Note**
>
> To use Python async syntax, you need to:
> - Use `asyncio.gather()` or `flyte.map()` for parallel execution
> - Add `async`/`await` keywords where you want parallelism
> - Keep existing sync task functions unchanged
>
> Learn more about about the benefits of async in the [Asynchronous Model](./async) guide.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/flyte-2/considerations ===

# Considerations

Flyte 2 represents a substantial change from Flyte 1.
Each Python-based task action has the ability to act as its own engine, kicking off sub-actions, and assembling the outputs, passing them to yet other sub-actions and such.

While this model of execution comes with an enormous amount of flexibility, that flexibility does warrant some caveats to keep in mind when authoring your tasks.

## Non-deterministic behavior

When a task launches another task, a new Action ID is determined.
This ID is a hash of the inputs to the task, the task definition itself, along with some other information.
The fact that this ID is consistently hashed is important when it comes to things like recovery and replay.

For example, assume you have the following tasks

```python
@env.task
async def t1():
    val = get_int_input()
    await t2(int=val)

@env.task
async def t2(val: int): ...
```

If you run `t1`, and it launches the downstream `t2` task, and then the pod executing `t1` fails, when Flyte restarts `t1` it will automatically detect that `t2` is still running and will just use that.
If `t2` ends up finishing in the interim, those results would just be used.

However, if you introduce non-determinism into the picture, then that guarantee is no longer there.
To give a contrived example:

```python
@env.task
async def t1():
    val = get_int_input()
    now = datetime.now()

    if now.second % 2 == 0:
        await t2(int=val)
    else:
        await t3(int=val)
```

Here, depending on what time it is, either `t2` or `t3` may end up running.
In the earlier scenario, if `t1` crashes unexpectedly, and Flyte retries the execution, a different downstream task may get kicked off instead.

### Dealing with non-determinism

As a developer, the best way to manage non-deterministic behavior (if it is unavoidable) is to be able to observe it and see exactly what is happening in your code. Flyte 2 provides precisely the tool needed to enable this: Traces.

With this feature you decorate the sub-task functions in your code with `@trace`, enabling checkpointing, reproducibility and recovery at a fine-grained level. See [Traces](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces) for more details.

## Type safety

In Flyte 1, the top-level workflow was defined by a Python-like DSL that was compiled into a static DAG composed of tasks, each of which was, internally, defined in real Python.
The system was able to guarantee type safety across task boundaries because the task definitions were static and the inputs and outputs were defined in a way that Flytekit could validate them.

In Flyte 2, the top-level workflow is defined by Python code that runs at runtime (unless using a compiled task).
This means that the system can no longer guarantee type safety at the workflow level.

Happily, the Python ecosystem has evolved considerably since Flyte 1, and Python type hints are now a standard way to define types.

Consequently, in Flyte 2, developers should use Python type hints and type checkers like `mypy` to ensure type safety at all levels, including the top-most task (i.e., the "workflow" level).

## No global state

A core principle of Flyte 2 (that is also shared with Flyte 1) is that you should not try to maintain global state across your workflow.
It will not be translated across tasks containers,

In a single process Python program, global variables are available across functions.
In the distributed execution model of Flyte, each task runs in its own container, and each container is isolated from the others.

If there is some state that needs to be preserved, it must be reconstructable through repeated deterministic execution.

## Driver pod requirements

Tasks don't have to kick off downstream tasks of course and may themselves represent a leaf level atomic unit of compute.
However, when tasks do run other tasks, and more so if they assemble the outputs of those other tasks, then that parent task becomes a driver
pod of sorts.
In Flyte 1, this assembling of intermediate outputs was done by Flyte Propeller.
In 2, it's done by the parent task.

This means that the pod running your parent task must be appropriately sized, and should ideally not be CPU-bound, otherwise it slow down downstream evaluation and kickoff of tasks.

For example, if you had this also scenario,

```python
@env.task
async def t_main():
    await t1()
    local_cpu_intensive_function()
    await t2()
```
The pod running `t_main` will hang in between tasks `t1` and `t2`. Your parent tasks should ideally focus only on orchestration.

## OOM risk from materialized I/O

Something maybe more nuanced to keep in mind is that if you're not using the soon-to-be-released ref mode, outputs are actually
materialized. That is, if you have the following scenario,

```python
@env.task
async def produce_1gb_list() -> List[float]: ...

@env.task
async def t1():
    list_floats = produce_1gb_list()
    t2(floats=list_floats)
```

The pod running `t1` needs to have memory to handle that 1 GB of floats. Those numbers will be materialized in that pod's memory.
This can lead to out of memory issues.

Note that `flyte.io.File`, `flyte.io.Dir` and `flyte.io.DataFrame` will not suffer from this because while those are materialized, they're only materialized as pointers to offloaded data, so their memory footprint is much lower.

