# Advanced project
> This bundle contains all pages in the Advanced project section.
> Source: https://www.union.ai/docs/v2/union/user-guide/advanced-project/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/advanced-project ===

# Advanced project: LLM reporting agent

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This example demonstrates a resilient agentic report generator that showcases
Flyte 2.0's advanced features for building production-grade AI workflows.

## What you'll build

A batch report generator that:
1. Processes multiple topics in parallel
2. Iteratively critiques and refines each report until it meets a quality threshold
3. Produces multiple output formats (Markdown, HTML, summary) for each report
4. Serves results through an interactive UI

## Concepts covered

| Feature | Description |
|---------|-------------|
| `ReusePolicy` | Keep containers warm for high-throughput batch processing |
| `@flyte.trace` | Checkpoint LLM calls for recovery and observability |
| `RetryStrategy` | Handle transient API failures gracefully |
| `flyte.group` | Organize parallel batches and iterations in the UI |
| `asyncio.gather` | Fan out to process multiple topics concurrently |
| Pydantic models | Structured LLM outputs |
| `AppEnvironment` | Deploy interactive Streamlit apps |
| `RunOutput` | Connect apps to pipeline outputs |

## Architecture

```mermaid
flowchart TD
    A[Topics List] --> B

    B["report_batch_pipeline<br/><i>driver_env</i>"]

    subgraph B1 ["refine_all (parallel)"]
        direction LR
        R1["refine_report<br/>topic 1"]
        R2["refine_report<br/>topic 2"]
        R3["refine_report<br/>topic N"]
    end
    B --> B1

    subgraph B2 ["format_all (parallel)"]
        direction LR
        F1["format_outputs<br/>report 1"]
        F2["format_outputs<br/>report 2"]
        F3["format_outputs<br/>report N"]
    end
    B1 --> B2

    B2 --> C["Output: List of Dirs"]
```

Each `refine_report` task runs in a reusable container (`llm_env`) and performs
multiple LLM calls through traced functions:

```mermaid
flowchart TD
    A[Topic] --> B["generate_initial_draft<br/><i>@flyte.trace</i>"]
    B --> C

    subgraph C ["refinement_loop"]
        direction TB
        D["critique_content<br/><i>@flyte.trace</i>"] -->|score >= threshold| E[exit loop]
        D -->|score < threshold| F["revise_content<br/><i>@flyte.trace</i>"]
        F --> D
    end
    C --> G[Refined Report]
```

## Prerequisites

- A Union.ai account with an active project
- An OpenAI API key stored as a secret named `openai-api-key`

To create the secret:

```bash
flyte secret create openai-api-key
```

## Parts

1. **Resilient generation**: Set up reusable environments, traced LLM calls, and retry strategies
2. **Agentic refinement**: Build the iterative critique-and-revise loop
3. **Parallel outputs**: Generate multiple formats concurrently
4. **Serving app**: Deploy an interactive UI for report generation

[Resilient generation](./resilient-generation)

## Key takeaways

1. **Reusable environments for batch processing**: `ReusePolicy` keeps containers warm,
   enabling efficient processing of multiple topics without cold start overhead. With
   5 topics × ~7 LLM calls each, the reusable pool handles ~35 calls efficiently.

2. **Checkpointed LLM calls**: `@flyte.trace` provides automatic checkpointing at the
   function level, enabling recovery without re-running expensive API calls.

3. **Agentic patterns**: The generate-critique-revise loop demonstrates how to build
   self-improving AI workflows with clear observability through `flyte.group`.

4. **Parallel fan-out**: `asyncio.gather` processes multiple topics concurrently,
   maximizing throughput by running refinement tasks in parallel across the batch.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/advanced-project/resilient-generation ===

# Resilient generation

This section covers the foundational patterns for building resilient LLM-powered
tasks: reusable environments, traced function calls, and retry strategies.

## Two environments

This example uses two task environments with different characteristics:

1. **`llm_env`** (reusable): For tasks that make many LLM calls in a loop or
   process batches in parallel. Container reuse avoids cold starts.
2. **`driver_env`** (standard): For orchestration tasks that fan out work to
   other tasks but don't make LLM calls themselves.

### Reusable environment for LLM work

When processing a batch of topics, each topic goes through multiple LLM calls
(generate, critique, revise, repeat). With 5 topics × ~7 calls each, that's ~35
LLM calls. `ReusePolicy` keeps containers warm to handle this efficiently:

```python
# Reusable environment for tasks that make many LLM calls in a loop.
# The ReusePolicy keeps containers warm, reducing cold start latency for iterative work.
llm_env = flyte.TaskEnvironment(
    name="llm-worker",
    secrets=[] if MOCK_MODE else [flyte.Secret(key="openai-api-key", as_env_var="OPENAI_API_KEY")],
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "unionai-reuse>=0.1.10",
        "openai>=1.0.0",
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,              # Keep 2 container instances ready
        concurrency=4,           # Allow 4 concurrent tasks per container
        scaledown_ttl=timedelta(minutes=5),   # Wait 5 min before scaling down
        idle_ttl=timedelta(minutes=30),       # Shut down after 30 min idle
    ),
    cache="auto",
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### ReusePolicy parameters

| Parameter | Description |
|-----------|-------------|
| `replicas` | Number of container instances to keep ready (or `(min, max)` tuple) |
| `concurrency` | Maximum tasks per container at once |
| `scaledown_ttl` | Minimum wait before scaling down a replica |
| `idle_ttl` | Time after which idle containers shut down completely |

The configuration above keeps 2 containers ready, allows 4 concurrent tasks per
container, waits 5 minutes before scaling down, and shuts down after 30 minutes
of inactivity.

> **📝 Note**
>
> Both `scaledown_ttl` and `idle_ttl` must be at least 30 seconds.

### Standard environment for orchestration

The driver environment doesn't need container reuse—it just coordinates work.
The `depends_on` parameter declares that tasks in this environment call tasks
in `llm_env`, ensuring both environments are deployed together:

```python
# Standard environment for orchestration tasks that don't need container reuse.
# depends_on declares that this environment's tasks call tasks in llm_env.
driver_env = flyte.TaskEnvironment(
    name="driver",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    depends_on=[llm_env],
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

## Traced LLM calls

The `@flyte.trace` decorator provides automatic checkpointing at the function level.
When a traced function completes successfully, its result is cached. If the task
fails and restarts, previously completed traced calls return their cached results
instead of re-executing.

```python
@flyte.trace
async def call_llm(prompt: str, system: str, json_mode: bool = False) -> str:
    """
    Make an LLM call with automatic checkpointing.

    The @flyte.trace decorator provides:
    - Automatic caching of results for identical inputs
    - Recovery from failures without re-running successful calls
    - Full observability in the Flyte UI

    Args:
        prompt: The user prompt to send
        system: The system prompt defining the LLM's role
        json_mode: Whether to request JSON output

    Returns:
        The LLM's response text
    """
    # Use mock responses for testing without API keys
    if MOCK_MODE:
        import asyncio
        await asyncio.sleep(0.5)  # Simulate API latency

        if "critique" in prompt.lower() or "critic" in system.lower():
            # Return good score if draft has been revised (contains revision marker)
            if "[REVISED]" in prompt:
                return MOCK_CRITIQUE_GOOD
            return MOCK_CRITIQUE_NEEDS_WORK
        elif "summary" in system.lower():
            return MOCK_SUMMARY
        elif "revis" in system.lower():
            # Return revised version with marker
            return MOCK_REPORT.replace("## Introduction", "[REVISED]\n\n## Introduction")
        else:
            return MOCK_REPORT

    from openai import AsyncOpenAI

    client = AsyncOpenAI()

    kwargs = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": 2000,
    }

    if json_mode:
        kwargs["response_format"] = {"type": "json_object"}

    response = await client.chat.completions.create(**kwargs)
    return response.choices[0].message.content
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### Benefits of tracing

1. **Cost savings**: Failed tasks don't re-run expensive API calls that already succeeded
2. **Faster recovery**: Resuming from checkpoints skips completed work
3. **Observability**: Each traced call appears in the Flyte UI with timing data

### When to use @flyte.trace

Use `@flyte.trace` for:
- LLM API calls (OpenAI, Anthropic, etc.)
- External API requests
- Any expensive operation you don't want to repeat on retry

Don't use `@flyte.trace` for:
- Simple computations (overhead outweighs benefit)
- Operations with side effects that shouldn't be skipped

## Traced helper functions

The LLM-calling functions are decorated with `@flyte.trace` rather than being
separate tasks. This keeps the architecture simple while still providing
checkpointing:

```python
@flyte.trace
async def generate_initial_draft(topic: str) -> str:
    """
    Generate the initial report draft.

    The @flyte.trace decorator provides checkpointing - if the task fails
    after this completes, it won't re-run on retry.

    Args:
        topic: The topic to write about

    Returns:
        The initial draft in markdown format
    """
    print(f"Generating initial draft for topic: {topic}")

    prompt = f"Write a comprehensive report on the following topic:\n\n{topic}"
    draft = await call_llm(prompt, GENERATOR_SYSTEM_PROMPT)

    print(f"Generated initial draft ({len(draft)} characters)")
    return draft
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

These traced functions run inside the `refine_report` task. If the task fails
and retries, completed traced calls return cached results instead of re-executing.

## Retry strategies

The task that orchestrates the LLM calls uses `retries` to handle transient failures:

```python
@llm_env.task(retries=3)
async def refine_report(topic: str, ...) -> str:
    # Traced functions are called here
    draft = await generate_initial_draft(topic)
    ...
```

### Configuring retries

You can specify retries as a simple integer:

```python
@llm_env.task(retries=3)
async def my_task():
    ...
```

Or use `RetryStrategy` for more control:

```python
@llm_env.task(retries=flyte.RetryStrategy(count=3))
async def my_task():
    ...
```

### Combining tracing with retries

When you combine `@flyte.trace` with task-level retries, you get the best of both:

1. Task fails after completing some traced calls
2. Flyte retries the task
3. Previously completed traced calls return cached results
4. Only the failed operation (and subsequent ones) re-execute

This pattern is essential for multi-step LLM workflows where you don't want to
re-run the entire chain when a single call fails.
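To see why the combination matters, here is a toy, self-contained sketch of the recovery behavior in plain Python. This is not Flyte's implementation: the `trace` decorator, the in-memory checkpoint dict, the `generate`/`critique` helpers, and the injected first-attempt failure are all hypothetical stand-ins for the real persistent trace cache and task-level retries.

```python
import functools

# Toy checkpoint store keyed by (function name, args) — a stand-in for the
# persistent trace cache, not Flyte's actual implementation.
_checkpoints: dict = {}

def trace(fn):
    """Cache successful results so a retry skips already-completed calls."""
    @functools.wraps(fn)
    def wrapper(*args):
        key = (fn.__name__, args)
        if key in _checkpoints:
            return _checkpoints[key]  # checkpoint hit: skip re-execution
        result = fn(*args)
        _checkpoints[key] = result
        return result
    return wrapper

calls = []  # records which underlying functions actually executed

@trace
def generate(topic):
    calls.append("generate")
    return f"draft about {topic}"

@trace
def critique(draft):
    calls.append("critique")
    return 7

attempts = 0

def task(topic):
    """Fails on attempt 1 *after* both traced calls complete."""
    global attempts
    attempts += 1
    draft = generate(topic)
    score = critique(draft)
    if attempts == 1:
        raise RuntimeError("transient API failure")
    return draft, score

# Simple retry loop standing in for task-level retries=3
for _ in range(3):
    try:
        result = task("llms")
        break
    except RuntimeError:
        pass

# Attempt 2 succeeded without re-running generate or critique
assert attempts == 2 and calls == ["generate", "critique"]
```

On the retry, both traced calls return cached results, so each expensive function body ran exactly once even though the task executed twice.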

## Structured prompts

The example uses a separate `prompts.py` module for system prompts and Pydantic models:

```python
GENERATOR_SYSTEM_PROMPT = """You are an expert report writer. Generate a well-structured,
informative report on the given topic. The report should include:

1. An engaging introduction that sets context
2. Clear sections with descriptive headings
3. Specific facts, examples, or data points where relevant
4. A conclusion that summarizes key takeaways

Write in a professional but accessible tone. Use markdown formatting for structure.
Aim for approximately 500-800 words."""

CRITIC_SYSTEM_PROMPT = """You are a demanding but fair editor reviewing a report draft.
Evaluate the report on these criteria:

- Clarity: Is the writing clear and easy to follow?
- Structure: Is it well-organized with logical flow?
- Depth: Does it provide sufficient detail and insight?
- Accuracy: Are claims supported and reasonable?
- Engagement: Is it interesting to read?

Provide your response as JSON matching this schema:
{
    "score": <1-10 integer>,
    "strengths": ["strength 1", "strength 2", ...],
    "improvements": ["improvement 1", "improvement 2", ...],
    "summary": "brief overall assessment"
}

Be specific in your feedback. A score of 8+ means the report is ready for publication."""

REVISER_SYSTEM_PROMPT = """You are an expert editor revising a report based on feedback.
Your task is to improve the report by addressing the specific improvements requested
while preserving its strengths.

Guidelines:
- Address each improvement point specifically
- Maintain the original voice and style
- Keep the same overall structure unless restructuring is requested
- Preserve any content that was praised as a strength
- Ensure the revised version is cohesive and flows well

Return only the revised report in markdown format, no preamble or explanation."""

SUMMARY_SYSTEM_PROMPT = """Create a concise executive summary (2-3 paragraphs) of the
following report. Capture the key points and main takeaways. Write in a professional
tone suitable for busy executives who need the essential information quickly."""
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/prompts.py*

### Pydantic models for structured output

LLM responses can be unpredictable. Using Pydantic models with JSON mode ensures
you get structured, validated data:

```python
class Critique(BaseModel):
    """Structured critique response from the LLM."""

    score: int = Field(
        ge=1,
        le=10,
        description="Quality score from 1-10, where 10 is publication-ready",
    )
    strengths: list[str] = Field(
        description="List of strengths in the current draft",
    )
    improvements: list[str] = Field(
        description="Specific improvements needed",
    )
    summary: str = Field(
        description="Brief summary of the critique",
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/prompts.py*

The `Critique` model validates that:
- `score` is an integer between 1 and 10
- `strengths` and `improvements` are lists of strings
- All required fields are present

If the LLM returns malformed JSON, Pydantic raises a validation error, which
triggers a retry (if configured).
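The checks the model enforces can be sketched with the standard library alone. The real code simply calls `Critique(**json.loads(response))` and lets Pydantic do the work; `parse_critique` below is a hypothetical helper written only to make the validation rules concrete.

```python
import json

def parse_critique(raw: str) -> dict:
    """Validate the critic's JSON the way the Critique model would.

    A stdlib sketch of the checks Pydantic performs automatically.
    """
    data = json.loads(raw)  # raises ValueError on malformed JSON
    score = data.get("score")
    if not isinstance(score, int) or not 1 <= score <= 10:
        raise ValueError("score must be an integer from 1 to 10")
    for field in ("strengths", "improvements"):
        value = data.get(field)
        if not isinstance(value, list) or not all(isinstance(s, str) for s in value):
            raise ValueError(f"{field} must be a list of strings")
    if "summary" not in data:
        raise ValueError("summary is required")
    return data

good = '{"score": 8, "strengths": ["clear"], "improvements": [], "summary": "ready"}'
assert parse_critique(good)["score"] == 8

try:
    parse_critique('{"score": 11, "strengths": [], "improvements": [], "summary": "x"}')
except ValueError as e:
    print(f"validation failed ({e}) — in the pipeline this surfaces as a task error and triggers a retry")
```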

## Next steps

With resilient generation in place, you're ready to build the
[agentic refinement loop](./agentic-refinement).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/advanced-project/agentic-refinement ===

# Agentic refinement

The core of this example is an agentic refinement loop: generate content, critique
it, revise based on feedback, and repeat until quality meets a threshold. This
pattern is fundamental to building self-improving AI systems.

## The agentic pattern

Traditional pipelines are linear: input → process → output. Agentic workflows
are iterative: they evaluate their own output and improve it through multiple
cycles.

```mermaid
flowchart TD
    A[Generate] --> B[Critique]
    B -->|score >= threshold| C[Done]
    B -->|score < threshold| D[Revise]
    D --> B
```

## Critique function

The critique function evaluates the current draft and returns structured feedback.
It's a traced function (not a separate task) that runs inside `refine_report`:

```python
@flyte.trace
async def critique_content(draft: str) -> Critique:
    """
    Critique the current draft and return structured feedback.

    Uses Pydantic models to parse the LLM's JSON response into
    a typed object for reliable downstream processing.

    Args:
        draft: The current draft to critique

    Returns:
        Structured critique with score, strengths, and improvements
    """
    print("Critiquing current draft...")

    response = await call_llm(
        f"Please critique the following report:\n\n{draft}",
        CRITIC_SYSTEM_PROMPT,
        json_mode=True,
    )

    # Parse the JSON response into our Pydantic model
    critique_data = json.loads(response)
    critique = Critique(**critique_data)

    print(f"Critique score: {critique.score}/10")
    print(f"Strengths: {len(critique.strengths)}, Improvements: {len(critique.improvements)}")

    return critique
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

Key points:
- Uses `json_mode=True` to ensure the LLM returns valid JSON
- Parses the response into a Pydantic `Critique` model
- Returns a typed object for reliable downstream processing
- `@flyte.trace` provides checkpointing—if the task retries, completed critiques aren't re-run

## Revise function

The revise function takes the current draft and specific improvements to address:

```python
@flyte.trace
async def revise_content(draft: str, improvements: list[str]) -> str:
    """
    Revise the draft based on critique feedback.

    Args:
        draft: The current draft to revise
        improvements: List of specific improvements to address

    Returns:
        The revised draft
    """
    print(f"Revising draft to address {len(improvements)} improvements...")

    improvements_text = "\n".join(f"- {imp}" for imp in improvements)
    prompt = f"""Please revise the following report to address these improvements:

IMPROVEMENTS NEEDED:
{improvements_text}

CURRENT DRAFT:
{draft}"""

    revised = await call_llm(prompt, REVISER_SYSTEM_PROMPT)

    print(f"Revision complete ({len(revised)} characters)")
    return revised
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

The prompt includes:
1. The list of improvements from the critique
2. The current draft to revise

This focused approach helps the LLM make targeted changes rather than rewriting
from scratch.

## The refinement loop

The `refine_report` task orchestrates the iterative refinement. It runs in the
reusable `llm_env` because it makes multiple LLM calls through traced functions:

```python
@llm_env.task(retries=3)
async def refine_report(
    topic: str,
    max_iterations: int = 3,
    quality_threshold: int = 8,
) -> str:
    """
    Iteratively refine a report until it meets the quality threshold.

    This task runs in a reusable container because it makes multiple LLM calls
    in a loop. The traced helper functions provide checkpointing, so if the
    task fails mid-loop, completed LLM calls won't be re-run on retry.

    Args:
        topic: The topic to write about
        max_iterations: Maximum refinement cycles (default: 3)
        quality_threshold: Minimum score to accept (default: 8)

    Returns:
        The final refined report
    """
    # Generate initial draft
    draft = await generate_initial_draft(topic)

    # Iterative refinement loop
    for i in range(max_iterations):
        with flyte.group(f"refinement_{i + 1}"):
            # Get critique
            critique = await critique_content(draft)

            # Check if we've met the quality threshold
            if critique.score >= quality_threshold:
                print(f"Quality threshold met at iteration {i + 1}!")
                print(f"Final score: {critique.score}/10")
                break

            # Revise based on feedback
            print(f"Score {critique.score} < {quality_threshold}, revising...")
            draft = await revise_content(draft, critique.improvements)
    else:
        print(f"Reached max iterations ({max_iterations})")

    return draft
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### How it works

1. **Generate initial draft**: Creates the first version of the report
2. **Enter refinement loop**: Iterates up to `max_iterations` times
3. **Critique**: Evaluates the current draft and assigns a score
4. **Check threshold**: If score meets `quality_threshold`, exit early
5. **Revise**: If below threshold, revise based on improvements
6. **Repeat**: Continue until threshold met or iterations exhausted

All the LLM calls (generate, critique, revise) are traced functions inside this
single task. This keeps the task graph simple while the reusable container handles
the actual LLM work efficiently.

### Early exit

The `if critique.score >= quality_threshold: break` pattern enables early exit
when quality is sufficient. This saves compute costs and time—no need to run
all iterations if the first draft is already good.
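Note that `refine_report` relies on Python's `for ... else` construct: the `else` block runs only when the loop finishes without hitting `break`. A minimal sketch of the same control flow, using scripted critique scores in place of LLM calls (the `refine` helper and its score values are hypothetical):

```python
def refine(scores, quality_threshold=8, max_iterations=3):
    """Mimic the refinement loop's control flow with scripted scores."""
    log = []
    it = iter(scores)
    for i in range(max_iterations):
        score = next(it)  # stand-in for critique_content
        if score >= quality_threshold:
            log.append(f"met at iteration {i + 1}")
            break  # early exit — skips the else clause below
        log.append(f"revise (score {score})")
    else:
        # runs only if the loop completed without a break
        log.append("max iterations reached")
    return log

# First draft already good: one critique, then early exit
assert refine([9]) == ["met at iteration 1"]

# Threshold never met: all iterations run, then the for/else branch fires
assert refine([5, 6, 7]) == [
    "revise (score 5)",
    "revise (score 6)",
    "revise (score 7)",
    "max iterations reached",
]
```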

## Grouping iterations with flyte.group

Each refinement iteration is wrapped in `flyte.group`:

```python
for i in range(max_iterations):
    with flyte.group(f"refinement_{i + 1}"):
        critique = await critique_content(draft)
        # ...
```

### Why use flyte.group?

Groups provide hierarchical organization in the Flyte UI. Since critique and
revise are traced functions (not separate tasks), groups help organize them:

```
refine_report
├── generate_initial_draft (traced)
├── refinement_1
│   ├── critique_content (traced)
│   └── revise_content (traced)
├── refinement_2
│   ├── critique_content (traced)
│   └── revise_content (traced)
└── [returns refined report]
```

Benefits:
- **Clarity**: See exactly how many iterations occurred
- **Debugging**: Quickly find which iteration had issues
- **Observability**: Track time spent in each refinement cycle

### Group context

Groups are implemented as context managers. All traced calls and nested groups
within the `with flyte.group(...)` block are associated with that group.

## Configuring the loop

The refinement loop accepts parameters to tune its behavior:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `max_iterations` | 3 | Upper bound on refinement cycles |
| `quality_threshold` | 8 | Minimum score (1-10) to accept |

### Choosing thresholds

- **Higher threshold** (9-10): More refinement cycles, higher quality, more API costs
- **Lower threshold** (6-7): Faster completion, may accept lower quality
- **More iterations**: Safety net for difficult topics
- **Fewer iterations**: Cost control, faster turnaround

A good starting point is `quality_threshold=8` with `max_iterations=3`. Adjust
based on your quality requirements and budget.

## Best practices for agentic loops

1. **Always set max iterations**: Prevent infinite loops if the quality threshold
   is never reached.

2. **Use structured critiques**: Pydantic models ensure you can reliably extract
   the score and improvements from LLM responses.

3. **Log iteration progress**: Print statements help debug when reviewing logs:
   ```python
   print(f"Iteration {i + 1}: score={critique.score}")
   ```

4. **Consider diminishing returns**: After 3-4 iterations, improvements often
   become marginal. Set `max_iterations` accordingly.

5. **Use groups for observability**: `flyte.group` makes the iterative nature
   visible in the UI, essential for debugging and monitoring.

## Next steps

With the agentic refinement loop complete, learn how to
[generate multiple outputs in parallel](./parallel-outputs).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/advanced-project/parallel-outputs ===

# Parallel outputs

After refining the report, the pipeline generates multiple output formats in
parallel. This demonstrates how to use `asyncio.gather` for concurrent execution
within a task.

## The formatting functions

The pipeline generates three outputs: markdown, HTML, and an executive summary.
Only `generate_summary` uses `@flyte.trace` because it makes an LLM call.
The markdown and HTML functions are simple, deterministic transformations that
don't benefit from checkpointing:

```python
import datetime

async def format_as_markdown(content: str) -> str:
    """Format the report as clean markdown."""
    # Content is already markdown, but we could add TOC, metadata, etc.
    return f"""---
title: Generated Report
date: {datetime.datetime.now().isoformat()}
---

{content}
"""

async def format_as_html(content: str) -> str:
    """Convert the report to HTML."""
    # Simple markdown to HTML conversion
    import re

    html = content
    # Convert headers
    html = re.sub(r"^### (.+)$", r"<h3>\1</h3>", html, flags=re.MULTILINE)
    html = re.sub(r"^## (.+)$", r"<h2>\1</h2>", html, flags=re.MULTILINE)
    html = re.sub(r"^# (.+)$", r"<h1>\1</h1>", html, flags=re.MULTILINE)
    # Convert bold/italic
    html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
    html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
    # Convert paragraphs
    html = re.sub(r"\n\n", r"</p><p>", html)

    return f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Generated Report</title>
    <style>
        body {{ font-family: system-ui, sans-serif; max-width: 800px; margin: 2rem auto; padding: 0 1rem; }}
        h1, h2, h3 {{ color: #333; }}
        p {{ line-height: 1.6; }}
    </style>
</head>
<body>
<p>{html}</p>
</body>
</html>
"""

@flyte.trace
async def generate_summary(content: str) -> str:
    """Generate an executive summary of the report."""
    return await call_llm(content, SUMMARY_SYSTEM_PROMPT)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### When to trace and when not to

Use `@flyte.trace` for operations that are expensive, non-deterministic, or
call external APIs (like `generate_summary`). Skip it for cheap, deterministic
transformations (like `format_as_markdown` and `format_as_html`) where
re-running on retry is trivial.

## Parallel execution with asyncio.gather

The `format_outputs` task runs all formatters concurrently:

```python
@llm_env.task
async def format_outputs(content: str) -> Dir:
    """
    Generate multiple output formats in parallel.

    Uses asyncio.gather to run all formatting operations concurrently,
    maximizing efficiency when each operation is I/O-bound.

    Args:
        content: The final report content

    Returns:
        Directory containing all formatted outputs
    """
    print("Generating output formats in parallel...")

    with flyte.group("formatting"):
        # Run all formatting operations in parallel
        markdown, html, summary = await asyncio.gather(
            format_as_markdown(content),
            format_as_html(content),
            generate_summary(content),
        )

    # Write outputs to a directory
    output_dir = tempfile.mkdtemp()

    with open(os.path.join(output_dir, "report.md"), "w") as f:
        f.write(markdown)

    with open(os.path.join(output_dir, "report.html"), "w") as f:
        f.write(html)

    with open(os.path.join(output_dir, "summary.txt"), "w") as f:
        f.write(summary)

    print(f"Created outputs in {output_dir}")
    return await Dir.from_local(output_dir)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### How asyncio.gather works

`asyncio.gather` takes multiple coroutines and runs them concurrently:

```python
markdown, html, summary = await asyncio.gather(
    format_as_markdown(content),  # Starts immediately
    format_as_html(content),      # Starts immediately
    generate_summary(content),    # Starts immediately
)
# All three run concurrently, results returned in order
```

Without `gather`, these would run sequentially:

```python
# Sequential (slower)
markdown = await format_as_markdown(content)  # Wait for completion
html = await format_as_html(content)          # Then start this
summary = await generate_summary(content)     # Then start this
```

### When to use asyncio.gather

Use `asyncio.gather` when:
- Operations are independent (don't depend on each other's results)
- Operations are I/O-bound (API calls, file operations)
- You want to minimize total execution time

Don't use `asyncio.gather` when:
- Operations depend on each other
- Operations are CPU-bound (use process pools instead)
- Order of execution matters for side effects
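The speedup is easy to observe with a self-contained timing demo. Here `fake_io` is a hypothetical stand-in for an I/O-bound formatter, with `asyncio.sleep` in place of an API call:

```python
import asyncio
import time

async def fake_io(name: str, delay: float) -> str:
    """Stand-in for an I/O-bound formatter (sleep instead of an API call)."""
    await asyncio.sleep(delay)
    return name

async def main() -> float:
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_io("markdown", 0.1),
        fake_io("html", 0.1),
        fake_io("summary", 0.1),
    )
    elapsed = time.perf_counter() - start
    assert results == ["markdown", "html", "summary"]  # gather preserves order
    # Concurrent: total time ≈ the longest single delay, not the 0.3s sum
    assert elapsed < 0.25
    return elapsed

elapsed = asyncio.run(main())
print(f"three 0.1s waits finished in {elapsed:.2f}s")
```

Run sequentially with three separate `await`s, the same work would take roughly the sum of the delays (~0.3s) instead of the maximum (~0.1s).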

## Grouping parallel operations

The parallel formatting is wrapped in a group for UI clarity:

```python
with flyte.group("formatting"):
    markdown, html, summary = await asyncio.gather(...)
```

In the Flyte UI, the group collects all three formatting calls, though only `generate_summary` is traced:

```
format_outputs
└── formatting
    ├── format_as_markdown
    ├── format_as_html
    └── generate_summary (traced)
```

## Collecting outputs in a directory

The formatted outputs are written to a temporary directory and returned as a
`Dir` artifact:

```python
output_dir = tempfile.mkdtemp()

with open(os.path.join(output_dir, "report.md"), "w") as f:
    f.write(markdown)

with open(os.path.join(output_dir, "report.html"), "w") as f:
    f.write(html)

with open(os.path.join(output_dir, "summary.txt"), "w") as f:
    f.write(summary)

return await Dir.from_local(output_dir)
```

The `Dir.from_local()` call uploads the directory to Union.ai's
artifact storage, making it available to downstream tasks or applications.

## The batch pipeline

The batch pipeline processes multiple topics in parallel, demonstrating where
`ReusePolicy` truly shines:

```python
@driver_env.task
async def report_batch_pipeline(
    topics: list[str],
    max_iterations: int = 3,
    quality_threshold: int = 8,
) -> list[Dir]:
    """
    Generate reports for multiple topics in parallel.

    This is where ReusePolicy shines: with N topics, each going through
    up to max_iterations refinement cycles, the reusable container pool
    handles potentially N × 7 LLM calls efficiently without cold starts.

    Args:
        topics: List of topics to write about
        max_iterations: Maximum refinement cycles per topic
        quality_threshold: Minimum quality score to accept

    Returns:
        List of directories, each containing a report's formatted outputs
    """
    print(f"Starting batch pipeline for {len(topics)} topics...")

    # Fan out: refine all reports in parallel
    # Each refine_report makes 2-7 LLM calls, all hitting the reusable pool
    with flyte.group("refine_all"):
        reports = await asyncio.gather(*[
            refine_report(topic, max_iterations, quality_threshold)
            for topic in topics
        ])

    print(f"All {len(reports)} reports refined, formatting outputs...")

    # Fan out: format all reports in parallel
    with flyte.group("format_all"):
        outputs = await asyncio.gather(*[
            format_outputs(report)
            for report in reports
        ])

    print(f"Batch pipeline complete! Generated {len(outputs)} reports.")
    return outputs
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### Pipeline flow

1. **Fan out refine_all**: Process all topics in parallel using `asyncio.gather`
2. **Fan out format_all**: Format all reports in parallel
3. **Return list of Dirs**: Each directory contains one report's outputs

With 5 topics, each making ~7 LLM calls, the reusable container pool handles
~35 LLM calls efficiently without cold starts.
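The two fan-out steps are plain `asyncio` underneath. Stripped of the Flyte decorators, the shape of the pipeline can be sketched like this (with trivial stand-ins for `refine_report` and `format_outputs`; the real tasks make LLM calls):

```python
import asyncio

async def refine(topic: str) -> str:
    # Stand-in for refine_report: some async work per topic
    await asyncio.sleep(0)
    return f"report on {topic}"

async def fmt(report: str) -> str:
    # Stand-in for format_outputs
    await asyncio.sleep(0)
    return report.upper()

async def pipeline(topics: list[str]) -> list[str]:
    # First fan-out: refine all topics concurrently
    reports = await asyncio.gather(*[refine(t) for t in topics])
    # Second fan-out: format all reports concurrently
    return await asyncio.gather(*[fmt(r) for r in reports])

results = asyncio.run(pipeline(["a", "b", "c"]))
print(results)  # order matches the input topics
```

Note that `asyncio.gather` preserves input order, so the returned list of directories lines up with the `topics` list.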

## Running the pipeline

To run the batch pipeline:

```python
if __name__ == "__main__":
    flyte.init_from_config()

    # Multiple topics to generate reports for
    topics = [
        "The Impact of Large Language Models on Software Development",
        "Edge Computing: Bringing AI to IoT Devices",
        "Quantum Computing: Current State and Near-Term Applications",
        "The Rise of Rust in Systems Programming",
        "WebAssembly: The Future of Browser-Based Applications",
    ]

    print(f"Submitting batch run for {len(topics)} topics...")
    import sys
    sys.stdout.flush()

    # Run the batch pipeline - this will generate all reports in parallel,
    # with the reusable container pool handling 5 topics × ~7 LLM calls each
    run = flyte.run(
        report_batch_pipeline,
        topics=topics,
        max_iterations=3,
        quality_threshold=8,
    )
    print(f"Batch report generation run URL: {run.url}")
    sys.stdout.flush()
    print("Waiting for pipeline to complete (Ctrl+C to skip)...")
    try:
        run.wait()
        print(f"Pipeline complete! Outputs: {run.outputs()}")
    except KeyboardInterrupt:
        print(f"\nSkipped waiting. Check status at: {run.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

```bash
uv run generate.py
```

The pipeline will:
1. Process all topics in parallel (each with iterative refinement)
2. Format all reports in parallel
3. Return a list of directories, each containing a report's outputs

## Cost optimization tips

### 1. Choose the right model

The example uses `gpt-4o-mini` for cost efficiency. For higher quality (at higher
cost), you could use `gpt-4o` or `gpt-4-turbo`:

```python
response = await client.chat.completions.create(
    model="gpt-4o",  # More capable, more expensive
    ...
)
```

### 2. Tune iteration parameters

Fewer iterations mean fewer API calls:

```python
run = flyte.run(
    report_batch_pipeline,
    topics=["Topic A", "Topic B"],
    max_iterations=2,      # Limit iterations
    quality_threshold=7,   # Accept slightly lower quality
)
```

### 3. Use caching effectively

The `cache="auto"` setting on the environment caches task outputs. Running the
same pipeline with the same inputs returns cached results instantly:

```python
llm_env = flyte.TaskEnvironment(
    ...
    cache="auto",  # Cache task outputs
)
```

### 4. Scale the batch

The batch pipeline already processes topics in parallel. To handle larger batches,
adjust the `ReusePolicy`:

```python
reusable=flyte.ReusePolicy(
    replicas=4,           # More containers for larger batches
    concurrency=4,        # Tasks per container
    ...
)
```

With 4 replicas × 4 concurrency = 16 slots, you can process 16 topics' refinement
tasks concurrently.
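The sizing math is simple enough to sanity-check in a few lines (plain arithmetic, not a Flyte API):

```python
def pool_slots(replicas: int, concurrency: int) -> int:
    """Total tasks the reusable pool can run at once."""
    return replicas * concurrency

def waves(num_tasks: int, slots: int) -> int:
    """Sequential waves a batch needs at full utilization (ceiling division)."""
    return -(-num_tasks // slots)

slots = pool_slots(replicas=4, concurrency=4)
print(slots)             # 16 concurrent slots
print(waves(35, slots))  # ~35 LLM-calling tasks fit in 3 waves
```

In practice refinement iterations are sequential per topic, so the pool rarely sees all tasks at once; treat this as an upper-bound estimate.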

## Next steps

Learn how to [deploy a serving app](./serving-app) that connects to the pipeline
outputs and provides an interactive UI for report generation.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/advanced-project/serving-app ===

# Serving app

The final piece is a serving application that displays generated reports and
provides an interactive interface. This demonstrates how to connect apps to
pipeline outputs using `RunOutput`.

## App environment configuration

The `AppEnvironment` defines how the Streamlit application runs and connects to
the batch report pipeline:

```python
# Define the app environment
env = AppEnvironment(
    name="report-generator-app",
    description="Interactive report generator with AI-powered refinement",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit>=1.41.0",
    ),
    args=["streamlit", "run", "app.py", "--server.port", "8080"],
    port=8080,
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    parameters=[
        # Connect to the batch pipeline output (list of report directories)
        Parameter(
            name="reports",
            value=RunOutput(
                task_name="driver.report_batch_pipeline",
                type="directory",
            ),
            download=True,
            env_var="REPORTS_PATH",
        ),
    ],
    include=["app.py"],
    requires_auth=False,
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/serve.py*

### Key configuration

| Setting | Purpose |
|---------|---------|
| `args` | Command to run the Streamlit app |
| `port` | Port the app listens on |
| `parameters` | Inputs to the app, including pipeline connections |
| `include` | Additional files to bundle with the app |

### Connecting to pipeline output with RunOutput

The `RunOutput` parameter connects the app to the batch pipeline's output:

```python
Parameter(
    name="reports",
    value=RunOutput(
        task_name="driver.report_batch_pipeline",
        type="directory",
    ),
    download=True,
    env_var="REPORTS_PATH",
)
```

This configuration:
1. **Finds the latest run** of `report_batch_pipeline` in the `driver` environment
2. **Downloads the output** to local storage (`download=True`)
3. **Sets an environment variable** with the path (`REPORTS_PATH`)

The app can then scan this directory for all generated reports.

## The Streamlit application

The app loads and displays all generated reports from the batch pipeline:

```python
def load_report_from_dir(report_dir: str) -> dict | None:
    """Load a single report from a directory."""
    if not os.path.isdir(report_dir):
        return None

    report = {"path": report_dir, "name": os.path.basename(report_dir)}

    md_path = os.path.join(report_dir, "report.md")
    if os.path.exists(md_path):
        with open(md_path) as f:
            report["markdown"] = f.read()

    html_path = os.path.join(report_dir, "report.html")
    if os.path.exists(html_path):
        with open(html_path) as f:
            report["html"] = f.read()

    summary_path = os.path.join(report_dir, "summary.txt")
    if os.path.exists(summary_path):
        with open(summary_path) as f:
            report["summary"] = f.read()

    # Only return if we found at least markdown content
    return report if "markdown" in report else None

def load_all_reports() -> list[dict]:
    """Load all reports from the batch pipeline output."""
    reports_path = os.environ.get("REPORTS_PATH")
    if not reports_path or not os.path.exists(reports_path):
        return []

    reports = []

    # Check if this is a single report directory (has report.md directly)
    if os.path.exists(os.path.join(reports_path, "report.md")):
        report = load_report_from_dir(reports_path)
        if report:
            report["name"] = "Report"
            reports.append(report)
    else:
        # Batch output: scan subdirectories for reports
        for entry in sorted(os.listdir(reports_path)):
            entry_path = os.path.join(reports_path, entry)
            report = load_report_from_dir(entry_path)
            if report:
                reports.append(report)

    return reports
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/app.py*

### Displaying multiple reports

The app provides a sidebar for selecting between reports when multiple are available:

```python
reports = load_all_reports()

if reports:
    # Sidebar for report selection if multiple reports
    if len(reports) > 1:
        st.sidebar.header("Select Report")
        report_names = [f"Report {i+1}: {r['name']}" for i, r in enumerate(reports)]
        selected_idx = st.sidebar.selectbox(
            "Choose a report to view:",
            range(len(reports)),
            format_func=lambda i: report_names[i],
        )
        selected_report = reports[selected_idx]
        st.sidebar.markdown(f"**Viewing {len(reports)} reports**")
    else:
        selected_report = reports[0]

    st.header(f"Generated Report: {selected_report['name']}")

    # Summary section
    if "summary" in selected_report:
        with st.expander("Executive Summary", expanded=True):
            st.write(selected_report["summary"])

    # Tabbed view for different formats
    tab_md, tab_html = st.tabs(["Markdown", "HTML Preview"])

    with tab_md:
        st.markdown(selected_report.get("markdown", ""))

    with tab_html:
        if "html" in selected_report:
            st.components.v1.html(selected_report["html"], height=600, scrolling=True)

    # Download options
    st.subheader("Download")
    col1, col2, col3 = st.columns(3)

    with col1:
        if "markdown" in selected_report:
            st.download_button(
                label="Download Markdown",
                data=selected_report["markdown"],
                file_name="report.md",
                mime="text/markdown",
            )

    with col2:
        if "html" in selected_report:
            st.download_button(
                label="Download HTML",
                data=selected_report["html"],
                file_name="report.html",
                mime="text/html",
            )

    with col3:
        if "summary" in selected_report:
            st.download_button(
                label="Download Summary",
                data=selected_report["summary"],
                file_name="summary.txt",
                mime="text/plain",
            )

else:
    st.info("No reports generated yet. Run the batch pipeline to create reports.")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/app.py*

Features:
- **Report selector**: Sidebar navigation when multiple reports exist
- **Executive summary**: Expandable section with key takeaways
- **Tabbed views**: Switch between Markdown and HTML preview
- **Download buttons**: Export in any format

### Generation instructions

The app includes instructions for generating new reports:

````python
st.divider()
st.header("Generate New Reports")
st.write("""
To generate reports, run the batch pipeline:

```bash
uv run generate.py
```

This generates reports for multiple topics in parallel, demonstrating
how ReusePolicy efficiently handles many concurrent LLM calls.
""")

# Show pipeline parameters info
with st.expander("Pipeline Parameters"):
    st.markdown("""
    **Available parameters:**

    | Parameter | Default | Description |
    |-----------|---------|-------------|
    | `topics` | (required) | List of topics to write about |
    | `max_iterations` | 3 | Maximum refinement cycles per topic |
    | `quality_threshold` | 8 | Minimum score (1-10) to accept |

    **Example:**
    ```python
    run = flyte.run(
        report_batch_pipeline,
        topics=[
            "The Future of Renewable Energy",
            "Advances in Quantum Computing",
            "The Rise of Edge AI",
        ],
        max_iterations=3,
        quality_threshold=8,
    )
    ```
    """)
````

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/app.py*

## Deploying the app

To deploy the report generator application:

```python
if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy the report generator app
    print("Deploying report generator app...")
    deployment = flyte.serve(env)
    print(f"App deployed at: {deployment.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/serve.py*

```bash
uv run serve.py
```

The deployment process:
1. Builds a container image with the app code
2. Deploys the app to Union.ai
3. Connects to the latest pipeline output
4. Returns the app URL

## Workflow: Generate then view

The typical workflow is:

1. **Run the batch pipeline** to generate reports:
   ```bash
   uv run generate.py
   ```

2. **Deploy or refresh the app** to view results:
   ```bash
   uv run serve.py
   ```

3. **Access the app** at the provided URL and browse all generated reports

The app automatically picks up the latest pipeline run, so you can generate
new batches and always see the most recent results.

## Automatic updates with RunOutput

The `RunOutput` connection is evaluated at app startup. Each time the app
restarts or redeploys, it fetches the latest batch pipeline output.

For real-time updates without redeployment, you could:
1. Poll for new runs using the Flyte API
2. Implement a webhook that triggers app refresh
3. Use a database to track run status
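None of these is built into the example. As a sketch of option 1, here is a generic poll loop; `latest_run_id` is a hypothetical stand-in that you would back with whatever run-listing API your deployment exposes:

```python
import time
from typing import Callable, Optional

def watch_for_new_run(latest_run_id: Callable[[], Optional[str]],
                      on_new_run: Callable[[str], None],
                      interval_s: float = 30.0,
                      max_polls: Optional[int] = None) -> None:
    """Poll a run-id source and fire a callback when the id changes.

    latest_run_id is a hypothetical stand-in, not a real Flyte call.
    """
    seen = latest_run_id()
    polls = 0
    while max_polls is None or polls < max_polls:
        time.sleep(interval_s)
        polls += 1
        current = latest_run_id()
        if current is not None and current != seen:
            seen = current
            on_new_run(current)

# Usage with stubs: the second poll sees a new run id
ids = iter(["run-1", "run-1", "run-2"])
events = []
watch_for_new_run(lambda: next(ids), events.append,
                  interval_s=0.0, max_polls=2)
print(events)  # ["run-2"]
```

In a Streamlit app you would typically run such a check on a timer or on page interaction and call `st.rerun()` when a new run appears, rather than blocking in a loop.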

## Complete example structure

Here's the full project structure:

```
advanced-project/
├── generate.py    # Main pipeline with agentic refinement
├── prompts.py     # System prompts and Pydantic models
├── serve.py       # App deployment configuration
└── app.py         # Streamlit user interface
```

## Running the complete example

1. **Set up the secret**:
   ```bash
   flyte secret create openai-api-key
   ```

2. **Run the pipeline**:
   ```bash
   cd /path/to/unionai-examples/v2/user-guide/advanced-project
   uv run generate.py
   ```

3. **Deploy the app**:
   ```bash
   uv run serve.py
   ```

4. **Open the app URL** and browse your generated reports

## Summary

This example demonstrated:

| Feature | What it does |
|---------|--------------|
| `ReusePolicy` | Keeps containers warm for high-throughput batch processing |
| `@flyte.trace` | Checkpoints LLM calls for recovery and observability |
| `RetryStrategy` | Handles transient API failures gracefully |
| `flyte.group` | Organizes parallel batches and iterations in the UI |
| `asyncio.gather` | Fans out to process multiple topics concurrently |
| Pydantic models | Structured LLM outputs |
| `AppEnvironment` | Deploys interactive Streamlit apps |
| `RunOutput` | Connects apps to pipeline outputs |

These patterns form the foundation for building production-grade AI workflows
that are resilient, observable, and cost-efficient at scale.

