# Files and directories

Flyte provides the [`flyte.io.File`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.io/file) and
[`flyte.io.Dir`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.io/dir) types to represent files and directories, respectively.
Together with [`flyte.io.DataFrame`](https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories/dataframes) they constitute the *offloaded data types*: unlike [materialized types](https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories/dataclasses-and-structures) such as data classes, these pass references rather than full data content.

A variable of an offloaded type does not contain its actual data, but rather a reference to the data.
The actual data is stored in the internal blob store of your Union/Flyte instance.
When a variable of an offloaded type is first created, its data is uploaded to the blob store.
It can then be passed from task to task as a reference.
The actual data is only downloaded from the blob store when the task needs to access it, for example, when the task calls `open()` on a `File` or `Dir` object.

This allows Flyte to handle large files and directories efficiently, without transferring data unnecessarily.
Even very large data objects like video files and DNA datasets can be passed efficiently between tasks.

The `File` and `Dir` classes provide both `sync` and `async` methods to interact with the data.

## Example usage

The examples below show the basic use cases of uploading files and directories created locally, and using them as inputs to a task.

```python
import asyncio
import tempfile
from pathlib import Path

import flyte
from flyte.io import Dir, File

env = flyte.TaskEnvironment(name="files-and-folders")

@env.task
async def write_file(name: str) -> File:

    # Create a file and write some content to it
    with open("test.txt", "w") as f:
        f.write(f"hello world {name}")

    # Upload the file using flyte
    uploaded_file_obj = await File.from_local("test.txt")
    return uploaded_file_obj
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

The upload happens when the [`File.from_local`](https://www.union.ai/docs/v2/union/user-guide/api-reference/flyte-sdk/packages/flyte.io/file#from_local) method is called.
Because the upload would otherwise block execution, `File.from_local` is implemented as an `async` function.
The Flyte SDK frequently uses this async class-constructor pattern, so you will see it with other types as well.
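The pattern itself is plain Python: an `async` classmethod performs the I/O and then returns the constructed instance. A minimal sketch of the idea, using an illustrative `Remote` class that is not part of the SDK:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Remote:
    """Illustrative stand-in for an offloaded type such as File."""
    path: str
    uri: str

    @classmethod
    async def from_local(cls, path: str) -> "Remote":
        # Perform the (simulated) upload without blocking the event loop,
        # then return the fully constructed object.
        await asyncio.sleep(0)  # stand-in for the actual upload I/O
        return cls(path=path, uri=f"s3://bucket/{path}")

async def main() -> Remote:
    return await Remote.from_local("test.txt")

obj = asyncio.run(main())
print(obj.uri)
```

Because the constructor is a coroutine, several uploads can be started concurrently with `asyncio.gather`, which is exactly what the next example does with tasks.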

The next task is slightly more involved: it calls the task above several times to produce `File` objects, assembles them into a directory, and returns the resulting `Dir` object, again created via `from_local`.

```python
@env.task
async def write_and_check_files() -> Dir:
    coros = []
    for name in ["Alice", "Bob", "Eve"]:
        coros.append(write_file(name=name))

    vals = await asyncio.gather(*coros)
    temp_dir = tempfile.mkdtemp()
    for file in vals:
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"File {file.path} contents: {contents_str}")
            new_file = Path(temp_dir) / file.name
            with open(new_file, "w") as out:  # noqa: ASYNC230
                out.write(contents_str)
    print(f"Files written to {temp_dir}")

    # walk the directory and ls
    for path in Path(temp_dir).iterdir():
        print(f"File: {path.name}")

    my_dir = await Dir.from_local(temp_dir)
    return my_dir
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

Finally, these tasks show how to use an offloaded type as an input.
Helper methods such as `walk` and `open` are available on the objects
and behave as you would expect.

```python
@env.task
async def check_dir(my_dir: Dir):
    print(f"Dir {my_dir.path} contents:")
    async for file in my_dir.walk():
        print(f"File: {file.name}")
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"Contents: {contents_str}")

@env.task
async def create_and_check_dir():
    my_dir = await write_and_check_files()
    await check_dir(my_dir=my_dir)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(create_and_check_dir)
    print(r.name)
    print(r.url)
    r.wait()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/file_and_dir.py*

## JSONL files

The `flyteplugins-jsonl` package extends `File` and `Dir` with JSONL-aware types: `JsonlFile` and `JsonlDir`. They add streaming record-level read and write on top of the standard file/directory capabilities, with optional [zstd](https://github.com/facebook/zstd) compression and automatic shard rotation for large datasets.

Records are serialized with [orjson](https://github.com/ijl/orjson) for high performance. Both types provide async and sync APIs where every read/write method has a `_sync` variant (e.g. `iter_records_sync()`, `writer_sync()`).
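The JSONL format itself is simple: one JSON object serialized per line, newline-delimited. The stdlib sketch below shows the framing these types handle for you (orjson plays the role of the `json` calls here, just faster):

```python
import io
import json

records = [{"id": i, "score": i * 0.1} for i in range(3)]

# Write: serialize each record and terminate it with a newline.
buf = io.StringIO()
for rec in records:
    buf.write(json.dumps(rec) + "\n")

# Read: each non-empty line parses back to exactly one record.
buf.seek(0)
parsed = [json.loads(line) for line in buf if line.strip()]
print(parsed)
```

Because each record is an independent line, the file can be read and written as a stream, which is what makes the record-level APIs below possible.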

```bash
pip install flyteplugins-jsonl
```

### Setup

```python
import flyte
from flyteplugins.jsonl import JsonlDir, JsonlFile

env = flyte.TaskEnvironment(
    name="jsonl-examples",
    image=flyte.Image.from_debian_base(name="jsonl").with_pip_packages(
        "flyteplugins-jsonl"
    ),
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

### JsonlFile

`JsonlFile` is a `File` subclass for single JSONL files. Use its async context manager to write records incrementally without loading the entire dataset into memory:

```python
@env.task
async def write_records() -> JsonlFile:
    """Write records to a single JSONL file."""
    out = JsonlFile.new_remote("results.jsonl")
    async with out.writer() as writer:
        for i in range(500_000):
            await writer.write({"id": i, "score": i * 0.1})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading streams records the same way:

```python
@env.task
async def read_records(data: JsonlFile) -> int:
    """Read records from a JsonlFile and return the count."""
    count = 0
    async for record in data.iter_records():
        count += 1
    return count
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

### Compression

Both `JsonlFile` and `JsonlDir` support zstd compression transparently based on the file extension. Use `.jsonl.zst` (or `.jsonl.zstd`) to enable compression:

```python
@env.task
async def write_compressed() -> JsonlFile:
    """Write a zstd-compressed JSONL file.

    Compression is activated by using a .jsonl.zst extension.
    Both reading and writing handle compression transparently.
    """
    out = JsonlFile.new_remote("results.jsonl.zst")
    async with out.writer(compression_level=3) as writer:
        for i in range(100_000):
            await writer.write({"id": i, "compressed": True})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading compressed files requires no code changes; the compression format is detected automatically from the extension.

### JsonlDir

`JsonlDir` is a `Dir` subclass that shards records across multiple JSONL files (named `part-00000.jsonl`, `part-00001.jsonl`, etc.). When a shard reaches the record count or byte size threshold, a new shard is opened automatically. This keeps individual files at a manageable size even for very large datasets:

```python
@env.task
async def write_large_dataset() -> JsonlDir:
    """Write a large dataset to a sharded JsonlDir.

    JsonlDir automatically rotates to a new shard file once the
    current shard reaches the record or byte limit. Shards are named
    part-00000.jsonl, part-00001.jsonl, etc.
    """
    out = JsonlDir.new_remote("dataset/")
    async with out.writer(
        max_records_per_shard=100_000,
        max_bytes_per_shard=256 * 1024 * 1024,  # 256 MB
    ) as writer:
        for i in range(500_000):
            await writer.write({"index": i, "value": i * i})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*
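The rotation policy can be sketched in plain Python: before each write, check the current shard against both thresholds and rotate to a new shard if either would be exceeded. This illustrates the policy only, not the plugin's implementation:

```python
import json

def shard_records(records, max_records_per_shard=3, max_bytes_per_shard=1024):
    """Group records into shards, rotating when either threshold is hit."""
    shards, current, current_bytes = [], [], 0
    for rec in records:
        line = json.dumps(rec) + "\n"
        # Rotate if the current shard is full by record count or byte size.
        if current and (
            len(current) >= max_records_per_shard
            or current_bytes + len(line) > max_bytes_per_shard
        ):
            shards.append(current)
            current, current_bytes = [], 0
        current.append(rec)
        current_bytes += len(line)
    if current:
        shards.append(current)
    return shards

shards = shard_records([{"index": i} for i in range(7)])
print([f"part-{i:05d}.jsonl: {len(s)} records" for i, s in enumerate(shards)])
```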

Compressed shards are also supported by specifying the `shard_extension`:

```python
@env.task
async def write_compressed_dir() -> JsonlDir:
    """Write zstd-compressed shards by specifying the shard extension."""
    out = JsonlDir.new_remote("compressed_dataset/")
    async with out.writer(
        shard_extension=".jsonl.zst",
        max_records_per_shard=50_000,
    ) as writer:
        for i in range(200_000):
            await writer.write({"id": i, "data": f"payload-{i}"})
    return out
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

Reading iterates across all shards transparently. The next shard is prefetched in the background to overlap network I/O with processing:

```python
@env.task
async def sum_values(dataset: JsonlDir) -> int:
    """Read all records across all shards and compute a sum.

    Iteration is transparent across shards and handles mixed
    compressed/uncompressed shards automatically. The next shard is
    prefetched in the background for higher throughput.
    """
    total = 0
    async for record in dataset.iter_records():
        total += record["value"]
    return total
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

If you open a writer on a directory that already contains shards, the writer detects existing shard indices and continues from the next one, making it safe to append data to an existing `JsonlDir`.
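The resume behavior amounts to scanning the directory for the highest existing shard index and starting one past it. A stdlib sketch of that scan (the function name is illustrative):

```python
import re

def next_shard_index(existing_names: list[str]) -> int:
    """Return the index the next shard should use, given existing file names."""
    pattern = re.compile(r"part-(\d{5})\.jsonl")
    indices = [int(m.group(1)) for name in existing_names
               if (m := pattern.match(name))]
    # An empty directory starts at shard 0.
    return max(indices, default=-1) + 1

print(next_shard_index(["part-00000.jsonl", "part-00001.jsonl"]))  # 2
print(next_shard_index([]))                                        # 0
```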

### Error handling

All read methods accept an `on_error` parameter to control how corrupt or malformed lines are handled:

- `"raise"` (default): propagate parse errors immediately
- `"skip"`: log a warning and skip corrupt lines
- A callable `(line_number, raw_line, exception) -> None` for custom handling
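The semantics of the three options can be sketched over plain lines in stdlib Python (everything here is illustrative, not the plugin's code):

```python
import json

def parse_lines(lines, on_error="raise"):
    """Parse JSONL lines, dispatching parse errors per the on_error policy."""
    records = []
    for line_number, raw_line in enumerate(lines, start=1):
        try:
            records.append(json.loads(raw_line))
        except json.JSONDecodeError as exc:
            if on_error == "raise":
                raise
            if on_error == "skip":
                continue  # a real implementation would also log a warning
            on_error(line_number, raw_line, exc)  # custom callable
    return records

lines = [b'{"id": 1}', b"not json", b'{"id": 2}']
print(parse_lines(lines, on_error="skip"))
```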

```python
@env.task
async def read_with_error_handling(data: JsonlFile) -> int:
    """Read records, skipping any corrupt lines instead of raising."""
    count = 0
    async for record in data.iter_records(on_error="skip"):
        count += 1
    return count

@env.task
async def read_with_custom_handler(data: JsonlFile) -> int:
    """Read records with a custom error handler that collects errors."""
    errors: list[dict] = []

    def on_error(line_number: int, raw_line: bytes, exc: Exception) -> None:
        errors.append({"line": line_number, "error": str(exc)})

    count = 0
    async for record in data.iter_records(on_error=on_error):
        count += 1
    print(f"{count} valid records, {len(errors)} errors")
    return count
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

### Batch iteration

For bulk processing, both `JsonlFile` and `JsonlDir` support batched iteration. `iter_batches()` yields lists of dicts:

```python
@env.task
async def process_in_batches(dataset: JsonlDir) -> int:
    """Process records in batches of dicts for bulk operations."""
    total = 0
    async for batch in dataset.iter_batches(batch_size=1000):
        # Each batch is a list[dict]
        total += len(batch)
    return total
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*
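Batching is a thin layer over record iteration: collect records until the batch size is reached, yield the list, and repeat. A stdlib sketch of that grouping (the names are illustrative):

```python
from itertools import islice

def iter_batches(records, batch_size):
    """Yield lists of up to batch_size records from any iterable."""
    it = iter(records)
    # islice drains up to batch_size items; an empty list ends the loop.
    while batch := list(islice(it, batch_size)):
        yield batch

batches = list(iter_batches(range(10), batch_size=4))
print([len(b) for b in batches])  # [4, 4, 2]
```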

For analytics workloads, `iter_arrow_batches()` yields Arrow `RecordBatch` objects directly. This requires the optional `pyarrow` dependency:

```bash
pip install 'flyteplugins-jsonl[arrow]'
```

```python
arrow_env = flyte.TaskEnvironment(
    name="jsonl-arrow",
    image=flyte.Image.from_debian_base(name="jsonl-arrow").with_pip_packages(
        "flyteplugins-jsonl[arrow]"
    ),
)

@arrow_env.task
async def analyze_with_arrow(dataset: JsonlDir) -> float:
    """Stream records as Arrow RecordBatches for analytics.

    Memory usage is bounded by batch_size — the full dataset is
    never loaded into memory at once.
    """
    import pyarrow as pa

    batches = []
    async for batch in dataset.iter_arrow_batches(batch_size=65_536):
        batches.append(batch)

    table = pa.Table.from_batches(batches)
    values = table.column("value").to_pylist()
    return sum(values) / len(values)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/files-and-directories/jsonl.py*

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/files-and-directories.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories/
