# Resilient generation

This section covers the foundational patterns for building resilient LLM-powered
tasks: reusable environments, traced function calls, and retry strategies.

## Two environments

This example uses two task environments with different characteristics:

1. **`llm_env`** (reusable): For tasks that make many LLM calls in a loop or
   process batches in parallel. Container reuse avoids cold starts.
2. **`driver_env`** (standard): For orchestration tasks that fan out work to
   other tasks but don't make LLM calls themselves.

### Reusable environment for LLM work

When processing a batch of topics, each topic goes through multiple LLM calls
(generate, critique, revise, repeat). With 5 topics × ~7 calls each, that's ~35
LLM calls. `ReusePolicy` keeps containers warm to handle this efficiently:

```python
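from datetime import timedelta

import flyte

# MOCK_MODE is a module-level flag defined elsewhere in generate.py.
# Reusable environment for tasks that make many LLM calls in a loop.
# The ReusePolicy keeps containers warm, reducing cold start latency for iterative work.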
llm_env = flyte.TaskEnvironment(
    name="llm-worker",
    secrets=[] if MOCK_MODE else [flyte.Secret(key="openai-api-key", as_env_var="OPENAI_API_KEY")],
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "unionai-reuse>=0.1.10",
        "openai>=1.0.0",
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,              # Keep 2 container instances ready
        concurrency=4,           # Allow 4 concurrent tasks per container
        scaledown_ttl=timedelta(minutes=5),   # Wait 5 min before scaling down
        idle_ttl=timedelta(minutes=30),       # Shut down after 30 min idle
    ),
    cache="auto",
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### ReusePolicy parameters

| Parameter | Description |
|-----------|-------------|
| `replicas` | Number of container instances to keep ready (or `(min, max)` tuple) |
| `concurrency` | Maximum tasks per container at once |
| `scaledown_ttl` | Minimum wait before scaling down a replica |
| `idle_ttl` | Time after which idle containers shut down completely |

The configuration above keeps 2 containers ready, allows 4 concurrent tasks per
container, waits 5 minutes before scaling down, and shuts down after 30 minutes
of inactivity.

> **📝 Note**
>
> Both `scaledown_ttl` and `idle_ttl` must be at least 30 seconds.
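
As a rough sanity check on these settings, the sketch below computes the peak number of concurrent task slots and checks the 30-second minimum from the note above. It uses only the standard library: `ReuseSettings`, `max_slots`, and `validate` are hypothetical names for illustration and are not part of the Flyte API. It also assumes that peak capacity is simply `replicas` multiplied by `concurrency`.

```python
from dataclasses import dataclass
from datetime import timedelta

MIN_TTL = timedelta(seconds=30)  # minimum stated in the note above


@dataclass(frozen=True)
class ReuseSettings:
    """Hypothetical mirror of the four ReusePolicy settings (not the Flyte class)."""

    replicas: int
    concurrency: int
    scaledown_ttl: timedelta
    idle_ttl: timedelta

    def max_slots(self) -> int:
        # Assumption: each replica runs up to `concurrency` tasks at once.
        return self.replicas * self.concurrency

    def validate(self) -> None:
        # Both TTLs must meet the 30-second minimum.
        for name in ("scaledown_ttl", "idle_ttl"):
            if getattr(self, name) < MIN_TTL:
                raise ValueError(f"{name} must be at least 30 seconds")


settings = ReuseSettings(
    replicas=2,
    concurrency=4,
    scaledown_ttl=timedelta(minutes=5),
    idle_ttl=timedelta(minutes=30),
)
settings.validate()
print(settings.max_slots())  # 2 replicas x 4 tasks each = 8
```

Under that assumption, and if each of the 5 topics runs as one task, the whole batch fits within the 8 available slots.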

### Standard environment for orchestration

The driver environment doesn't need container reuse because it only coordinates work.
The `depends_on` parameter declares that tasks in this environment call tasks
in `llm_env`, ensuring both environments are deployed together:

```python
# Standard environment for orchestration tasks that don't need container reuse.
# depends_on declares that this environment's tasks call tasks in llm_env.
driver_env = flyte.TaskEnvironment(
    name="driver",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    depends_on=[llm_env],
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

## Traced LLM calls

The `@flyte.trace` decorator provides automatic checkpointing at the function level.
When a traced function completes successfully, its result is cached. If the task
fails and restarts, previously completed traced calls return their cached results
instead of re-executing.

```python
@flyte.trace
async def call_llm(prompt: str, system: str, json_mode: bool = False) -> str:
    """
    Make an LLM call with automatic checkpointing.

    The @flyte.trace decorator provides:
    - Automatic caching of results for identical inputs
    - Recovery from failures without re-running successful calls
    - Full observability in the Flyte UI

    Args:
        prompt: The user prompt to send
        system: The system prompt defining the LLM's role
        json_mode: Whether to request JSON output

    Returns:
        The LLM's response text
    """
    # Use mock responses for testing without API keys
    if MOCK_MODE:
        import asyncio
        await asyncio.sleep(0.5)  # Simulate API latency

        if "critique" in prompt.lower() or "critic" in system.lower():
            # Return good score if draft has been revised (contains revision marker)
            if "[REVISED]" in prompt:
                return MOCK_CRITIQUE_GOOD
            return MOCK_CRITIQUE_NEEDS_WORK
        elif "summary" in system.lower():
            return MOCK_SUMMARY
        elif "revis" in system.lower():
            # Return revised version with marker
            return MOCK_REPORT.replace("## Introduction", "[REVISED]\n\n## Introduction")
        else:
            return MOCK_REPORT

    from openai import AsyncOpenAI

    client = AsyncOpenAI()

    kwargs = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": 2000,
    }

    if json_mode:
        kwargs["response_format"] = {"type": "json_object"}

    response = await client.chat.completions.create(**kwargs)
    # message.content is Optional[str]; fall back to "" to honor the -> str annotation.
    return response.choices[0].message.content or ""
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

### Benefits of tracing

1. **Cost savings**: Failed tasks don't re-run expensive API calls that already succeeded
2. **Faster recovery**: Resuming from checkpoints skips completed work
3. **Observability**: Each traced call appears in the Flyte UI with timing data

### When to use @flyte.trace

Use `@flyte.trace` for:
- LLM API calls (OpenAI, Anthropic, etc.)
- External API requests
- Any expensive operation you don't want to repeat on retry

Don't use `@flyte.trace` for:
- Simple computations (overhead outweighs benefit)
- Operations with side effects that shouldn't be skipped

## Traced helper functions

The LLM-calling functions are decorated with `@flyte.trace` rather than being
separate tasks. This keeps the architecture simple while still providing
checkpointing:

```python
@flyte.trace
async def generate_initial_draft(topic: str) -> str:
    """
    Generate the initial report draft.

    The @flyte.trace decorator provides checkpointing - if the task fails
    after this completes, it won't re-run on retry.

    Args:
        topic: The topic to write about

    Returns:
        The initial draft in markdown format
    """
    print(f"Generating initial draft for topic: {topic}")

    prompt = f"Write a comprehensive report on the following topic:\n\n{topic}"
    draft = await call_llm(prompt, GENERATOR_SYSTEM_PROMPT)

    print(f"Generated initial draft ({len(draft)} characters)")
    return draft
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/generate.py*

These traced functions run inside the `refine_report` task. If the task fails
and retries, completed traced calls return cached results instead of re-executing.

## Retry strategies

The task that orchestrates the LLM calls uses `retries` to handle transient failures:

```python
@llm_env.task(retries=3)
async def refine_report(topic: str, ...) -> str:
    # Traced functions are called here
    draft = await generate_initial_draft(topic)
    ...
```

### Configuring retries

You can specify retries as a simple integer:

```python
@llm_env.task(retries=3)
async def my_task():
    ...
```

Or use `RetryStrategy` for more control:

```python
@llm_env.task(retries=flyte.RetryStrategy(count=3))
async def my_task():
    ...
```

### Combining tracing with retries

When you combine `@flyte.trace` with task-level retries, a failure costs only the
work that had not yet completed:

1. The task fails after completing some traced calls
2. Flyte retries the task
3. Previously completed traced calls return cached results
4. Only the failed operation and the ones after it re-execute

This pattern is essential for multi-step LLM workflows where you don't want to
re-run the entire chain when a single call fails.
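
The four steps above can be simulated with the standard library. In this sketch, `step`, `pipeline`, and `run_with_retries` are hypothetical helpers, the in-memory `done` dict stands in for trace checkpoints, and the failure is injected deliberately. It illustrates how the two mechanisms interact, not Flyte's actual retry machinery.

```python
import asyncio

done: dict[str, str] = {}    # stand-in for checkpoints of completed traced calls
executions: list[str] = []   # every real execution, across all attempts
fail_once = {"revise"}       # steps that fail the first time they execute


async def step(name: str) -> str:
    if name in done:
        return done[name]        # completed on an earlier attempt: replay
    executions.append(name)
    if name in fail_once:
        fail_once.discard(name)
        raise RuntimeError(f"transient failure in {name}")
    done[name] = f"{name}-output"
    return done[name]


async def pipeline() -> str:
    await step("generate")
    await step("critique")
    return await step("revise")


async def run_with_retries(retries: int) -> str:
    # Assumption for this sketch: `retries` counts attempts after the first one.
    for attempt in range(retries + 1):
        try:
            return await pipeline()
        except RuntimeError:
            if attempt == retries:
                raise
    raise AssertionError("unreachable")


result = asyncio.run(run_with_retries(retries=3))
print(executions)  # ['generate', 'critique', 'revise', 'revise']
```

The `generate` and `critique` steps each execute once even though the pipeline runs twice; only `revise` is repeated.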

## Structured prompts

The example uses a separate `prompts.py` module for system prompts and Pydantic models:

```python
GENERATOR_SYSTEM_PROMPT = """You are an expert report writer. Generate a well-structured,
informative report on the given topic. The report should include:

1. An engaging introduction that sets context
2. Clear sections with descriptive headings
3. Specific facts, examples, or data points where relevant
4. A conclusion that summarizes key takeaways

Write in a professional but accessible tone. Use markdown formatting for structure.
Aim for approximately 500-800 words."""

CRITIC_SYSTEM_PROMPT = """You are a demanding but fair editor reviewing a report draft.
Evaluate the report on these criteria:

- Clarity: Is the writing clear and easy to follow?
- Structure: Is it well-organized with logical flow?
- Depth: Does it provide sufficient detail and insight?
- Accuracy: Are claims supported and reasonable?
- Engagement: Is it interesting to read?

Provide your response as JSON matching this schema:
{
    "score": <1-10 integer>,
    "strengths": ["strength 1", "strength 2", ...],
    "improvements": ["improvement 1", "improvement 2", ...],
    "summary": "brief overall assessment"
}

Be specific in your feedback. A score of 8+ means the report is ready for publication."""

REVISER_SYSTEM_PROMPT = """You are an expert editor revising a report based on feedback.
Your task is to improve the report by addressing the specific improvements requested
while preserving its strengths.

Guidelines:
- Address each improvement point specifically
- Maintain the original voice and style
- Keep the same overall structure unless restructuring is requested
- Preserve any content that was praised as a strength
- Ensure the revised version is cohesive and flows well

Return only the revised report in markdown format, no preamble or explanation."""

SUMMARY_SYSTEM_PROMPT = """Create a concise executive summary (2-3 paragraphs) of the
following report. Capture the key points and main takeaways. Write in a professional
tone suitable for busy executives who need the essential information quickly."""
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/prompts.py*

### Pydantic models for structured output

LLM responses can be unpredictable. JSON mode requests JSON output from the model,
and validating that output against a Pydantic model confirms that it has the
expected fields, types, and value ranges:

```python
class Critique(BaseModel):
    """Structured critique response from the LLM."""

    score: int = Field(
        ge=1,
        le=10,
        description="Quality score from 1-10, where 10 is publication-ready",
    )
    strengths: list[str] = Field(
        description="List of strengths in the current draft",
    )
    improvements: list[str] = Field(
        description="Specific improvements needed",
    )
    summary: str = Field(
        description="Brief summary of the critique",
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/advanced-project/prompts.py*

The `Critique` model validates that:
- `score` is an integer between 1 and 10
- `strengths` and `improvements` are lists of strings
- All required fields are present

If the LLM returns malformed JSON or a value that violates these constraints,
Pydantic raises a validation error. Unless the task catches it, the attempt fails
and the task's configured retries, if any, apply.
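
As an illustration of what that validation amounts to, the sketch below re-implements the same checks with the standard library only. `CritiqueData` and `parse_critique` are hypothetical stand-ins. With the `Critique` model itself, Pydantic v2's `Critique.model_validate_json(raw)` would perform comparable checks, although Pydantic's default mode may coerce some values that this stricter sketch rejects.

```python
import json
from dataclasses import dataclass

REQUIRED = {"score", "strengths", "improvements", "summary"}


@dataclass
class CritiqueData:
    """Hypothetical stdlib stand-in for the Critique model."""

    score: int
    strengths: list[str]
    improvements: list[str]
    summary: str


def parse_critique(raw: str) -> CritiqueData:
    """Parse and validate a critique; raise ValueError on any problem."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = REQUIRED - set(data)
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    score = data["score"]
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(score, bool) or not isinstance(score, int) or not 1 <= score <= 10:
        raise ValueError("score must be an integer between 1 and 10")
    for field in ("strengths", "improvements"):
        items = data[field]
        if not isinstance(items, list) or not all(isinstance(i, str) for i in items):
            raise ValueError(f"{field} must be a list of strings")
    if not isinstance(data["summary"], str):
        raise ValueError("summary must be a string")
    return CritiqueData(score, data["strengths"], data["improvements"], data["summary"])


good = parse_critique(
    '{"score": 8, "strengths": ["clear"], "improvements": [], "summary": "ready"}'
)
print(good.score)  # 8
```

Raising a single exception type for every failure keeps the calling task simple: it either gets a usable critique or the attempt fails.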

## Next steps

With resilient generation in place, you're ready to build the
[agentic refinement loop](https://www.union.ai/docs/v2/union/user-guide/advanced-project/resilient-generation/agentic-refinement).

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/advanced-project/resilient-generation.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/advanced-project/resilient-generation/
