# Parallel outputs

After refining the report, the pipeline generates multiple output formats in
parallel. This demonstrates how to use `asyncio.gather` for concurrent execution
within a task.

## The formatting functions

The pipeline generates three outputs: markdown, HTML, and an executive summary.
Only `generate_summary` uses `@flyte.trace` because it makes an LLM call.
The markdown and HTML functions are simple, deterministic transformations that
don't benefit from checkpointing:

```python
from datetime import datetime

async def format_as_markdown(content: str) -> str:
    """Format the report as clean markdown."""
    # Content is already markdown, but we could add TOC, metadata, etc.
    return f"""---
title: Generated Report
date: {datetime.now().isoformat()}
---

{content}
"""

async def format_as_html(content: str) -> str:
    """Convert the report to HTML."""
    # Simple markdown to HTML conversion
    import re

    html = content
    # Convert headers
    html = re.sub(r"^### (.+)$", r"<h3>\1</h3>", html, flags=re.MULTILINE)
    html = re.sub(r"^## (.+)$", r"<h2>\1</h2>", html, flags=re.MULTILINE)
    html = re.sub(r"^# (.+)$", r"<h1>\1</h1>", html, flags=re.MULTILINE)
    # Convert bold/italic
    html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
    html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
    # Convert paragraphs
    html = re.sub(r"\n\n", r"</p><p>", html)

    return f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Generated Report</title>
    <style>
        body {{ font-family: system-ui, sans-serif; max-width: 800px; margin: 2rem auto; padding: 0 1rem; }}
        h1, h2, h3 {{ color: #333; }}
        p {{ line-height: 1.6; }}
    </style>
</head>
<body>
<p>{html}</p>
</body>
</html>
"""

@flyte.trace
async def generate_summary(content: str) -> str:
    """Generate an executive summary of the report."""
    return await call_llm(content, SUMMARY_SYSTEM_PROMPT)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### When to trace and when not to

Use `@flyte.trace` for operations that are expensive, non-deterministic, or
call external APIs (like `generate_summary`). Skip it for cheap, deterministic
transformations (like `format_as_markdown` and `format_as_html`) where
re-running on retry is trivial.

## Parallel execution with asyncio.gather

The `format_outputs` task runs all formatters concurrently:

```python
@llm_env.task
async def format_outputs(content: str) -> Dir:
    """
    Generate multiple output formats in parallel.

    Uses asyncio.gather to run all formatting operations concurrently,
    maximizing efficiency when each operation is I/O-bound.

    Args:
        content: The final report content

    Returns:
        Directory containing all formatted outputs
    """
    print("Generating output formats in parallel...")

    with flyte.group("formatting"):
        # Run all formatting operations in parallel
        markdown, html, summary = await asyncio.gather(
            format_as_markdown(content),
            format_as_html(content),
            generate_summary(content),
        )

    # Write outputs to a directory
    output_dir = tempfile.mkdtemp()

    with open(os.path.join(output_dir, "report.md"), "w") as f:
        f.write(markdown)

    with open(os.path.join(output_dir, "report.html"), "w") as f:
        f.write(html)

    with open(os.path.join(output_dir, "summary.txt"), "w") as f:
        f.write(summary)

    print(f"Created outputs in {output_dir}")
    return await Dir.from_local(output_dir)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### How asyncio.gather works

`asyncio.gather` takes multiple coroutines and runs them concurrently:

```python
markdown, html, summary = await asyncio.gather(
    format_as_markdown(content),  # Starts immediately
    format_as_html(content),      # Starts immediately
    generate_summary(content),    # Starts immediately
)
# All three run concurrently, results returned in order
```

Without `gather`, these would run sequentially:

```python
# Sequential (slower)
markdown = await format_as_markdown(content)  # Wait for completion
html = await format_as_html(content)          # Then start this
summary = await generate_summary(content)     # Then start this
```
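The speedup is easy to verify outside Flyte. This standalone sketch (using `asyncio.sleep` as a stand-in for an I/O-bound call such as an LLM request) shows three 0.1-second operations completing in roughly 0.1 seconds total, not 0.3:

```python
import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    # Simulates an I/O-bound operation (e.g. an API call)
    await asyncio.sleep(delay)
    return name

async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Three independent 0.1s operations run concurrently
    results = await asyncio.gather(
        fetch("markdown", 0.1),
        fetch("html", 0.1),
        fetch("summary", 0.1),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
# Results come back in argument order; total time is ~0.1s, not ~0.3s
```

Note that `gather` always returns results in the order the coroutines were passed, regardless of which finishes first.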

### When to use asyncio.gather

Use `asyncio.gather` when:
- Operations are independent (don't depend on each other's results)
- Operations are I/O-bound (API calls, file operations)
- You want to minimize total execution time

Don't use `asyncio.gather` when:
- Operations depend on each other
- Operations are CPU-bound (use process pools instead)
- Order of execution matters for side effects
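One more behavior worth knowing: by default, the first exception raised by any coroutine propagates out of `gather`, so one failing formatter aborts the whole batch. Passing `return_exceptions=True` collects failures alongside results instead. A minimal sketch:

```python
import asyncio

async def ok() -> str:
    return "fine"

async def boom() -> str:
    raise ValueError("formatter failed")

async def main() -> list:
    # With return_exceptions=True, exceptions are returned in place
    # of results rather than raised, so one bad coroutine does not
    # discard the others' work.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
```

Each result must then be checked with `isinstance(result, Exception)` before use, which is extra bookkeeping; the default fail-fast behavior is often what you want in a pipeline.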

## Grouping parallel operations

The parallel formatting is wrapped in a group for UI clarity:

```python
with flyte.group("formatting"):
    markdown, html, summary = await asyncio.gather(...)
```

In the Flyte UI, the group nests under the task, with the traced call marked:

```
format_outputs
└── formatting
    ├── format_as_markdown
    ├── format_as_html
    └── generate_summary (traced)
```

## Collecting outputs in a directory

The formatted outputs are written to a temporary directory and returned as a
`Dir` artifact:

```python
output_dir = tempfile.mkdtemp()

with open(os.path.join(output_dir, "report.md"), "w") as f:
    f.write(markdown)

with open(os.path.join(output_dir, "report.html"), "w") as f:
    f.write(html)

with open(os.path.join(output_dir, "summary.txt"), "w") as f:
    f.write(summary)

return await Dir.from_local(output_dir)
```

The `Dir.from_local()` call uploads the directory to Union.ai's
artifact storage, making it available to downstream tasks or applications.
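The write-then-upload pattern can also be expressed with `pathlib`, which some find tidier than `os.path.join`. This plain-Python sketch (no Flyte required) mirrors the directory layout the task produces:

```python
import tempfile
from pathlib import Path

# Same pattern as the task: create a temp dir, write each output by name
output_dir = Path(tempfile.mkdtemp())
(output_dir / "report.md").write_text("# Report\n")
(output_dir / "report.html").write_text("<h1>Report</h1>\n")
(output_dir / "summary.txt").write_text("Summary.\n")

# Downstream consumers can enumerate the artifact's files by name
names = sorted(p.name for p in output_dir.iterdir())
```

Because `tempfile.mkdtemp` does not clean up after itself, this pattern relies on the task container being discarded (or reused) after `Dir.from_local()` uploads the contents.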

## The batch pipeline

The batch pipeline processes multiple topics in parallel, demonstrating where
`ReusePolicy` truly shines:

```python
@driver_env.task
async def report_batch_pipeline(
    topics: list[str],
    max_iterations: int = 3,
    quality_threshold: int = 8,
) -> list[Dir]:
    """
    Generate reports for multiple topics in parallel.

    This is where ReusePolicy shines: with N topics, each going through
    up to max_iterations refinement cycles, the reusable container pool
    handles potentially N × 7 LLM calls efficiently without cold starts.

    Args:
        topics: List of topics to write about
        max_iterations: Maximum refinement cycles per topic
        quality_threshold: Minimum quality score to accept

    Returns:
        List of directories, each containing a report's formatted outputs
    """
    print(f"Starting batch pipeline for {len(topics)} topics...")

    # Fan out: refine all reports in parallel
    # Each refine_report makes 2-7 LLM calls, all hitting the reusable pool
    with flyte.group("refine_all"):
        reports = await asyncio.gather(*[
            refine_report(topic, max_iterations, quality_threshold)
            for topic in topics
        ])

    print(f"All {len(reports)} reports refined, formatting outputs...")

    # Fan out: format all reports in parallel
    with flyte.group("format_all"):
        outputs = await asyncio.gather(*[
            format_outputs(report)
            for report in reports
        ])

    print(f"Batch pipeline complete! Generated {len(outputs)} reports.")
    return outputs
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### Pipeline flow

1. **Fan out refine_all**: Process all topics in parallel using `asyncio.gather`
2. **Fan out format_all**: Format all reports in parallel
3. **Return list of Dirs**: Each directory contains one report's outputs

With 5 topics, each making up to ~7 LLM calls, the reusable container pool
absorbs roughly 35 LLM calls without paying a cold start for each one.
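The fan-out idiom used in both groups, unpacking a list comprehension into `gather` with `*`, works for any number of topics. A standalone sketch with a stand-in for `refine_report`:

```python
import asyncio

async def refine(topic: str) -> str:
    # Stand-in for the real refine_report task
    await asyncio.sleep(0)
    return f"report on {topic}"

async def batch(topics: list[str]) -> list[str]:
    # The * unpacks one coroutine per topic into gather;
    # results come back in the same order as the input topics
    return await asyncio.gather(*[refine(t) for t in topics])

reports = asyncio.run(batch(["a", "b", "c"]))
```

Because ordering is preserved, `topics[i]` always corresponds to `reports[i]`, which is what lets the second fan-out consume the first one's results directly.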

## Running the pipeline

To run the batch pipeline:

```python
if __name__ == "__main__":
    flyte.init_from_config()

    # Multiple topics to generate reports for
    topics = [
        "The Impact of Large Language Models on Software Development",
        "Edge Computing: Bringing AI to IoT Devices",
        "Quantum Computing: Current State and Near-Term Applications",
        "The Rise of Rust in Systems Programming",
        "WebAssembly: The Future of Browser-Based Applications",
    ]

    print(f"Submitting batch run for {len(topics)} topics...")
    import sys
    sys.stdout.flush()

    # Run the batch pipeline - this will generate all reports in parallel,
    # with the reusable container pool handling 5 topics × ~7 LLM calls each
    run = flyte.run(
        report_batch_pipeline,
        topics=topics,
        max_iterations=3,
        quality_threshold=8,
    )
    print(f"Batch report generation run URL: {run.url}")
    sys.stdout.flush()
    print("Waiting for pipeline to complete (Ctrl+C to skip)...")
    try:
        run.wait()
        print(f"Pipeline complete! Outputs: {run.outputs()}")
    except KeyboardInterrupt:
        print(f"\nSkipped waiting. Check status at: {run.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

```bash
uv run generate.py
```

The pipeline will:
1. Process all topics in parallel (each with iterative refinement)
2. Format all reports in parallel
3. Return a list of directories, each containing a report's outputs

## Cost optimization tips

### 1. Choose the right model

The example uses `gpt-4o-mini` for cost efficiency. For higher quality (at higher
cost), you could use `gpt-4o` or `gpt-4-turbo`:

```python
response = await client.chat.completions.create(
    model="gpt-4o",  # More capable, more expensive
    ...
)
```

### 2. Tune iteration parameters

Fewer iterations mean fewer API calls:

```python
run = flyte.run(
    report_batch_pipeline,
    topics=["Topic A", "Topic B"],
    max_iterations=2,      # Limit iterations
    quality_threshold=7,   # Accept slightly lower quality
)
```

### 3. Use caching effectively

The `cache="auto"` setting on the environment caches task outputs. Running the
same pipeline with the same inputs returns cached results instead of
re-executing the tasks:

```python
llm_env = flyte.TaskEnvironment(
    ...
    cache="auto",  # Cache task outputs
)
```

### 4. Scale the batch

The batch pipeline already processes topics in parallel. To handle larger batches,
adjust the `ReusePolicy`:

```python
reusable=flyte.ReusePolicy(
    replicas=4,           # More containers for larger batches
    concurrency=4,        # Tasks per container
    ...
)
```

With 4 replicas × 4 concurrency = 16 slots, you can process 16 topics' refinement
tasks concurrently.
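The slot arithmetic generalizes: total concurrent slots = replicas × concurrency, and a batch larger than that is worked through in successive waves. A quick plain-Python sketch of this back-of-the-envelope model (a simplification, since real scheduling starts new tasks as individual slots free up):

```python
import math

def scheduling_waves(num_topics: int, replicas: int, concurrency: int) -> int:
    """Sequential 'waves' needed when each topic's refinement
    occupies one slot in the reusable pool."""
    slots = replicas * concurrency
    return math.ceil(num_topics / slots)

# 16 topics fit in one wave with 4 × 4 = 16 slots;
# a 17th topic would spill into a second wave.
```

This is a rough capacity-planning heuristic, not a scheduler model; use it to decide when to bump `replicas` rather than to predict runtimes.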

## Next steps

Learn how to [deploy a serving app](https://www.union.ai/docs/v2/union/user-guide/advanced-project/parallel-outputs/serving-app) that connects to the pipeline
outputs and provides an interactive UI for report generation.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/advanced-project/parallel-outputs.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/advanced-project/parallel-outputs/
