# DataFrames

By default, return values in Python are materialized, meaning the actual data is downloaded and loaded into memory. This applies to simple types like integers as well as to more complex types like DataFrames.

To avoid downloading large datasets into memory, Flyte V2 exposes [`flyte.io.dataframe`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.io/dataframe): a thin, uniform wrapper type for DataFrame-style objects that allows you to pass a reference to the data, rather than the fully materialized contents.

The `flyte.io.DataFrame` type provides serialization support for common engines like `pandas`, `polars`, `pyarrow`, and `dask`, enabling you to move data between different DataFrame backends.

## Setting up the environment and sample data

For our example, we will start by setting up our task environment with the required dependencies and creating some sample data.

```
from typing import Annotated

import numpy as np
import pandas as pd
import flyte
import flyte.io

env = flyte.TaskEnvironment(
    name="dataframe_usage",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "pyarrow", "numpy"),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

BASIC_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah"],
    "department": ["HR", "Engineering", "Engineering", "Marketing", "Finance", "Finance", "HR", "Engineering"],
    "hire_date": pd.to_datetime(
        ["2018-01-15", "2019-03-22", "2020-07-10", "2017-11-01", "2021-06-05", "2018-09-13", "2022-01-07", "2020-12-30"]
    ),
}

ADDL_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "salary": [55000, 75000, 72000, 50000, 68000, 70000, np.nan, 80000],
    "bonus_pct": [0.05, 0.10, 0.07, 0.04, np.nan, 0.08, 0.03, 0.09],
    "full_time": [True, True, True, False, True, True, False, True],
    "projects": [
        ["Recruiting", "Onboarding"],
        ["Platform", "API"],
        ["API", "Data Pipeline"],
        ["SEO", "Ads"],
        ["Budget", "Forecasting"],
        ["Auditing"],
        [],
        ["Platform", "Security", "Data Pipeline"],
    ],
}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

## Create a raw DataFrame

Now, let's create a task that returns a native Pandas DataFrame:

```
@env.task
async def create_raw_dataframe() -> pd.DataFrame:
    return pd.DataFrame(BASIC_EMPLOYEE_DATA)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

This is the most basic use case for passing DataFrames (of any kind, not just Pandas):
we simply create the DataFrame as normal and return it.

Because the task has been declared to return a supported native DataFrame type (in this case `pandas.DataFrame`), Flyte will automatically detect it, serialize it correctly, and upload it at task completion, enabling it to be passed transparently to the next task.

Flyte supports auto-serialization for the following DataFrame types:
* `pandas.DataFrame`
* `pyarrow.Table`
* `dask.dataframe.DataFrame`
* `polars.DataFrame`
* `flyte.io.DataFrame` (see below)

## Create a flyte.io.DataFrame

Alternatively, you can create a `flyte.io.DataFrame` object directly from a native DataFrame with the `from_df` method:

```
@env.task
async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "parquet"]:
    pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA)
    fdf = flyte.io.DataFrame.from_df(pd_df)
    return fdf
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

The `flyte.io.DataFrame` class creates a thin wrapper around objects of any standard DataFrame type. It serves as a generic "any DataFrame type" (a concept that Python itself does not currently offer).

As with native DataFrame types, Flyte will automatically serialize and upload the data at task completion.

The advantage of the unified `flyte.io.DataFrame` wrapper is that you can be explicit about the storage format that makes sense for your use case, by using an `Annotated` type where the second argument encodes format or other lightweight hints. In the example above, the annotation specifies that the DataFrame should be stored as Parquet.
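The format hint travels as ordinary `typing.Annotated` metadata. A stdlib-only sketch (using a hypothetical `DataFrameRef` class as a stand-in for `flyte.io.DataFrame`) shows how such a hint can be read back from a function signature:

```python
from typing import Annotated, get_type_hints

class DataFrameRef:
    """Hypothetical stand-in for flyte.io.DataFrame."""

def load_report() -> Annotated[DataFrameRef, "parquet"]:
    ...

# include_extras=True preserves the Annotated metadata on the return hint.
hints = get_type_hints(load_report, include_extras=True)
print(hints["return"].__metadata__)  # ('parquet',)
```

This is the same mechanism Flyte inspects at task registration time to decide how to store the data.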

## Automatically convert between types

You can leverage Flyte to automatically download and convert the DataFrame between types when needed:

```
@env.task
async def join_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> flyte.io.DataFrame:
    joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner")
    return flyte.io.DataFrame.from_df(joined_df)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

This task takes two DataFrames as input. We'll pass one raw Pandas DataFrame, and one `flyte.io.DataFrame`.
Flyte automatically converts the `flyte.io.DataFrame` to a Pandas DataFrame (since we declared that as the input type) before passing it to the task.
The actual download and conversion happen only when we access the data, in this case, when we perform the merge.
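The merge itself is plain pandas; a minimal standalone example of the inner join on `employee_id`, with data simplified from the samples above:

```python
import pandas as pd

basic = pd.DataFrame({
    "employee_id": [1001, 1002, 1003],
    "name": ["Alice", "Bob", "Charlie"],
})
addl = pd.DataFrame({
    "employee_id": [1002, 1003, 1004],
    "salary": [75000, 72000, 50000],
})

# how="inner" keeps only the employee_ids present in both frames,
# so the result contains rows for 1002 and 1003 only.
joined = basic.merge(addl, on="employee_id", how="inner")
print(joined)
```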

## Downloading DataFrames

When a task receives a `flyte.io.DataFrame`, you can request a concrete backend representation. For example, to download as a pandas DataFrame:

```
@env.task
async def download_data(joined_df: flyte.io.DataFrame):
    downloaded = await joined_df.open(pd.DataFrame).all()
    print("Downloaded Data:\n", downloaded)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

The `open()` call delegates to the DataFrame handler for the stored format and converts to the requested in-memory type.

## Run the example

Finally, we can define a `main` function to run the tasks defined above and a `__main__` block to execute the workflow:

```
@env.task
async def main():
    raw_df = await create_raw_dataframe()
    flyte_df = await create_flyte_dataframe()
    joined_df = await join_data(raw_df, flyte_df)
    await download_data(joined_df)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/dataframes.py*

## Polars DataFrames

The `flyteplugins-polars` package extends Flyte's DataFrame support to `polars.DataFrame` and `polars.LazyFrame`. Install it alongside the core SDK and it registers automatically — no additional configuration required.

```bash
pip install flyteplugins-polars
```

Both types are serialized as Parquet when passed between tasks, just like other DataFrame backends.

### Setup

```
import polars as pl

import flyte

env = flyte.TaskEnvironment(
    name="polars-dataframes",
    image=flyte.Image.from_debian_base(name="polars").with_pip_packages(
        "flyteplugins-polars>=2.0.0", "polars"
    ),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

EMPLOYEE_DATA = {
    "employee_id": [1001, 1002, 1003, 1004, 1005, 1006],
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona"],
    "department": ["Engineering", "Engineering", "Marketing", "Finance", "Finance", "Engineering"],
    "salary": [75000, 72000, 50000, 68000, 70000, 80000],
    "years_experience": [5, 4, 2, 6, 5, 7],
}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

### Eager DataFrames

Use `pl.DataFrame` when you want immediate evaluation. Flyte serializes it to Parquet on output and deserializes it on input:

```
@env.task
async def create_dataframe() -> pl.DataFrame:
    """Create a Polars DataFrame.

    Polars DataFrames are passed between tasks as serialized Parquet files
    stored in the Flyte blob store — no manual upload required.
    """
    return pl.DataFrame(EMPLOYEE_DATA)

@env.task
async def filter_high_earners(df: pl.DataFrame) -> pl.DataFrame:
    """Filter and enrich a Polars DataFrame."""
    return (
        df.filter(pl.col("salary") > 60000)
        .with_columns(
            (pl.col("salary") / pl.col("years_experience")).alias("salary_per_year")
        )
        .sort("salary", descending=True)
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

### Lazy DataFrames

Use `pl.LazyFrame` when you want to defer computation and let Polars optimize the full query plan before executing. Flyte handles serialization the same way as `pl.DataFrame`:

```
@env.task
async def create_lazyframe() -> pl.LazyFrame:
    """Create a Polars LazyFrame.

    LazyFrames defer computation until collected, allowing Polars to
    optimize the full query plan. They are serialized to Parquet just
    like DataFrames when passed between tasks.
    """
    return pl.LazyFrame(EMPLOYEE_DATA)

@env.task
async def aggregate_by_department(lf: pl.LazyFrame) -> pl.DataFrame:
    """Aggregate salary statistics by department using a LazyFrame.

    The query plan is built lazily and executed only when collect() is called.
    """
    return (
        lf.group_by("department")
        .agg(
            pl.col("salary").mean().alias("avg_salary"),
            pl.col("salary").max().alias("max_salary"),
            pl.len().alias("headcount"),
        )
        .sort("avg_salary", descending=True)
        .collect()
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

The `collect()` call in `aggregate_by_department` is what triggers execution of the lazy plan. The `LazyFrame` itself is serialized to Parquet when it crosses the task boundary.

### Run the example

```
@env.task
async def main():
    df = await create_dataframe()
    filtered = await filter_high_earners(df=df)
    print("High earners:")
    print(filtered)

    lf = await create_lazyframe()
    summary = await aggregate_by_department(lf=lf)
    print("Department summary:")
    print(summary)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/dataframes/polars_dataframes.py*

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/dataframes.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/dataframes/
