# Build apps
> This bundle contains all pages in the Build apps section.
> Source: https://www.union.ai/docs/v2/union/user-guide/build-apps/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps ===

# Build apps

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This section covers how to build different types of apps with Flyte, including Streamlit dashboards, FastAPI REST APIs, vLLM and SGLang model servers, webhooks, and WebSocket applications.

> [!TIP]
> Go to [Introducing apps](https://www.union.ai/docs/v2/union/user-guide/core-concepts/introducing-apps/page.md) for an overview of apps and a quick example.

## App types

Flyte supports various types of apps:

- **UI dashboard apps**: Interactive web dashboards and data visualization tools like Streamlit and Gradio
- **Web API apps**: REST APIs, webhooks, and backend services like FastAPI and Flask
- **Model serving apps**: High-performance LLM serving with vLLM and SGLang

## Next steps

- **Single-script apps**: The simplest way to build and deploy apps in a single Python script
- **Multi-script apps**: Build FastAPI and Streamlit apps with multiple files
- **App usage patterns**: Call apps from tasks, tasks from apps, and apps from apps
- **Secret-based authentication**: Authenticate FastAPI apps using Flyte secrets
- **Streamlit app**: Build interactive Streamlit dashboards
- **FastAPI app**: Create REST APIs and backend services
- **vLLM app**: Serve large language models with vLLM
- **SGLang app**: Serve LLMs with SGLang for structured generation

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/single-script-apps ===

# Single-script apps

The simplest way to build and deploy an app with Flyte is to write everything in a single Python script. This approach works well for:

- **Quick prototypes**: Rapidly test ideas and concepts
- **Simple services**: Basic HTTP servers, APIs, or dashboards
- **Learning**: Understanding how Flyte apps work without complexity
- **Minimal examples**: Demonstrating core functionality

All the code for your app (the application logic, the app environment configuration, and the deployment code) lives in one file, which makes it easy to understand, share, and deploy.

## Plain Python HTTP server

The simplest possible app is a plain Python HTTP server using Python's built-in `http.server` module. This requires no external dependencies beyond the Flyte SDK.

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# ///

"""A plain Python HTTP server example - the simplest possible app."""

import flyte
import flyte.app
from pathlib import Path

# {{docs-fragment server-code}}
# Create a simple HTTP server handler
from http.server import HTTPServer, BaseHTTPRequestHandler

class SimpleHandler(BaseHTTPRequestHandler):
    """A simple HTTP server handler."""

    def do_GET(self):

        if self.path == "/":
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(b"<h1>Hello from Plain Python Server!</h1>")

        elif self.path == "/health":
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(b'{"status": "healthy"}')

        else:
            self.send_response(404)
            self.end_headers()
# {{/docs-fragment server-code}}

# {{docs-fragment app-env}}
file_name = Path(__file__).name
app_env = flyte.app.AppEnvironment(
    name="plain-python-server",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    args=["python", file_name, "--server"],
    port=8080,
    resources=flyte.Resources(cpu="1", memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    import sys

    if "--server" in sys.argv:
        server = HTTPServer(("0.0.0.0", 8080), SimpleHandler)
        print("Server running on port 8080")
        server.serve_forever()
    else:
        flyte.init_from_config(root_dir=Path(__file__).parent)
        app = flyte.serve(app_env)
        print(f"App URL: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/plain_python_server.py*

**Key points**

- **No external dependencies**: Uses only Python's standard library
- **Simple handler**: Define request handlers as Python classes
- **Basic command**: The container starts the server by running the same script with the `--server` flag (`python <script> --server`)
- **Minimal resources**: Requires only basic CPU and memory
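
Because the handler is ordinary standard-library code, it can be exercised locally before deploying. The following is a minimal sketch of such a smoke test, not part of the original example: it uses a trimmed copy of `SimpleHandler` so that it is self-contained, and the `fetch` helper is a hypothetical name introduced here.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class SimpleHandler(BaseHTTPRequestHandler):
    """Trimmed copy of the handler above, repeated so the sketch runs on its own."""

    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(b'{"status": "healthy"}')
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # silence per-request logging during the check


def fetch(port: int, path: str) -> tuple[int, bytes]:
    """Hypothetical helper: GET a path from the local server, return (status, body)."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", path)
        response = conn.getresponse()
        return response.status, response.read()
    finally:
        conn.close()


# Port 0 asks the OS for any free port, so the check cannot collide with a running app.
server = HTTPServer(("127.0.0.1", 0), SimpleHandler)
port = server.server_address[1]
threading.Thread(target=server.serve_forever, daemon=True).start()

health_status, health_body = fetch(port, "/health")
missing_status, _ = fetch(port, "/missing")

server.shutdown()
server.server_close()

print(health_status, json.loads(health_body), missing_status)
```

The deployed app listens on port 8080; the ephemeral port here is only a convenience for local testing.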

## Streamlit app

Streamlit makes it easy to build interactive web dashboards. Here's a complete single-script Streamlit app:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "streamlit",
# ]
# ///

"""A single-script Streamlit app example."""

import pathlib
import streamlit as st
import flyte
import flyte.app

# {{docs-fragment streamlit-app}}
def main():
    st.set_page_config(page_title="Simple Streamlit App", page_icon="🚀")

    st.title("Hello from Streamlit!")
    st.write("This is a simple single-script Streamlit app.")

    name = st.text_input("What's your name?", "World")
    st.write(f"Hello, {name}!")

    if st.button("Click me!"):
        st.balloons()
        st.success("Button clicked!")
# {{/docs-fragment streamlit-app}}

# {{docs-fragment app-env}}
file_name = pathlib.Path(__file__).name
app_env = flyte.app.AppEnvironment(
    name="streamlit-single-script",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit==1.41.1"
    ),
    args=["streamlit", "run", file_name, "--server.port", "8080", "--", "--server"],
    port=8080,
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    import sys

    if "--server" in sys.argv:
        main()
    else:
        flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
        app = flyte.serve(app_env)
        print(f"App URL: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit_single_script.py*

**Key points**

- **Interactive UI**: Streamlit provides widgets and visualizations out of the box
- **Single file**: All UI logic and deployment code in one script
- **Simple deployment**: Just specify the Streamlit command and port
- **Rich ecosystem**: Access to Streamlit's extensive component library

## FastAPI app

FastAPI is a modern, fast web framework for building APIs. Here's a minimal single-script FastAPI app:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A single-script FastAPI app example - the simplest FastAPI app."""

from fastapi import FastAPI
import pathlib
import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment fastapi-app}}
app = FastAPI(
    title="Simple FastAPI App",
    description="A minimal single-script FastAPI application",
    version="1.0.0",
)

@app.get("/")
async def root():
    return {"message": "Hello, World!"}

@app.get("/health")
async def health():
    return {"status": "healthy"}
# {{/docs-fragment fastapi-app}}

# {{docs-fragment app-env}}
app_env = FastAPIAppEnvironment(
    name="fastapi-single-script",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.serve(app_env)
    print(f"Deployed: {app_deployment.url}")
    print(f"API docs: {app_deployment.url}/docs")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi_single_script.py*

**Key points**

- **FastAPIAppEnvironment**: Automatically configures uvicorn and FastAPI
- **Type hints**: FastAPI uses Python type hints for automatic validation
- **Auto docs**: Interactive API documentation at `/docs` endpoint
- **Async support**: Built-in support for async/await patterns

## Running single-script apps

To run any of these examples:

1. **Save the script** to a file (e.g., `my_app.py`)
2. **Ensure you have a config file** (`./.flyte/config.yaml` or `./config.yaml`)
3. **Run the script**:

```bash
python my_app.py
```

Or using `uv`:

```bash
uv run my_app.py
```

The script will:
- Initialize Flyte from your config
- Deploy the app to your Union/Flyte instance
- Print the app URL
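
The plain HTTP server and Streamlit examples above use the same file for two roles: run without arguments, the script deploys the app; run with `--server` (as set in the environment's `args`), it starts the server inside the container. The sketch below isolates that dispatch. The `choose_mode` function is a hypothetical name used only for illustration.

```python
def choose_mode(argv: list[str]) -> str:
    """Mirror the `"--server" in sys.argv` check used in the examples above."""
    return "serve" if "--server" in argv else "deploy"


# On your machine: `python my_app.py`
local_mode = choose_mode(["my_app.py"])

# In the container: `python my_app.py --server`
container_mode = choose_mode(["my_app.py", "--server"])

print(local_mode, container_mode)
```

The FastAPI example does not need this flag, because `FastAPIAppEnvironment` receives the `app` object directly instead of a command line.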

## When to use single-script apps

**Use single-script apps when:**
- Building prototypes or proof-of-concepts
- Creating simple services with minimal logic
- Learning how Flyte apps work
- Sharing complete, runnable examples
- Building demos or tutorials

**Consider multi-script apps when:**
- Your app grows beyond a few hundred lines
- You need to organize code into modules
- You want to reuse components across apps
- You're building production applications

See [**Multi-script apps**](./multi-script-apps) for examples of organizing apps across multiple files.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/multi-script-apps ===

# Multi-script apps

Real-world applications often span multiple files. This page shows how to build FastAPI and Streamlit apps with multiple Python files.

## FastAPI multi-script app

### Project structure

```
project/
├── app.py          # Main FastAPI app file
└── module.py       # Helper module
```

### Example: Multi-file FastAPI app

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""Multi-file FastAPI app example."""

from fastapi import FastAPI
from module import function  # Import from another file
import pathlib

import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment app-definition}}
app = FastAPI(title="Multi-file FastAPI Demo")

app_env = FastAPIAppEnvironment(
    name="fastapi-multi-file",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    # FastAPIAppEnvironment automatically includes necessary files
    # But you can also specify explicitly:
    # include=["app.py", "module.py"],
)
# {{/docs-fragment app-definition}}

# {{docs-fragment endpoint}}
@app.get("/")
async def root():
    return function()  # Uses function from module.py
# {{/docs-fragment endpoint}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(app_env)
    print(f"Deployed: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/multi_file/app.py*

```python
# {{docs-fragment helper-function}}
def function():
    """Helper function used by the FastAPI app."""
    return {"message": "Hello from module.py!"}
# {{/docs-fragment helper-function}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/multi_file/module.py*

### Automatic file discovery

`FastAPIAppEnvironment` automatically discovers and includes the necessary files by analyzing your imports. However, if you have files that aren't automatically detected (like configuration files or data files), you can explicitly include them:

```python
app_env = FastAPIAppEnvironment(
    name="fastapi-with-config",
    app=app,
    include=["app.py", "module.py", "config.yaml"],  # Explicit includes
    # ...
)
```
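
To build intuition for what import-based discovery can and cannot see, here is a rough sketch that lists the sibling `.py` files a script imports. It is an illustration only, not how `FastAPIAppEnvironment` is implemented, and `local_imports` is a hypothetical helper. Note that a file such as `config.yaml` never appears in an import statement, which is why it has to be listed explicitly.

```python
import ast
import tempfile
from pathlib import Path


def local_imports(entry: Path) -> list[str]:
    """Return sibling .py files that `entry` imports (hypothetical helper)."""
    tree = ast.parse(entry.read_text())
    names: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names.add(node.module.split(".")[0])
    # Keep only names that resolve to a file next to the entry script.
    return sorted(f"{name}.py" for name in names if (entry.parent / f"{name}.py").is_file())


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "module.py").write_text("def function():\n    return {}\n")
    (root / "config.yaml").write_text("debug: true\n")
    (root / "app.py").write_text("import json\nfrom module import function\n")
    found = local_imports(root / "app.py")

print(found)  # ['module.py']
```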

## Streamlit multi-script app

### Project structure

```
project/
├── main.py         # Main Streamlit app
├── utils.py        # Utility functions
└── components.py   # Reusable components
```

### Example: Multi-file Streamlit app

```python
import os

import streamlit as st
from utils import generate_data

# {{docs-fragment streamlit-app}}
all_columns = ["Apples", "Orange", "Pineapple"]
with st.container(border=True):
    columns = st.multiselect("Columns", all_columns, default=all_columns)

all_data = st.cache_data(generate_data)(columns=all_columns, seed=101)

data = all_data[columns]

tab1, tab2 = st.tabs(["Chart", "Dataframe"])
tab1.line_chart(data, height=250)
tab2.dataframe(data, height=250, use_container_width=True)
st.write(f"Environment: {os.environ}")
# {{/docs-fragment streamlit-app}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/main.py*

```python
import numpy as np
import pandas as pd

# {{docs-fragment utils-function}}
def generate_data(columns: list[str], seed: int = 42):
    rng = np.random.default_rng(seed)
    data = pd.DataFrame(rng.random(size=(20, len(columns))), columns=columns)
    return data
# {{/docs-fragment utils-function}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/utils.py*

### Deploying multi-file Streamlit app

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# ///

"""A custom Streamlit app with multiple files."""

import pathlib
import flyte
import flyte.app

# {{docs-fragment app-env}}
image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
    "streamlit==1.41.1",
    "pandas==2.2.3",
    "numpy==2.2.3",
)

app_env = flyte.app.AppEnvironment(
    name="streamlit-multi-file-app",
    image=image,
    args="streamlit run main.py --server.port 8080",
    port=8080,
    include=["main.py", "utils.py"],  # Include your app files
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app = flyte.deploy(app_env)
    print(f"Deployed app: {app[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/multi_file_streamlit.py*
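
This example passes `args` as a single string, while the single-script examples pass a list. The sketch below shows that the two forms describe the same tokens under ordinary shell-style splitting. How `AppEnvironment` processes a string internally is not stated on this page, so treat the use of `shlex` here as an assumption made for illustration.

```python
import shlex

string_form = "streamlit run main.py --server.port 8080"
list_form = ["streamlit", "run", "main.py", "--server.port", "8080"]

# Shell-style splitting of the string yields the same tokens as the list form.
tokens = shlex.split(string_form)

print(tokens == list_form)  # True
```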

## Complex multi-file example

Here's a more complex example with multiple modules:

### Project structure

```
project/
├── app.py
├── models/
│   ├── __init__.py
│   └── user.py
├── services/
│   ├── __init__.py
│   └── auth.py
└── utils/
    ├── __init__.py
    └── helpers.py
```

### Example code

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""Complex multi-file FastAPI app example."""

from pathlib import Path
from fastapi import FastAPI
from models.user import User
from services.auth import authenticate
from utils.helpers import format_response

import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment complex-app}}
app = FastAPI(title="Complex Multi-file App")

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = User(id=user_id, name="John Doe")
    return format_response(user)
# {{/docs-fragment complex-app}}

# {{docs-fragment complex-env}}
app_env = FastAPIAppEnvironment(
    name="complex-app",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
        "pydantic",
    ),
    # Include all necessary files
    include=[
        "app.py",
        "models/",
        "services/",
        "utils/",
    ],
    resources=flyte.Resources(cpu=1, memory="512Mi"),
)
# {{/docs-fragment complex-env}}

if __name__ == "__main__":
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app_deployment = flyte.deploy(app_env)
    print(f"Deployed: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/complex_multi_file/app.py*

```python
# {{docs-fragment user-model}}
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
# {{/docs-fragment user-model}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/complex_multi_file/models/user.py*

```python
# {{docs-fragment auth-service}}
def authenticate(token: str) -> bool:
    """Authenticate a user by token."""
    # ... authentication logic ...
    return True
# {{/docs-fragment auth-service}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/complex_multi_file/services/auth.py*

```python
# {{docs-fragment helpers}}
def format_response(data):
    """Format a response with standard structure."""
    return {"data": data, "status": "success"}
# {{/docs-fragment helpers}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/complex_multi_file/utils/helpers.py*

### Deploying complex app

The deployment code is the `if __name__ == "__main__":` block at the bottom of `app.py`, shown in full above. Run `python app.py` (or `uv run app.py`) from the project directory to initialize Flyte from your config and deploy the app with `flyte.deploy(app_env)`.

## Best practices

1. **Use explicit includes**: For Streamlit apps, explicitly list all files in `include`
2. **Automatic discovery**: For FastAPI apps, `FastAPIAppEnvironment` handles most cases automatically
3. **Organize modules**: Use proper Python package structure with `__init__.py` files
4. **Test locally**: Test your multi-file app locally before deploying
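5. **Include all local modules**: Every local file your app imports must be bundled, either through automatic discovery or through `include`; third-party packages belong in the image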
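
As a quick check for practice 3, the sketch below flags directories in an `include` list that lack an `__init__.py`. The helper `packages_missing_init` is hypothetical and runs against a temporary directory that imitates the project layout above.

```python
import tempfile
from pathlib import Path


def packages_missing_init(root: Path, include: list[str]) -> list[str]:
    """Return directory entries in `include` that have no __init__.py (hypothetical helper)."""
    return [
        entry
        for entry in include
        if (root / entry).is_dir() and not (root / entry / "__init__.py").is_file()
    ]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "app.py").write_text("")
    for package in ("models", "services"):
        (root / package).mkdir()
    (root / "models" / "__init__.py").write_text("")  # services/ is left without one
    missing_init = packages_missing_init(root, ["app.py", "models/", "services/"])

print(missing_init)  # ['services/']
```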

## Troubleshooting

**Import errors:**
- Verify all files are included in the `include` parameter
- Check that file paths are correct (relative to app definition file)
- Ensure `__init__.py` files are included for packages

**Module not found:**
- Add missing files to the `include` list
- Check that import paths match the file structure
- Verify that the image includes all necessary packages

**File not found at runtime:**
- Ensure all referenced files are included
- Check mount paths for file/directory inputs
- Verify file paths are relative to the app root directory
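
Several of these problems come down to an `include` entry that does not exist relative to the app definition file. A minimal pre-deploy check might look like the sketch below; `missing_includes` is a hypothetical helper, and the temporary directory stands in for your project.

```python
import tempfile
from pathlib import Path


def missing_includes(root: Path, include: list[str]) -> list[str]:
    """Return the `include` entries that do not exist under `root` (hypothetical helper)."""
    return [entry for entry in include if not (root / entry).exists()]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "app.py").write_text("")
    (root / "models").mkdir()
    (root / "models" / "__init__.py").write_text("")
    missing = missing_includes(root, ["app.py", "models/", "services/"])

print(missing)  # ['services/']
```

In a real project, `root` would be `Path(__file__).parent` of the file that defines the app environment.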

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/app-usage-patterns ===

# App usage patterns

Apps and tasks can interact in various ways: calling each other via HTTP, webhooks, WebSockets, or direct browser usage. This page describes the different patterns and when to use them.

## Patterns overview

1. **Call app from task**: A task makes HTTP requests to an app
2. **Call task from app (webhooks / APIs)**: An app triggers task execution via the Flyte SDK
3. **Call app from app**: One app makes HTTP requests to another app
4. **WebSocket-based patterns**: Real-time, bidirectional communication
5. **Browser-based access**: Users access apps directly through the browser

## Call app from task

Tasks can call apps by making HTTP requests to the app's endpoint. This is useful when:
- You need to use a long-running service during task execution
- You want to call a model serving endpoint from a batch processing task
- You need to interact with an API from a workflow

### Example: Task calling an app

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "httpx",
# ]
# ///

"""Example of a task calling an app."""

import pathlib
import httpx
from fastapi import FastAPI
import flyte
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(title="Add One", description="Adds one to the input", version="1.0.0")

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx")

# {{docs-fragment app-definition}}
app_env = FastAPIAppEnvironment(
    name="add-one-app",
    app=app,
    description="Adds one to the input",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment app-definition}}

# {{docs-fragment task-env}}
task_env = flyte.TaskEnvironment(
    name="add_one_task_env",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    depends_on=[app_env],  # Ensure app is deployed before task runs
)
# {{/docs-fragment task-env}}

# {{docs-fragment app-endpoint}}
@app.get("/")
async def add_one(x: int) -> dict[str, int]:
    """Main endpoint for the add-one app."""
    return {"result": x + 1}
# {{/docs-fragment app-endpoint}}

# {{docs-fragment task}}
@task_env.task
async def add_one_task(x: int) -> int:
    print(f"Calling app at {app_env.endpoint}")
    async with httpx.AsyncClient() as client:
        response = await client.get(app_env.endpoint, params={"x": x})
        response.raise_for_status()
        return response.json()["result"]
# {{/docs-fragment task}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    deployments = flyte.deploy(task_env)
    print(f"Deployed task environment: {deployments}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/task_calling_app.py*

**Key points**

- The task environment uses `depends_on=[app_env]` to ensure the app is deployed first
- Access the app endpoint via `app_env.endpoint`
- Use standard HTTP client libraries (like `httpx`) to make requests

## Call task from app (webhooks / APIs)

Apps can trigger task execution using the Flyte SDK. This is useful for:

- Webhooks that trigger workflows
- APIs that need to run batch jobs
- Services that need to execute tasks asynchronously

Webhooks are HTTP endpoints that trigger actions in response to external events. Flyte apps can serve as webhook endpoints that trigger task runs, workflows, or other operations.

### Example: Basic webhook app

Here's a simple webhook that triggers Flyte tasks:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook that triggers Flyte tasks."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment auth}}
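# Falls back to a placeholder key for local testing; set WEBHOOK_API_KEY to a real secret before deploying.
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")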
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials
# {{/docs-fragment auth}}

# {{docs-fragment lifespan}}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield
    # Cleanup if needed
# {{/docs-fragment lifespan}}

# {{docs-fragment app}}
app = FastAPI(
    title="Flyte Webhook Runner",
    description="A webhook service that triggers Flyte task runs",
    version="1.0.0",
    lifespan=lifespan,
)

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}
# {{/docs-fragment app}}

# {{docs-fragment webhook-endpoint}}
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    """
    Trigger a Flyte task run via webhook.

    Returns information about the launched run.
    """
    # Fetch the task
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    # Run the task
    run = await flyte.run.aio(task, **inputs)

    return {
        "url": run.url,
        "id": run.id,
        "status": "started",
    }
# {{/docs-fragment webhook-endpoint}}

# {{docs-fragment env}}
env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    description="A webhook service that triggers Flyte task runs",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,  # We handle auth in the app
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)
# {{/docs-fragment env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/webhook/basic_webhook.py*
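
The `verify_token` dependency above compares the presented key with `!=`. A common hardening step, which is a suggestion here and not part of the original example, is a constant-time comparison so that response timing reveals less about the expected key. The `token_matches` helper below is a hypothetical name.

```python
import hmac


def token_matches(presented: str, expected: str) -> bool:
    """Compare two tokens with hmac.compare_digest instead of `==` or `!=`."""
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))


accepted = token_matches("test-api-key", "test-api-key")
rejected = token_matches("wrong-key", "test-api-key")

print(accepted, rejected)  # True False
```

Inside `verify_token`, the condition would become `if not token_matches(credentials.credentials, WEBHOOK_API_KEY):`.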

Once deployed, you can trigger tasks via HTTP POST:

```bash
curl -X POST "https://your-webhook-url/run-task/flytesnacks/development/my_task/v1" \
  -H "Authorization: Bearer test-api-key" \
  -H "Content-Type: application/json" \
  -d '{"input_key": "input_value"}'
```

Response:

```json
{
  "url": "https://console.union.ai/...",
  "id": "abc123",
  "status": "started"
}
```
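
The same request can be assembled from Python. The sketch below builds the request with the standard library and prints it without sending it, since `https://your-webhook-url` is a placeholder. `build_webhook_request` is a hypothetical helper, and the project, domain, task name, and version are the sample values from the `curl` command.

```python
import json
import urllib.request


def build_webhook_request(
    base_url: str,
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    api_key: str,
) -> urllib.request.Request:
    """Build a POST to /run-task/{project}/{domain}/{name}/{version} (hypothetical helper)."""
    url = f"{base_url.rstrip('/')}/run-task/{project}/{domain}/{name}/{version}"
    return urllib.request.Request(
        url,
        data=json.dumps(inputs).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_webhook_request(
    "https://your-webhook-url",
    "flytesnacks",
    "development",
    "my_task",
    "v1",
    {"input_key": "input_value"},
    "test-api-key",
)

print(request.get_method(), request.full_url)
```

Once the base URL points at a deployed app, `urllib.request.urlopen(request)` sends it.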

### Advanced webhook patterns

**Webhook with validation**

Use Pydantic for input validation:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook with Pydantic validation."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
from pydantic import BaseModel
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="Flyte Webhook Runner with Validation",
    description="A webhook service that triggers Flyte task runs with Pydantic validation",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment validation-model}}
class TaskInput(BaseModel):
    data: dict
    priority: int = 0
# {{/docs-fragment validation-model}}

# {{docs-fragment validated-webhook}}
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: TaskInput,  # Validated input
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    run = await flyte.run.aio(task, **inputs.model_dump())

    return {
        "run_id": run.id,
        "url": run.url,
    }
# {{/docs-fragment validated-webhook}}

env = FastAPIAppEnvironment(
    name="webhook-with-validation",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/webhook_validation.py*
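
One consequence of `**inputs.model_dump()` is that the model's field names become the task's keyword arguments, so the target task has to accept `data` and `priority`. The sketch below shows the unpacking with a standard-library dataclass standing in for the Pydantic model, so that it runs without extra packages. `TaskInputSketch` and `fake_task` are hypothetical names, and `asdict` plays the role of `model_dump()`.

```python
from dataclasses import asdict, dataclass


@dataclass
class TaskInputSketch:
    """Stdlib stand-in for the Pydantic `TaskInput` model above."""

    data: dict
    priority: int = 0


def fake_task(data: dict, priority: int) -> str:
    """Hypothetical task body; the real task is fetched with remote.Task.get."""
    return f"priority={priority} keys={sorted(data)}"


inputs = TaskInputSketch(data={"input_key": "input_value"})
kwargs = asdict(inputs)  # plays the role of inputs.model_dump()
summary = fake_task(**kwargs)

print(kwargs)
print(summary)
```

With this endpoint, the request body takes the shape `{"data": {...}, "priority": 0}` and no longer a flat dictionary of task inputs.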

**Webhook with response waiting**

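Block the request until the run reaches a terminal state, then return its status and outputs. This suits short tasks; a long run can outlast client or proxy timeouts: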

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook that waits for task completion."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="Flyte Webhook Runner (Wait for Completion)",
    description="A webhook service that triggers Flyte task runs and waits for completion",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment wait-webhook}}
@app.post("/run-task-and-wait/{project}/{domain}/{name}/{version}")
async def run_task_and_wait(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

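    run = await flyte.run.aio(task, **inputs)
    # Await completion without blocking the event loop
    # (assumes `wait` exposes `.aio()` the same way `flyte.run` does)
    await run.wait.aio()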

    return {
        "run_id": run.id,
        "url": run.url,
        "status": run.status,
        "outputs": run.outputs(),
    }
# {{/docs-fragment wait-webhook}}

env = FastAPIAppEnvironment(
    name="webhook-wait-completion",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/webhook_wait.py*
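
Because the handler holds the HTTP connection open while it waits, it can help to bound the wait. The following is a minimal sketch using only `asyncio`; `wait_or_timeout` is a hypothetical helper name, and it works with any awaitable rather than a specific Flyte call. Note that a timeout only stops the waiting; it does not cancel a remote run.

```python
import asyncio
from typing import Any, Awaitable


async def wait_or_timeout(awaitable: Awaitable[Any], timeout: float) -> tuple[bool, Any]:
    """Await `awaitable` for at most `timeout` seconds.

    Returns (True, result) on completion and (False, None) on timeout.
    """
    try:
        return True, await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # The caller can respond with the run URL so the client can poll later.
        return False, None
```

On timeout, the webhook could return the `run_id` and `url` alone, exactly as the non-waiting webhook does.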

**Webhook with secret management**

Use Flyte secrets for API keys:

```python
env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    secrets=flyte.Secret(key="webhook-api-key", as_env_var="WEBHOOK_API_KEY"),
    # ...
)
```

Then read the key from the environment in your app:

```python
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY")
```
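
Since `os.getenv` returns `None` when the variable is unset, a missing secret would only show up when a request is checked against it. A minimal sketch of a fail-fast loader follows; `load_api_key` is a hypothetical helper, not part of Flyte.

```python
import os


def load_api_key(var_name: str = "WEBHOOK_API_KEY") -> str:
    """Return the API key from the environment, or raise if it is missing.

    Hypothetical helper: call it once at startup so that a misconfigured
    secret fails the app early instead of failing individual requests.
    """
    value = os.environ.get(var_name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return value
```

Calling `load_api_key()` at import time or inside the FastAPI `lifespan` function are both reasonable places for this check.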

### Webhook security and best practices

- **Authentication**: Always secure webhooks with authentication (API keys, tokens, etc.).
- **Input validation**: Validate webhook inputs using Pydantic models.
- **Error handling**: Handle errors gracefully and return meaningful error messages.
- **Async operations**: Use async/await for I/O operations.
- **Health checks**: Include health check endpoints.
- **Logging**: Log webhook requests for debugging and auditing.
- **Rate limiting**: Consider implementing rate limiting for production.

Security considerations:

- Store API keys in Flyte secrets, not in code.
- Always use HTTPS in production.
- Validate all inputs to prevent injection attacks.
- Implement proper access control mechanisms.
- Log all webhook invocations for security auditing.
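
The webhook examples above compare the bearer token with `!=`, which can leak timing information. One way to harden that check is a constant-time comparison from the standard library; `tokens_match` is a hypothetical helper name.

```python
import hmac


def tokens_match(provided: str, expected: str) -> bool:
    """Compare two tokens in constant time to avoid timing side channels."""
    # Encode to bytes so non-ASCII input is compared instead of raising TypeError.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
```

In `verify_token`, the condition would become `if not tokens_match(credentials.credentials, WEBHOOK_API_KEY):`.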

### Example: GitHub webhook

Here's an example webhook that triggers tasks based on GitHub events:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A GitHub webhook that triggers Flyte tasks based on GitHub events."""

import pathlib
import hmac
import hashlib
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Header, HTTPException
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="GitHub Webhook Handler",
    description="Triggers Flyte tasks based on GitHub events",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment github-webhook}}
@app.post("/github-webhook")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str = Header(None),
):
    """Handle GitHub webhook events."""
    body = await request.body()

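    # Verify signature
    secret = os.getenv("GITHUB_WEBHOOK_SECRET")
    if not secret:
        # Without the secret no delivery can be verified; fail loudly.
        raise HTTPException(status_code=500, detail="Webhook secret is not configured")
    if x_hub_signature_256 is None:
        # compare_digest raises TypeError on None, so reject early.
        raise HTTPException(status_code=403, detail="Missing signature")

    signature = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    expected_signature = f"sha256={signature}"
    # Compare as bytes so a non-ASCII header value is rejected instead of raising.
    if not hmac.compare_digest(x_hub_signature_256.encode(), expected_signature.encode()):
        raise HTTPException(status_code=403, detail="Invalid signature")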

    # Process webhook
    event = await request.json()
    event_type = request.headers.get("X-GitHub-Event")

    if event_type == "push":
        # Trigger deployment task
        task = remote.Task.get(
            project="my-project",
            domain="development",
            name="deploy-task",
            version="v1",
        )
        run = await flyte.run.aio(task, commit=event["after"])
        return {"run_id": run.id, "url": run.url}

    return {"status": "ignored"}
# {{/docs-fragment github-webhook}}

# {{docs-fragment env}}
env = FastAPIAppEnvironment(
    name="github-webhook",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    secrets=flyte.Secret(key="GITHUB_WEBHOOK_SECRET", as_env_var="GITHUB_WEBHOOK_SECRET"),
)
# {{/docs-fragment env}}

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed GitHub webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/github_webhook.py*

### Gradio agent UI

For AI agents, a Gradio app lets you build an interactive UI that kicks off agent runs. The app uses `flyte.with_runcontext()` to run the agent task either locally or on a remote cluster, controlled by an environment variable.

```python
import os
import flyte
import flyte.app
import gradio as gr
from research_agent import agent  # Your agent task, defined in a local module

RUN_MODE = os.getenv("RUN_MODE", "remote")

serving_env = flyte.app.AppEnvironment(
    name="research-agent-ui",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "gradio", "langchain-core", "langchain-openai", "langgraph",
    ),
    secrets=flyte.Secret(key="OPENAI_API_KEY", as_env_var="OPENAI_API_KEY"),
    port=7860,
)

def run_query(request: str):
    """Kick off the agent as a Flyte task."""
    result = flyte.with_runcontext(mode=RUN_MODE).run(agent, request=request)
    result.wait()
    return result.outputs()[0]

def create_demo() -> gr.Interface:
    """Build the UI. A minimal single-textbox layout; replace it with your own."""
    return gr.Interface(fn=run_query, inputs="text", outputs="text")

@serving_env.server
def app_server():
    create_demo().launch(server_name="0.0.0.0", server_port=7860)

if __name__ == "__main__":
    create_demo().launch()
```

The `RUN_MODE` variable gives you a smooth development progression:

1. **Fully local**: `RUN_MODE=local python agent_app.py`. Everything runs in your local Python environment, great for rapid iteration.
2. **Local app, remote task**: `python agent_app.py`. The UI runs locally but the agent executes on the cluster with full compute resources.
3. **Full remote**: `flyte deploy agent_app.py serving_env`. Both the UI and agent run on the cluster.
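
Because the mode comes from an environment variable, a typo such as `RUN_MODE=remot` is easy to miss. The following sketch validates the value up front; `resolve_run_mode` is a hypothetical helper, and the allowed set contains only the two modes named above, which is an assumption rather than an exhaustive list.

```python
import os

# The two modes named in this section; an assumption, not an exhaustive list.
ALLOWED_RUN_MODES = ("local", "remote")


def resolve_run_mode(default: str = "remote") -> str:
    """Read RUN_MODE from the environment and reject unexpected values."""
    mode = os.getenv("RUN_MODE", default).strip().lower()
    if mode not in ALLOWED_RUN_MODES:
        raise ValueError(f"RUN_MODE must be one of {ALLOWED_RUN_MODES}, got {mode!r}")
    return mode
```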

## Call app from app

Apps can call other apps by making HTTP requests. This is useful for:
- Microservice architectures
- Proxy/gateway patterns
- A/B testing setups
- Service composition

### Example: App calling another app

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "httpx",
# ]
# ///

"""Example of one app calling another app."""

import httpx
from fastapi import FastAPI
import pathlib
import flyte
from flyte.app.extras import FastAPIAppEnvironment

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
    "fastapi", "uvicorn", "httpx"
)

# {{docs-fragment backend-app}}
app1 = FastAPI(
    title="App 1",
    description="A FastAPI app that runs some computations",
)

env1 = FastAPIAppEnvironment(
    name="app1-is-called-by-app2",
    app=app1,
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment backend-app}}

# {{docs-fragment frontend-app}}
app2 = FastAPI(
    title="App 2",
    description="A FastAPI app that proxies requests to another FastAPI app",
)

env2 = FastAPIAppEnvironment(
    name="app2-calls-app1",
    app=app2,
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    depends_on=[env1],  # Deploy env1 (the backend app) before this app
)
# {{/docs-fragment frontend-app}}

# {{docs-fragment backend-endpoint}}
@app1.get("/greeting/{name}")
async def greeting(name: str) -> str:
    return f"Hello, {name}!"
# {{/docs-fragment backend-endpoint}}

# {{docs-fragment frontend-endpoints}}
@app2.get("/app1-endpoint")
async def get_app1_endpoint() -> str:
    return env1.endpoint  # Access the backend endpoint

@app2.get("/greeting/{name}")
async def greeting_proxy(name: str):
    """Proxy that calls the backend app."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{env1.endpoint}/greeting/{name}")
        response.raise_for_status()
        return response.json()
# {{/docs-fragment frontend-endpoints}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    deployments = flyte.deploy(env2)
    print(f"Deployed FastAPI app: {deployments[0].env_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/app_calling_app.py*

Key points:
- Use `depends_on=[env1]` to ensure dependencies are deployed first
- Access the app endpoint via `env1.endpoint`
- Use HTTP clients (like `httpx`) to make requests between apps
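
The proxy above interpolates `name` directly into the URL. A small sketch of building the request URL more defensively is shown here; `build_greeting_url` is a hypothetical helper that uses only the standard library.

```python
from urllib.parse import quote


def build_greeting_url(endpoint: str, name: str) -> str:
    """Join the backend endpoint and a path parameter into a request URL."""
    # Strip a trailing slash so the result never contains "//greeting".
    base = endpoint.rstrip("/")
    # Percent-encode characters that are not safe in a URL path segment.
    return f"{base}/greeting/{quote(name, safe='')}"
```

For example, `build_greeting_url(env1.endpoint, name)` could replace the f-string passed to `client.get`.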

### Using AppEndpoint parameter

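Instead of referencing `env1.endpoint` directly, you can declare the endpoint as a `flyte.app.Parameter` backed by `flyte.app.AppEndpoint`. The resolved URL is then exposed to the app through an environment variable (`APP1_ENDPOINT` in this example):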

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "httpx",
# ]
# ///

"""Example of one app calling another app."""

import os
import httpx
from fastapi import FastAPI
import pathlib
import flyte
from flyte.app.extras import FastAPIAppEnvironment

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
    "fastapi", "uvicorn", "httpx"
)

# {{docs-fragment backend-app}}
app1 = FastAPI(
    title="App 1",
    description="A FastAPI app that runs some computations",
)

env1 = FastAPIAppEnvironment(
    name="app1-is-called-by-app2",
    app=app1,
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)

@app1.get("/greeting/{name}")
async def greeting(name: str) -> str:
    return f"Hello, {name}!"
# {{/docs-fragment backend-app}}

# {{docs-fragment using-app-endpoint}}
app2 = FastAPI(
    title="App 2",
    description="A FastAPI app that proxies requests to another FastAPI app",
)

env2 = FastAPIAppEnvironment(
    name="app2-calls-app1",
    app=app2,
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    depends_on=[env1],  # Deploy env1 (the backend app) before this app
    parameters=[
        flyte.app.Parameter(
            name="app1_endpoint",
            value=flyte.app.AppEndpoint(app_name="app1-is-called-by-app2"),
            env_var="APP1_ENDPOINT",
        ),
    ],
)

@app2.get("/greeting/{name}")
async def greeting_proxy(name: str):
    app1_endpoint = os.getenv("APP1_ENDPOINT")
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{app1_endpoint}/greeting/{name}")
        response.raise_for_status()
        return response.json()
# {{/docs-fragment using-app-endpoint}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    deployments = flyte.deploy(env2)
    print(f"Deployed FastAPI app: {deployments[0].env_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/app_calling_app_endpoint.py*

## WebSocket-based patterns

WebSockets enable bidirectional, real-time communication between clients and servers. Flyte apps can serve WebSocket endpoints for real-time applications like chat, live updates, or streaming data.

### Example: Basic WebSocket app

Here's a simple FastAPI app with WebSocket support:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "websockets",
# ]
# ///

"""A FastAPI app with WebSocket support."""

import pathlib
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import json
from datetime import UTC, datetime
import flyte
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(
    title="Flyte WebSocket Demo",
    description="A FastAPI app with WebSocket support",
    version="1.0.0",
)

# {{docs-fragment connection-manager}}
class ConnectionManager:
    """Manages WebSocket connections."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept and register a new WebSocket connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
        print(f"Client connected. Total: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Remove a WebSocket connection."""
        self.active_connections.remove(websocket)
        print(f"Client disconnected. Total: {len(self.active_connections)}")

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send a message to a specific WebSocket connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Broadcast a message to all active connections."""
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception as e:
                print(f"Error broadcasting: {e}")

manager = ConnectionManager()
# {{/docs-fragment connection-manager}}

# {{docs-fragment websocket-endpoint}}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time communication."""
    await manager.connect(websocket)

    try:
        # Send welcome message
        await manager.send_personal_message(
            json.dumps({
                "type": "system",
                "message": "Welcome! You are connected.",
                "timestamp": datetime.now(UTC).isoformat(),
            }),
            websocket,
        )

        # Listen for messages
        while True:
            data = await websocket.receive_text()

            # Echo back to sender
            await manager.send_personal_message(
                json.dumps({
                    "type": "echo",
                    "message": f"Echo: {data}",
                    "timestamp": datetime.now(UTC).isoformat(),
                }),
                websocket,
            )

            # Broadcast to all clients
            await manager.broadcast(
                json.dumps({
                    "type": "broadcast",
                    "message": f"Broadcast: {data}",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "connections": len(manager.active_connections),
                })
            )

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(
            json.dumps({
                "type": "system",
                "message": "A client disconnected",
                "connections": len(manager.active_connections),
            })
        )
# {{/docs-fragment websocket-endpoint}}

# {{docs-fragment env}}
env = FastAPIAppEnvironment(
    name="websocket-app",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
        "websockets",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    requires_auth=False,
)
# {{/docs-fragment env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed websocket app: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/basic_websocket.py*
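
The endpoint above builds the same JSON envelope (`type`, `message`, `timestamp`) in several places. One way to factor that out is sketched below; `make_message` is a hypothetical helper, not part of the example file.

```python
import json
from datetime import datetime, timezone


def make_message(kind: str, text: str, **extra: object) -> str:
    """Serialize a message envelope with a UTC timestamp as a JSON string."""
    payload = {
        "type": kind,
        "message": text,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **extra,  # Optional fields such as the current connection count.
    }
    return json.dumps(payload)
```

The echo reply would then read `make_message("echo", f"Echo: {data}")`, and the broadcast could pass `connections=len(manager.active_connections)` as an extra field.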

### WebSocket patterns

**Echo server**

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "websockets",
# ]
# ///

"""WebSocket patterns: echo, broadcast, streaming, and chat."""

import asyncio
import json
import random
from datetime import datetime, UTC
from pathlib import Path
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import flyte
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(
    title="WebSocket Patterns Demo",
    description="Demonstrates various WebSocket patterns",
    version="1.0.0",
)

# {{docs-fragment echo-server}}
@app.websocket("/echo")
async def echo(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
# {{/docs-fragment echo-server}}

# Connection manager for broadcast
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                pass

manager = ConnectionManager()

# {{docs-fragment broadcast-server}}
@app.websocket("/broadcast")
async def broadcast(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(data)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
# {{/docs-fragment broadcast-server}}

# {{docs-fragment streaming-server}}
@app.websocket("/stream")
async def stream_data(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Generate or fetch data
            data = {"timestamp": datetime.now(UTC).isoformat(), "value": random.random()}
            await websocket.send_json(data)
            await asyncio.sleep(1)  # Send update every second
    except WebSocketDisconnect:
        pass
# {{/docs-fragment streaming-server}}

# {{docs-fragment chat-room}}
class ChatRoom:
    def __init__(self, name: str):
        self.name = name
        self.connections: list[WebSocket] = []

    async def join(self, websocket: WebSocket):
        self.connections.append(websocket)

    async def leave(self, websocket: WebSocket):
        self.connections.remove(websocket)

    async def broadcast(self, message: str, sender: WebSocket):
        for connection in self.connections:
            if connection != sender:
                await connection.send_text(message)

rooms: dict[str, ChatRoom] = {}

@app.websocket("/chat/{room_name}")
async def chat(websocket: WebSocket, room_name: str):
    await websocket.accept()

    if room_name not in rooms:
        rooms[room_name] = ChatRoom(room_name)

    room = rooms[room_name]
    await room.join(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            await room.broadcast(data, websocket)
    except WebSocketDisconnect:
        await room.leave(websocket)
# {{/docs-fragment chat-room}}

env = FastAPIAppEnvironment(
    name="websocket-patterns",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
        "websockets",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    requires_auth=False,
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed WebSocket patterns app: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/websocket_patterns.py*

**Broadcast server**

```python
# Excerpt from websocket_patterns.py; imports and `app` are defined as in the full listing above.

# Connection manager for broadcast
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                pass

manager = ConnectionManager()

@app.websocket("/broadcast")
async def broadcast(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(data)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/websocket_patterns.py*
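
`ConnectionManager.broadcast` swallows send errors but leaves the failed connections in the list, so they are retried on every later broadcast. A possible variation that prunes them is sketched here. `broadcast_and_prune` and `FakeConnection` are hypothetical names; the fake stands in for a WebSocket so the logic can run without a server.

```python
import asyncio


class FakeConnection:
    """Stand-in for a WebSocket; hypothetical, used only to exercise the logic."""

    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.received: list[str] = []

    async def send_text(self, message: str) -> None:
        if not self.alive:
            raise RuntimeError("connection closed")
        self.received.append(message)


async def broadcast_and_prune(connections: list, message: str) -> int:
    """Send `message` to every connection and drop the ones that fail.

    Returns the number of connections that were removed.
    """
    dead = []
    # Iterate over a copy so removals do not disturb the loop.
    for connection in list(connections):
        try:
            await connection.send_text(message)
        except Exception:
            dead.append(connection)
    for connection in dead:
        # A concurrent handler may already have removed it during an await.
        if connection in connections:
            connections.remove(connection)
    return len(dead)
```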

**Real-time data streaming**

```python
# Excerpt from websocket_patterns.py; imports and `app` are defined as in the full listing above.

@app.websocket("/stream")
async def stream_data(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Generate or fetch data
            data = {"timestamp": datetime.now(UTC).isoformat(), "value": random.random()}
            await websocket.send_json(data)
            await asyncio.sleep(1)  # Send update every second
    except WebSocketDisconnect:
        pass
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/websocket_patterns.py*
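
The streaming handler mixes data generation with transport. A minimal sketch of separating the two with an async generator follows, which makes the data source testable without a WebSocket. `sample_stream` and `collect` are hypothetical names.

```python
import asyncio
import random
from collections.abc import AsyncIterator
from datetime import datetime, timezone


async def sample_stream(count: int, interval: float = 1.0) -> AsyncIterator[dict]:
    """Yield `count` timestamped samples, pausing `interval` seconds after each."""
    for _ in range(count):
        yield {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "value": random.random(),
        }
        await asyncio.sleep(interval)


async def collect(count: int) -> list[dict]:
    """Drain the stream into a list; a handler would call send_json instead."""
    return [sample async for sample in sample_stream(count, interval=0)]
```

Inside a WebSocket endpoint, the loop body would become `async for sample in sample_stream(...): await websocket.send_json(sample)`.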

**Chat application**

```python
# {{docs-fragment chat-room}}
class ChatRoom:
    def __init__(self, name: str):
        self.name = name
        self.connections: list[WebSocket] = []

    async def join(self, websocket: WebSocket):
        self.connections.append(websocket)

    async def leave(self, websocket: WebSocket):
        self.connections.remove(websocket)

    async def broadcast(self, message: str, sender: WebSocket):
        for connection in self.connections:
            if connection != sender:
                await connection.send_text(message)

rooms: dict[str, ChatRoom] = {}

@app.websocket("/chat/{room_name}")
async def chat(websocket: WebSocket, room_name: str):
    await websocket.accept()

    if room_name not in rooms:
        rooms[room_name] = ChatRoom(room_name)

    room = rooms[room_name]
    await room.join(websocket)

    try:
        while True:
            data = await websocket.receive_text()
            await room.broadcast(data, websocket)
    except WebSocketDisconnect:
        await room.leave(websocket)
# {{/docs-fragment chat-room}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/websocket_patterns.py*

### Using WebSockets with Flyte tasks

You can trigger Flyte tasks from WebSocket messages:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "websockets",
# ]
# ///

"""A WebSocket app that triggers Flyte tasks and streams updates."""

import json
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="WebSocket Task Runner",
    description="Triggers Flyte tasks via WebSocket and streams updates",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment task-runner-websocket}}
@app.websocket("/task-runner")
async def task_runner(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            # Receive task request
            message = await websocket.receive_text()
            request = json.loads(message)

            # Trigger Flyte task
            task = remote.Task.get(
                project=request["project"],
                domain=request["domain"],
                name=request["task"],
                version=request["version"],
            )

            run = await flyte.run.aio(task, **request["inputs"])

            # Send run info back
            await websocket.send_json({
                "run_id": run.id,
                "url": run.url,
                "status": "started",
            })

            # Optionally stream updates
            async for update in run.stream():
                await websocket.send_json({
                    "status": update.status,
                    "message": update.message,
                })

    except WebSocketDisconnect:
        pass
# {{/docs-fragment task-runner-websocket}}

env = FastAPIAppEnvironment(
    name="task-runner-websocket",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
        "websockets",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    requires_auth=False,
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed WebSocket task runner: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/websocket/task_runner_websocket.py*
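
The `/task-runner` handler above reads the keys `project`, `domain`, `task`, `version`, and `inputs` from each JSON message. A minimal sketch of how a client might build such a message follows; the helper name `build_task_request` and the example values are hypothetical and not part of the Flyte SDK.

```python
import json


def build_task_request(project: str, domain: str, task: str, version: str, inputs: dict) -> str:
    """Serialize a request in the shape the /task-runner handler parses."""
    return json.dumps(
        {
            "project": project,
            "domain": domain,
            "task": task,
            "version": version,
            "inputs": inputs,  # forwarded as keyword arguments to the task
        }
    )


# Hypothetical values for illustration only.
message = build_task_request("my-project", "development", "my_task", "v1", {"x": 1})
```

The resulting string can be sent over an open WebSocket connection with `await websocket.send(message)`.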

### WebSocket client example

Connect from Python:

```python
import asyncio
import websockets
import json

async def client():
    uri = "wss://your-app-url/ws"  # wss:// because deployed apps are served over HTTPS
    async with websockets.connect(uri) as websocket:
        # Send message
        await websocket.send("Hello, Server!")
        
        # Receive message
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.run(client())
```

## Browser-based apps

For browser-based apps (like Streamlit), users interact with the app directly in a web browser. No API calls from other services are needed.

To access a browser-based app:
1. Deploy the app
2. Navigate to the app URL in a browser
3. Interact with the UI directly

## Best practices

1. **Use `depends_on`**: Always specify dependencies to ensure proper deployment order.
2. **Handle errors**: Implement proper error handling for HTTP requests.
3. **Use async clients**: Use async HTTP clients (`httpx.AsyncClient`) in async contexts.
4. **Initialize Flyte**: For apps calling tasks, initialize Flyte in the app's startup.
5. **Endpoint access**: Use `app_env.endpoint` or `AppEndpoint` parameter for accessing app URLs.
6. **Authentication**: Consider authentication when apps call each other (set `requires_auth=True` if needed).
7. **Webhook security**: Secure webhooks with auth, validation, and HTTPS.
8. **WebSocket robustness**: Implement connection management, heartbeats, and rate limiting.
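
As one way to approach the rate-limiting part of item 8, the sketch below implements a sliding-window limiter in plain Python. The class name `SlidingWindowLimiter` and its parameters are illustrative assumptions, not part of Flyte or FastAPI.

```python
import time
from collections import deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most `max_messages` within any `window_seconds` interval."""

    def __init__(self, max_messages: int, window_seconds: float):
        self.max_messages = max_messages
        self.window_seconds = window_seconds
        self._timestamps = deque()  # times of recently accepted messages

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        # Drop timestamps that have aged out of the window.
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) < self.max_messages:
            self._timestamps.append(now)
            return True
        return False
```

A WebSocket handler could create one limiter per connection and call `allow()` for each received message, skipping or rejecting the message when it returns `False`.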

## Summary

| Pattern | Use Case | Implementation |
|---------|----------|----------------|
| Task → App | Batch processing using inference services | HTTP requests from task |
| App → Task | Webhooks, APIs triggering workflows | Flyte SDK in app |
| App → App | Microservices, proxies, agent routers, LLM routers | HTTP requests between apps |
| Browser → App | User-facing dashboards | Direct browser access |

Choose the pattern that best fits your architecture and requirements.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/secret-based-authentication ===

# Secret-based authentication

In this guide, we'll deploy a FastAPI app that uses API key authentication with Flyte secrets. This allows you to invoke the endpoint from the public internet securely without exposing API keys in your code.

## Create the secret

Before defining and deploying the app, you need to create the `API_KEY` secret in Flyte. This secret will store your API key securely.

Create the secret using the Flyte CLI:

```bash
flyte create secret API_KEY <your-api-key-value>
```

For example:

```bash
flyte create secret API_KEY my-secret-api-key-12345
```

> [!NOTE]
> The secret name `API_KEY` must match the key specified in the `flyte.Secret()` call in your code. The secret will be available to your app as the environment variable specified in `as_env_var`.

## Define the FastAPI app

Here's a simple FastAPI app that uses `HTTPBearer` and `HTTPAuthorizationCredentials` to authenticate requests against a secret stored in Flyte:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""Basic FastAPI authentication using dependency injection."""

from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
import pathlib
import flyte
from flyte.app.extras import FastAPIAppEnvironment

# Get API key from environment variable (loaded from Flyte secret)
# The secret must be created using: flyte create secret API_KEY <your-api-key-value>
API_KEY = os.getenv("API_KEY")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if not API_KEY:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="API_KEY not configured",
        )
    if credentials.credentials != API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

app = FastAPI(title="Authenticated API")

@app.get("/public")
async def public_endpoint():
    """Public endpoint that doesn't require authentication."""
    return {"message": "This is public"}

@app.get("/protected")
async def protected_endpoint(
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    """Protected endpoint that requires authentication."""
    return {
        "message": "This is protected",
        "user": credentials.credentials,
    }

env = FastAPIAppEnvironment(
    name="authenticated-api",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,  # We handle auth in the app
    secrets=flyte.Secret(key="API_KEY", as_env_var="API_KEY"),
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/basic_auth.py*

As you can see, we:

1. Define a `FastAPI` app
2. Create a `verify_token` function that verifies the API key from the Bearer token
3. Define endpoints that use the `verify_token` function to authenticate requests
4. Configure the `FastAPIAppEnvironment` with:
   - `requires_auth=False` - This allows the endpoint to be reached without going through Flyte's authentication, since we're handling authentication ourselves using the `API_KEY` secret
   - `secrets=flyte.Secret(key="API_KEY", as_env_var="API_KEY")` - This injects the secret value into the `API_KEY` environment variable at runtime

The key difference from using `env_vars` is that secrets are stored securely in Flyte's secret store and injected at runtime, rather than being passed as plain environment variables.
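
The `verify_token` function above compares the bearer token with `!=`. One possible hardening, shown here as a sketch and not as part of the original example, is a constant-time comparison using the standard library's `hmac.compare_digest`. The helper name `keys_match` is hypothetical.

```python
import hmac


def keys_match(provided: str, expected: str) -> bool:
    """Compare two API keys without short-circuiting on the first differing byte."""
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))
```

Inside `verify_token`, the check `credentials.credentials != API_KEY` could then be written as `not keys_match(credentials.credentials, API_KEY)`.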

## Deploy the FastAPI app

Once the secret is created, you can deploy the FastAPI app. Make sure your `config.yaml` file is in the same directory as your script, then run:

```bash
python basic_auth.py
```

Or use the Flyte CLI:

```bash
flyte serve basic_auth.py
```

Deploying the application will stream the status to the console and display the app URL:

```
✨ Deploying Application: authenticated-api
🔎 Console URL: https://<union-tenant>/console/projects/my-project/domains/development/apps/authenticated-api
[Status] Pending: App is pending deployment
[Status] Started: Service is ready
🚀 Deployed Endpoint: https://rough-meadow-97cf5.apps.<union-tenant>
```

## Invoke the endpoint

Once deployed, you can invoke the authenticated endpoint using curl:

```bash
curl -X GET "https://rough-meadow-97cf5.apps.<union-tenant>/protected" \
  -H "Authorization: Bearer <your-api-key-value>"
```

Replace `<your-api-key-value>` with the actual API key value you used when creating the secret.

For example, if you created the secret with value `my-secret-api-key-12345`:

```bash
curl -X GET "https://rough-meadow-97cf5.apps.<union-tenant>/protected" \
  -H "Authorization: Bearer my-secret-api-key-12345"
```

You should receive a response:

```json
{
  "message": "This is protected",
  "user": "my-secret-api-key-12345"
}
```
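
The same request can be made from Python. The sketch below uses only the standard library; the function name `build_protected_request` is hypothetical, and the base URL and key are placeholders to replace with your own values.

```python
import urllib.request


def build_protected_request(base_url: str, api_key: str) -> urllib.request.Request:
    """Build a GET request for /protected with a Bearer token."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/protected",
        headers={"Authorization": f"Bearer {api_key}"},
        method="GET",
    )


# Placeholder values; substitute your deployed endpoint and secret value.
request = build_protected_request("https://example.invalid", "my-secret-api-key-12345")
# To send it: urllib.request.urlopen(request).read()
```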

## Authentication for vLLM and SGLang apps

Both vLLM and SGLang apps support API key authentication through their native `--api-key` argument. This allows you to secure your LLM endpoints while keeping them accessible from the public internet.

### Create the authentication secret

Create a secret to store your API key:

```bash
flyte create secret AUTH_SECRET <your-api-key-value>
```

For example:

```bash
flyte create secret AUTH_SECRET my-llm-api-key-12345
```

### Deploy vLLM app with authentication

Here's how to deploy a vLLM app with API key authentication:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-vllm>=2.0.0b45",
# ]
# ///

"""vLLM app with API key authentication."""

import pathlib
from flyteplugins.vllm import VLLMAppEnvironment
import flyte

# The secret must be created using: flyte create secret AUTH_SECRET <your-api-key-value>
vllm_app = VLLMAppEnvironment(
    name="vllm-app-with-auth",
    model_hf_path="Qwen/Qwen3-0.6B",  # HuggingFace model path
    model_id="qwen3-0.6b",  # Model ID exposed by vLLM
    resources=flyte.Resources(
        cpu="4",
        memory="16Gi",
        gpu="L40s:1",  # GPU required for LLM serving
        disk="10Gi",
    ),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),
        scaledown_after=300,  # Scale down after 5 minutes of inactivity
    ),
    # Disable Union's platform-level authentication so you can access the
    # endpoint from the public internet
    requires_auth=False,
    # Inject the secret as an environment variable
    secrets=flyte.Secret(key="AUTH_SECRET", as_env_var="AUTH_SECRET"),
    # Pass the API key to vLLM's --api-key argument
    # The $AUTH_SECRET will be replaced with the actual secret value at runtime
    extra_args=[
        "--api-key", "$AUTH_SECRET",
    ],
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app = flyte.serve(vllm_app)
    print(f"Deployed vLLM app: {app.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/vllm/vllm_with_auth.py*

Key points:

1. **`requires_auth=False`** - Disables Union's platform-level authentication so the endpoint can be accessed from the public internet
2. **`secrets=flyte.Secret(key="AUTH_SECRET", as_env_var="AUTH_SECRET")`** - Injects the secret as an environment variable
3. **`extra_args=["--api-key", "$AUTH_SECRET"]`** - Passes the API key to vLLM's `--api-key` argument. The `$AUTH_SECRET` will be replaced with the actual secret value at runtime

Deploy the app:

```bash
python vllm_with_auth.py
```

Or use the Flyte CLI:

```bash
flyte serve vllm_with_auth.py
```

### Deploy SGLang app with authentication

Here's how to deploy an SGLang app with API key authentication:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-sglang>=2.0.0b45",
# ]
# ///

"""SGLang app with API key authentication."""

import pathlib
from flyteplugins.sglang import SGLangAppEnvironment
import flyte

# The secret must be created using: flyte create secret AUTH_SECRET <your-api-key-value>
sglang_app = SGLangAppEnvironment(
    name="sglang-with-auth",
    model_hf_path="Qwen/Qwen3-0.6B",  # HuggingFace model path
    model_id="qwen3-0.6b",  # Model ID exposed by SGLang
    resources=flyte.Resources(
        cpu="4",
        memory="16Gi",
        gpu="L40s:1",  # GPU required for LLM serving
        disk="10Gi",
    ),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),
        scaledown_after=300,  # Scale down after 5 minutes of inactivity
    ),
    # Disable Union's platform-level authentication so you can access the
    # endpoint from the public internet
    requires_auth=False,
    # Inject the secret as an environment variable
    secrets=flyte.Secret(key="AUTH_SECRET", as_env_var="AUTH_SECRET"),
    # Pass the API key to SGLang's --api-key argument
    # The $AUTH_SECRET will be replaced with the actual secret value at runtime
    extra_args=[
        "--api-key", "$AUTH_SECRET",
    ],
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app = flyte.serve(sglang_app)
    print(f"Deployed SGLang app: {app.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/sglang/sglang_with_auth.py*

The configuration is similar to vLLM:

1. **`requires_auth=False`** - Disables Union's platform-level authentication
2. **`secrets=flyte.Secret(key="AUTH_SECRET", as_env_var="AUTH_SECRET")`** - Injects the secret as an environment variable
3. **`extra_args=["--api-key", "$AUTH_SECRET"]`** - Passes the API key to SGLang's `--api-key` argument

Deploy the app:

```bash
python sglang_with_auth.py
```

Or use the Flyte CLI:

```bash
flyte serve sglang_with_auth.py
```

### Invoke authenticated LLM endpoints

Once deployed, you can invoke the authenticated endpoints using the OpenAI-compatible API format. Both vLLM and SGLang expose OpenAI-compatible endpoints.

For example, to make a chat completion request:

```bash
curl -X POST "https://your-app-url/v1/chat/completions" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <your-api-key-value>" \
  -d '{
    "model": "qwen3-0.6b",
    "messages": [
      {"role": "user", "content": "Hello, how are you?"}
    ]
  }'
```

Replace `<your-api-key-value>` with the actual API key value you used when creating the secret.

For example, if you created the secret with value `my-llm-api-key-12345`:

```bash
curl -X POST "https://your-app-url/v1/chat/completions" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer my-llm-api-key-12345" \
  -d '{
    "model": "qwen3-0.6b",
    "messages": [
      {"role": "user", "content": "Hello, how are you?"}
    ]
  }'
```

You should receive a response with the model's completion.
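
For Python clients, the request body and headers used in the `curl` command can be assembled as follows. This is a sketch using only the standard library; `build_chat_request` is a hypothetical helper, and the URL is a placeholder.

```python
import json
import urllib.request


def build_chat_request(base_url: str, api_key: str, model: str, prompt: str) -> urllib.request.Request:
    """Build a POST request for the OpenAI-compatible chat completions route."""
    body = json.dumps(
        {"model": model, "messages": [{"role": "user", "content": prompt}]}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/v1/chat/completions",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


# Placeholder values; substitute your deployed endpoint and secret value.
request = build_chat_request(
    "https://example.invalid", "my-llm-api-key-12345", "qwen3-0.6b", "Hello, how are you?"
)
# To send it: urllib.request.urlopen(request).read()
```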

> [!NOTE]
> The `$AUTH_SECRET` syntax in `extra_args` is automatically replaced with the actual secret value at runtime. This ensures the API key is never exposed in your code or configuration files.

## Accessing Swagger documentation

The FastAPI app from the first part of this guide also serves a public health check endpoint and interactive API documentation:

- **Health check**: `https://your-app-url/health`
- **Swagger UI**: `https://your-app-url/docs`
- **ReDoc**: `https://your-app-url/redoc`

The Swagger UI will show an "Authorize" button where you can enter your Bearer token to test authenticated endpoints directly from the browser.

## Security best practices

1. **Use strong API keys**: Generate cryptographically secure random strings for your API keys
2. **Rotate keys regularly**: Periodically rotate your API keys for better security
3. **Scope secrets appropriately**: Use project/domain scoping when creating secrets if you want to limit access:
   ```bash
   flyte create secret --project my-project --domain development API_KEY my-secret-value
   ```
4. **Never commit secrets**: Always use Flyte secrets for API keys, never hardcode them in your code
5. **Use HTTPS**: Always use HTTPS in production (Flyte apps are served over HTTPS by default)
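
For item 1, Python's standard `secrets` module is one way to generate a key. The sketch below is an illustration; the 32-byte length is an assumption, not a Flyte requirement.

```python
import secrets

# 32 random bytes, encoded as a URL-safe string of 43 characters.
api_key = secrets.token_urlsafe(32)
print(api_key)
```

The printed value can then be passed to `flyte create secret API_KEY <your-api-key-value>`.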

## Troubleshooting

**Authentication failing:**
- Verify the secret exists: `flyte get secret API_KEY`
- Check that the secret key name matches exactly (case-sensitive)
- Ensure you're using the correct Bearer token value
- Verify the `as_env_var` parameter matches the environment variable name in your code

**Secret not found:**
- Make sure you've created the secret before deploying the app
- Check the secret scope (organization vs project/domain) matches your app's project/domain
- Verify the secret name matches exactly (should be `API_KEY`)

**App not starting:**
- Check container logs for errors
- Verify all dependencies are installed in the image
- Ensure the secret is accessible in the app's project/domain

**LLM app authentication not working:**
- Verify the secret exists: `flyte get secret AUTH_SECRET`
- Check that `$AUTH_SECRET` is correctly specified in `extra_args` (note the `$` prefix)
- Ensure the secret name matches exactly (case-sensitive) in both the `flyte.Secret()` call and `extra_args`
- For both vLLM and SGLang, verify that the `--api-key` argument is passed correctly in `extra_args`
- Check that `requires_auth=False` is set to allow public access
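
Several of the checks above come down to whether the secret actually reached the container as an environment variable. A small startup check, sketched here with a hypothetical helper `require_env`, can turn a silent misconfiguration into an explicit error in the container logs.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a descriptive error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; check the secret name, "
            "its scope, and the as_env_var setting."
        )
    return value
```

In the FastAPI example, `API_KEY = require_env("API_KEY")` would fail at startup instead of returning a 500 response on the first request.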

## Next steps

- Learn more about [managing secrets](https://www.union.ai/docs/v2/union/user-guide/task-configuration/secrets) in Flyte
- See [app usage patterns](./app-usage-patterns#call-task-from-app-webhooks--apis) for webhook examples and authentication patterns
- Learn about [vLLM apps](./vllm-app) and [SGLang apps](./sglang-app) for serving LLMs

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/streamlit-app ===

# Streamlit app

Streamlit is a popular framework for building interactive web applications and dashboards. Flyte makes it easy to deploy Streamlit apps as long-running services.

## Basic Streamlit app

The simplest way to deploy a Streamlit app is to use the built-in Streamlit "hello" demo:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# ///

"""A basic Streamlit app using the built-in hello demo."""

# {{docs-fragment app-definition}}
import flyte
import flyte.app

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("streamlit==1.41.1")

app_env = flyte.app.AppEnvironment(
    name="streamlit-hello",
    image=image,
    args="streamlit hello --server.port 8080",
    port=8080,
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    requires_auth=False,
)

if __name__ == "__main__":
    flyte.init_from_config()
    app = flyte.deploy(app_env)
    print(f"Deployed app: {app[0].summary_repr()}")
# {{/docs-fragment app-definition}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/basic_streamlit.py*

This just serves the built-in Streamlit "hello" demo.

## Single-file Streamlit app

For a single-file Streamlit app, you can wrap the app code in a function and use the `args` parameter to specify the command to run the app.
Note that the command runs the file itself: everything after `--` is passed to the script rather than to Streamlit, so the script receives the `--server` flag and uses it to decide whether to run the app code.

This is useful when you have a relatively small and simple app that you want to deploy as a single file.

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "streamlit",
# ]
# ///

"""A single-script Streamlit app example."""

import sys
from pathlib import Path

import streamlit as st

import flyte
import flyte.app

# {{docs-fragment streamlit-app}}
def main():
    st.set_page_config(page_title="Simple Streamlit App", page_icon="🚀")

    st.title("Hello from Streamlit!")
    st.write("This is a simple single-script Streamlit app.")

    name = st.text_input("What's your name?", "World")
    st.write(f"Hello, {name}!")

    if st.button("Click me!"):
        st.balloons()
        st.success("Button clicked!")
# {{/docs-fragment streamlit-app}}

file_name = Path(__file__).name
# {{docs-fragment app-env}}
app_env = flyte.app.AppEnvironment(
    name="streamlit-single-script",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("streamlit==1.41.1"),
    args=[
        "streamlit",
        "run",
        file_name,
        "--server.port",
        "8080",
        "--",
        "--server",
    ],
    port=8080,
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    import logging
    import sys

    if "--server" in sys.argv:
        main()
    else:
        flyte.init_from_config(
            root_dir=Path(__file__).parent,
            log_level=logging.DEBUG,
        )
        app = flyte.serve(app_env)
        print(f"App URL: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/single_file_streamlit.py*

Note that the `if __name__ == "__main__"` block is used to both serve the `AppEnvironment` *and* run the app code via
the `streamlit run` command using the `--server` flag.
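
The dual-purpose entry point can be reduced to the following sketch, which isolates the branching logic. The function name `select_mode` is hypothetical; it only mirrors the `"--server" in sys.argv` check from the example.

```python
def select_mode(argv: list) -> str:
    """Return "serve" when the script was started with --server, else "deploy"."""
    return "serve" if "--server" in argv else "deploy"


# Inside the container, Streamlit forwards the arguments after "--" to the script.
container_argv = ["single_file_streamlit.py", "--server"]
# On the developer machine, the script is run without extra arguments.
local_argv = ["single_file_streamlit.py"]

print(select_mode(container_argv))
print(select_mode(local_argv))
```

Keeping both roles in one file means the same script is the deployment tool locally and the app entry point in the container.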

## Multi-file Streamlit app

When your Streamlit application grows more complex, you may want to split your app into multiple files.
For a multi-file Streamlit app, use the `include` parameter to bundle your app files:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
# ]
# ///

"""A custom Streamlit app with multiple files."""

import pathlib
import flyte
import flyte.app

# {{docs-fragment app-env}}
image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
    "streamlit==1.41.1",
    "pandas==2.2.3",
    "numpy==2.2.3",
)

app_env = flyte.app.AppEnvironment(
    name="streamlit-multi-file-app",
    image=image,
    args="streamlit run main.py --server.port 8080",
    port=8080,
    include=["main.py", "utils.py"],  # Include your app files
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app = flyte.deploy(app_env)
    print(f"Deployed app: {app[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/multi_file_streamlit.py*

The project structure looks like this:

```
project/
├── main.py           # Main Streamlit app
├── utils.py          # Utility functions
└── components.py     # Reusable components
```

Your `main.py` file would contain your Streamlit app code:

```
import os

import streamlit as st
from utils import generate_data

# {{docs-fragment streamlit-app}}
all_columns = ["Apples", "Orange", "Pineapple"]
with st.container(border=True):
    columns = st.multiselect("Columns", all_columns, default=all_columns)

all_data = st.cache_data(generate_data)(columns=all_columns, seed=101)

data = all_data[columns]

tab1, tab2 = st.tabs(["Chart", "Dataframe"])
tab1.line_chart(data, height=250)
tab2.dataframe(data, height=250, use_container_width=True)
st.write(f"Environment: {os.environ}")
# {{/docs-fragment streamlit-app}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/main.py*

## Example: Data visualization dashboard

Here's a complete example of a Streamlit dashboard, all in a single file.

Define the Streamlit app in the `main` function:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "streamlit",
#    "pandas",
#    "numpy",
# ]
# ///

"""A data visualization dashboard example using Streamlit."""

import sys
from pathlib import Path

import numpy as np
import pandas as pd
import streamlit as st

import flyte
import flyte.app

# {{docs-fragment streamlit-app}}
def main():
    st.set_page_config(page_title="Sales Dashboard", page_icon="📊")

    st.title("Sales Dashboard")

    # Load data
    @st.cache_data
    def load_data():
        return pd.DataFrame({
            "date": pd.date_range("2024-01-01", periods=100, freq="D"),
            "sales": np.random.randint(1000, 5000, 100),
        })

    data = load_data()

    # Sidebar filters
    st.sidebar.header("Filters")
    start_date = st.sidebar.date_input("Start date", value=data["date"].min())
    end_date = st.sidebar.date_input("End date", value=data["date"].max())

    # Filter data
    filtered_data = data[
        (data["date"] >= pd.Timestamp(start_date)) &
        (data["date"] <= pd.Timestamp(end_date))
    ]

    # Display metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Total Sales", f"${filtered_data['sales'].sum():,.0f}")
    with col2:
        st.metric("Average Sales", f"${filtered_data['sales'].mean():,.0f}")
    with col3:
        st.metric("Days", len(filtered_data))

    # Chart
    st.line_chart(filtered_data.set_index("date")["sales"])

# {{/docs-fragment streamlit-app}}

# {{docs-fragment app-env}}
file_name = Path(__file__).name
app_env = flyte.app.AppEnvironment(
    name="sales-dashboard",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit==1.41.1",
        "pandas==2.2.3",
        "numpy==2.2.3",
    ),
    args=["streamlit", "run", file_name, "--server.port", "8080", "--", "--server"],
    port=8080,
    resources=flyte.Resources(cpu="2", memory="2Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment serve}}
if __name__ == "__main__":
    import logging
    import sys

    if "--server" in sys.argv:
        main()
    else:
        flyte.init_from_config(
            root_dir=Path(__file__).parent,
            log_level=logging.DEBUG,
        )
        app = flyte.serve(app_env)
        print(f"Dashboard URL: {app.url}")
# {{/docs-fragment serve}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/data_visualization_dashboard.py*

Define the `AppEnvironment` to serve the app:

```
# {{docs-fragment app-env}}
file_name = Path(__file__).name
app_env = flyte.app.AppEnvironment(
    name="sales-dashboard",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit==1.41.1",
        "pandas==2.2.3",
        "numpy==2.2.3",
    ),
    args=["streamlit", "run", file_name, "--server.port", "8080", "--", "--server"],
    port=8080,
    resources=flyte.Resources(cpu="2", memory="2Gi"),
    requires_auth=False,
)
# {{/docs-fragment app-env}}

# {{docs-fragment serve}}
if __name__ == "__main__":
    import logging
    import sys

    if "--server" in sys.argv:
        main()
    else:
        flyte.init_from_config(
            root_dir=Path(__file__).parent,
            log_level=logging.DEBUG,
        )
        app = flyte.serve(app_env)
        print(f"Dashboard URL: {app.url}")
# {{/docs-fragment serve}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/data_visualization_dashboard.py*

And finally the app serving logic:

```
# {{docs-fragment serve}}
if __name__ == "__main__":
    import logging
    import sys

    if "--server" in sys.argv:
        main()
    else:
        flyte.init_from_config(
            root_dir=Path(__file__).parent,
            log_level=logging.DEBUG,
        )
        app = flyte.serve(app_env)
        print(f"Dashboard URL: {app.url}")
# {{/docs-fragment serve}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/streamlit/data_visualization_dashboard.py*

## Best practices

1. **Use `include` for custom apps**: Always include your app files when deploying custom Streamlit code
2. **Set the port correctly**: Make sure the `--server.port` value passed to Streamlit matches the `port` setting of the `AppEnvironment` (8080 in the examples)
3. **Cache data**: Use `@st.cache_data` for expensive computations to improve performance
4. **Resource sizing**: Adjust resources based on your app's needs (data size, computations)
5. **Public vs private**: Set `requires_auth=False` for public dashboards, `True` for internal tools
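
To catch a port mismatch before deploying, you could compare the `--server.port` value in `args` with the `port` setting. The sketch below is a hypothetical helper (`streamlit_port_matches` is not part of Flyte or Streamlit) that works on a plain argument list:

```python
def streamlit_port_matches(args: list[str], port: int) -> bool:
    """Return True if `--server.port` in `args` equals `port`.

    Hypothetical helper for illustration; handles both the
    `--server.port 8080` and `--server.port=8080` forms.
    """
    for i, arg in enumerate(args):
        if arg == "--server.port" and i + 1 < len(args):
            return args[i + 1] == str(port)
        if arg.startswith("--server.port="):
            return arg.split("=", 1)[1] == str(port)
    # No explicit flag: Streamlit falls back to its default port, 8501.
    return port == 8501


args = ["streamlit", "run", "app.py", "--server.port", "8080", "--", "--server"]
print(streamlit_port_matches(args, 8080))
```

A check like this could run in the `__main__` block before calling `flyte.serve`.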

## Troubleshooting

**App not loading:**
- Verify that `--server.port` matches the `port` setting of the `AppEnvironment` (8080 in the examples)
- Check that all required files are included
- Review container logs for errors

**Missing dependencies:**
- Ensure all required packages are in your image's pip packages
- Check that file paths in `include` are correct

**Performance issues:**
- Increase CPU/memory resources
- Use Streamlit's caching features (`@st.cache_data`, `@st.cache_resource`)
- Optimize data processing

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/fastapi-app ===

# FastAPI app

FastAPI is a modern, fast web framework for building APIs. Flyte provides `FastAPIAppEnvironment`, which makes it easy to deploy FastAPI applications.

## Basic FastAPI app

Here's a simple FastAPI app:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A basic FastAPI app example."""

from fastapi import FastAPI
import pathlib
import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment fastapi-app}}
app = FastAPI(
    title="My API",
    description="A simple FastAPI application",
    version="1.0.0",
)
# {{/docs-fragment fastapi-app}}

# {{docs-fragment fastapi-env}}
env = FastAPIAppEnvironment(
    name="my-fastapi-app",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment fastapi-env}}

# {{docs-fragment endpoints}}
@app.get("/")
async def root():
    return {"message": "Hello, World!"}

@app.get("/health")
async def health_check():
    return {"status": "healthy"}
# {{/docs-fragment endpoints}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/basic_fastapi.py*

Once deployed, you can:
- Access the API at the generated URL
- View interactive API docs at `/docs` (Swagger UI)
- View alternative docs at `/redoc`

## Serving a machine learning model

Here's an example of serving a scikit-learn model:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "scikit-learn",
#    "joblib",
# ]
# ///

"""Example of serving a machine learning model with FastAPI."""

import os
from contextlib import asynccontextmanager
from pathlib import Path

import joblib
import flyte
from fastapi import FastAPI
from flyte.app.extras import FastAPIAppEnvironment
from pydantic import BaseModel

# {{docs-fragment ml-model}}
# Define request/response models
class PredictionRequest(BaseModel):
    feature1: float
    feature2: float
    feature3: float

class PredictionResponse(BaseModel):
    prediction: float
    probability: float

# Load model (you would typically load this from storage)
model = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    model_path = os.getenv("MODEL_PATH", "/app/models/model.joblib")
    # In production, load from your storage
    if os.path.exists(model_path):
        with open(model_path, "rb") as f:
            model = joblib.load(f)
    yield

# Pass the lifespan handler so the model is loaded at startup
app = FastAPI(title="ML Model API", lifespan=lifespan)

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    # Make prediction
    # prediction = model.predict([[request.feature1, request.feature2, request.feature3]])

    # Dummy prediction for demo
    prediction = 0.85
    probability = 0.92

    return PredictionResponse(
        prediction=prediction,
        probability=probability,
    )

env = FastAPIAppEnvironment(
    name="ml-model-api",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
        "scikit-learn",
        "pydantic",
        "joblib",
    ),
    parameters=[
        flyte.app.Parameter(
            name="model_file",
            value=flyte.io.File("s3://bucket/models/model.joblib"),
            mount="/app/models",
            env_var="MODEL_PATH",
        ),
    ],
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    requires_auth=False,
)
# {{/docs-fragment ml-model}}

if __name__ == "__main__":
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"API URL: {app_deployment[0].url}")
    print(f"Swagger docs: {app_deployment[0].url}/docs")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/ml_model_serving.py*

## Accessing Swagger documentation

FastAPI automatically generates interactive API documentation. Once deployed:

- **Swagger UI**: Access at `{app_url}/docs`
- **ReDoc**: Access at `{app_url}/redoc`
- **OpenAPI JSON**: Access at `{app_url}/openapi.json`

The Swagger UI provides an interactive interface where you can:
- See all available endpoints
- Test API calls directly from the browser
- View request/response schemas
- See example payloads
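
The OpenAPI JSON is also handy for scripts. As a minimal sketch, the hypothetical helper below (`list_endpoints` is not part of FastAPI or Flyte) lists the method and path pairs from an OpenAPI document. The sample document is a hand-written stand-in for what `{app_url}/openapi.json` would return for the Product API example:

```python
import json

HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"}


def list_endpoints(spec: dict) -> list[tuple[str, str]]:
    """Return sorted (METHOD, path) pairs from an OpenAPI document."""
    endpoints = []
    for path, operations in spec.get("paths", {}).items():
        for method in operations:
            # Path items may also hold non-method keys such as "parameters".
            if method in HTTP_METHODS:
                endpoints.append((method.upper(), path))
    return sorted(endpoints)


# Trimmed, hand-written stand-in for a real openapi.json response.
sample_spec = json.loads("""
{
  "openapi": "3.1.0",
  "info": {"title": "Product API", "version": "0.1.0"},
  "paths": {
    "/products": {"get": {}, "post": {}},
    "/products/{product_id}": {"get": {}}
  }
}
""")

for method, path in list_endpoints(sample_spec):
    print(method, path)
```

Against a deployed app, you would fetch the document over HTTP instead of using the inline sample.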

## Example: REST API with multiple endpoints

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""Example REST API with multiple endpoints."""

from pathlib import Path
from typing import List
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment rest-api}}
app = FastAPI(title="Product API")

# Data models
class Product(BaseModel):
    id: int
    name: str
    price: float

class ProductCreate(BaseModel):
    name: str
    price: float

# In-memory database (use real database in production)
products_db = []

@app.get("/products", response_model=List[Product])
async def get_products():
    return products_db

@app.get("/products/{product_id}", response_model=Product)
async def get_product(product_id: int):
    product = next((p for p in products_db if p["id"] == product_id), None)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return product

@app.post("/products", response_model=Product)
async def create_product(product: ProductCreate):
    new_product = {
        "id": len(products_db) + 1,
        "name": product.name,
        "price": product.price,
    }
    products_db.append(new_product)
    return new_product

env = FastAPIAppEnvironment(
    name="product-api",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment rest-api}}

if __name__ == "__main__":
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"API URL: {app_deployment[0].url}")
    print(f"Swagger docs: {app_deployment[0].url}/docs")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/rest_api.py*

## Multi-file FastAPI app

Here's an example of a multi-file FastAPI app:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""Multi-file FastAPI app example."""

from fastapi import FastAPI
from module import function  # Import from another file
import pathlib

import flyte
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment app-definition}}
app = FastAPI(title="Multi-file FastAPI Demo")

app_env = FastAPIAppEnvironment(
    name="fastapi-multi-file",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    # FastAPIAppEnvironment automatically includes necessary files
    # But you can also specify explicitly:
    # include=["app.py", "module.py"],
)
# {{/docs-fragment app-definition}}

# {{docs-fragment endpoint}}
@app.get("/")
async def root():
    return function()  # Uses function from module.py
# {{/docs-fragment endpoint}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(app_env)
    print(f"Deployed: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/multi_file/app.py*

The helper module:

```
# {{docs-fragment helper-function}}
def function():
    """Helper function used by the FastAPI app."""
    return {"message": "Hello from module.py!"}
# {{/docs-fragment helper-function}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/multi_file/module.py*

See [Multi-script apps](./multi-script-apps) for more details on building FastAPI apps with multiple files.

## Local-to-remote model serving

A common ML pattern: train a model with a Flyte pipeline, then serve predictions from it. During local development, the app loads the model from a local file (e.g. `model.pt` saved by your training pipeline). When deployed remotely, Flyte's `Parameter` system automatically resolves the model from the latest training run output.

```python
from contextlib import asynccontextmanager
from pathlib import Path
import os

from fastapi import FastAPI
import flyte
from flyte.app import Parameter, RunOutput
from flyte.app.extras import FastAPIAppEnvironment

MODEL_PATH_ENV = "MODEL_PATH"

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load model on startup, either local file or remote run output."""
    model_path = Path(os.environ.get(MODEL_PATH_ENV, "model.pt"))
    # `load_model` is a user-defined helper (not shown) that deserializes the model file
    model = load_model(model_path)
    app.state.model = model
    yield

app = FastAPI(title="MNIST Predictor", lifespan=lifespan)

serving_env = FastAPIAppEnvironment(
    name="mnist-predictor",
    app=app,
    parameters=[
        # Remote: resolves model from the latest train run and sets MODEL_PATH
        Parameter(
            name="model",
            value=RunOutput(task_name="ml_pipeline.pipeline", type="file", getter=(1,)),
            download=True,
            env_var=MODEL_PATH_ENV,
        ),
    ],
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi", "uvicorn", "torch", "torchvision",
    ),
    resources=flyte.Resources(cpu=1, memory="4Gi"),
)

@app.get("/predict")
async def predict(index: int = 0) -> dict:
    return {"prediction": app.state.model(index)}

if __name__ == "__main__":
    # Local: skip RunOutput resolution, lifespan falls back to local model.pt
    serving_env.parameters = []
    local_app = flyte.with_servecontext(mode="local").serve(serving_env)
    local_app.activate(wait=True)
```

Locally, the app loads `model.pt` from disk:

```bash
python serve_model.py
```

Remotely, Flyte resolves the model from the latest training run:

```bash
flyte deploy serve_model.py serving_env
```

The key idea: `Parameter` with `RunOutput` bridges the gap between local and remote. Locally, the app falls back to a local file. Remotely, Flyte resolves the model artifact from the latest pipeline run automatically.
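
The fallback itself is plain environment-variable resolution. The sketch below isolates that step so it can be tested without Flyte or FastAPI. `resolve_model_path` is a hypothetical helper name, and the remote path is an invented placeholder:

```python
import os
from pathlib import Path

MODEL_PATH_ENV = "MODEL_PATH"


def resolve_model_path(env=None, default: str = "model.pt") -> Path:
    """Return the model path from MODEL_PATH, or the local default."""
    env = os.environ if env is None else env
    return Path(env.get(MODEL_PATH_ENV, default))


# Local run: the variable is unset, so the app reads ./model.pt
print(resolve_model_path({}))

# Remote run: the environment variable points at the downloaded artifact
# (placeholder path for illustration).
print(resolve_model_path({MODEL_PATH_ENV: "/tmp/run-output/model.pt"}))
```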

## Best practices

1. **Use Pydantic models**: Define request/response models for type safety and automatic validation
2. **Handle errors**: Use `HTTPException` for proper error responses
3. **Async operations**: Use async/await for I/O operations
4. **Environment variables**: Use environment variables for configuration
5. **Logging**: Add proper logging for debugging and monitoring
6. **Health checks**: Always include a `/health` endpoint
7. **API documentation**: FastAPI auto-generates docs, but add descriptions to your endpoints

## Advanced features

FastAPI supports many features that work with Flyte:

- **Dependencies**: Use FastAPI's dependency injection system
- **Background tasks**: Run background tasks with `BackgroundTasks`
- **WebSockets**: See [WebSocket-based patterns](./app-usage-patterns#websocket-based-patterns) for details
- **Authentication**: Add authentication middleware (see [secret-based authentication](./secret-based-authentication))
- **CORS**: Configure CORS for cross-origin requests
- **Rate limiting**: Add rate limiting middleware

## Troubleshooting

**App not starting:**
- Check that uvicorn can find your app module
- Verify all dependencies are installed in the image
- Check container logs for startup errors

**Import errors:**
- Ensure all imported modules are available
- Use `include` parameter if you have custom modules
- Check that file paths are correct

**API not accessible:**
- Verify `requires_auth` setting
- Check that the app is listening on the correct port (8080)
- Review network/firewall settings

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/vllm-app ===

# vLLM app

vLLM is a high-performance library for serving large language models (LLMs). Flyte provides `VLLMAppEnvironment` for deploying vLLM model servers.

## Installation

First, install the vLLM plugin:

```bash
pip install flyteplugins-vllm
```

## Basic vLLM app

Here's a simple example serving a HuggingFace model:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-vllm>=2.0.0b45",
# ]
# ///

"""A simple vLLM app example."""

from flyteplugins.vllm import VLLMAppEnvironment
import flyte

# {{docs-fragment basic-vllm-app}}
vllm_app = VLLMAppEnvironment(
    name="my-llm-app",
    model_hf_path="Qwen/Qwen3-0.6B",  # HuggingFace model path
    model_id="qwen3-0.6b",  # Model ID exposed by vLLM
    resources=flyte.Resources(
        cpu="4",
        memory="16Gi",
        gpu="L40s:1",  # GPU required for LLM serving
        disk="10Gi",
    ),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),
        scaledown_after=300,  # Scale down after 5 minutes of inactivity
    ),
    requires_auth=False,
)
# {{/docs-fragment basic-vllm-app}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config()
    app = flyte.serve(vllm_app)
    print(f"Deployed vLLM app: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/vllm/basic_vllm.py*

## Using prefetched models

You can use models prefetched with `flyte.prefetch`:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-vllm>=2.0.0b45",
# ]
# override-dependencies = [
#    "cel-python; sys_platform == 'never'",
# ]
# ///

"""vLLM app using prefetched models."""

from flyteplugins.vllm import VLLMAppEnvironment
import flyte

# {{docs-fragment prefetch}}

# Use the prefetched model
vllm_app = VLLMAppEnvironment(
    name="my-llm-app",
    model_hf_path="Qwen/Qwen3-0.6B",  # this is a placeholder
    model_id="qwen3-0.6b",
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1", disk="10Gi"),
    stream_model=True,  # Stream model directly from blob store to GPU
    requires_auth=False,
)

if __name__ == "__main__":
    flyte.init_from_config()

    # Prefetch the model first
    run = flyte.prefetch.hf_model(repo="Qwen/Qwen3-0.6B")
    run.wait()

    # Use the prefetched model
    app = flyte.serve(
        vllm_app.clone_with(
            vllm_app.name,
            model_hf_path=None,
            model_path=flyte.app.RunOutput(type="directory", run_name=run.name),
        )
    )
    print(f"Deployed vLLM app: {app.url}")
# {{/docs-fragment prefetch}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/vllm/vllm_with_prefetch.py*

## Model streaming

`VLLMAppEnvironment` supports streaming models directly from blob storage to GPU memory, reducing startup time.
When `stream_model=True` and `model_path` is set to a `flyte.io.Dir` or a `RunOutput` that points to a path in
object storage:

- Model weights stream directly from storage to GPU
- Faster startup time (no full download required)
- Lower disk space requirements

> [!NOTE]
> The contents of the model directory must be compatible with the vLLM-supported formats, e.g. the HuggingFace model
> serialization format.

## Custom vLLM arguments

Use `extra_args` to pass additional arguments to vLLM:

```python
vllm_app = VLLMAppEnvironment(
    name="custom-vllm-app",
    model_hf_path="Qwen/Qwen3-0.6B",
    model_id="qwen3-0.6b",
    extra_args=[
        "--max-model-len", "8192",  # Maximum context length
        "--gpu-memory-utilization", "0.8",  # GPU memory utilization
        "--trust-remote-code",  # Trust remote code in models
    ],
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1"),
    # ...
)
```

See the [vLLM documentation](https://docs.vllm.ai/en/stable/configuration/engine_args.html) for all available arguments.
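
If you keep server options in a dictionary, a small helper can flatten them into the list form that `extra_args` expects. This is a sketch only: `to_cli_args` is a hypothetical helper, not part of Flyte or vLLM.

```python
def to_cli_args(options: dict) -> list[str]:
    """Flatten {"flag-name": value} into ["--flag-name", "value", ...].

    True becomes a bare flag; False and None are skipped.
    """
    args = []
    for name, value in options.items():
        flag = f"--{name}"
        if value is True:
            args.append(flag)
        elif value is False or value is None:
            continue
        else:
            args.extend([flag, str(value)])
    return args


extra_args = to_cli_args({
    "max-model-len": 8192,
    "gpu-memory-utilization": 0.8,
    "trust-remote-code": True,
})
print(extra_args)
```

The result can be passed as `extra_args=extra_args` when constructing the environment.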

## Using the OpenAI-compatible API

Once deployed, your vLLM app exposes an OpenAI-compatible API:

```python
from openai import OpenAI

client = OpenAI(
    base_url="https://your-app-url/v1",  # vLLM endpoint
    api_key="your-api-key",  # If you passed an --api-key argument
)

response = client.chat.completions.create(
    model="qwen3-0.6b",  # Your model_id
    messages=[
        {"role": "user", "content": "Hello, how are you?"}
    ],
)

print(response.choices[0].message.content)
```

> [!TIP]
> If you started the server with an `--api-key` argument, pass the same key as the client's `api_key` parameter to authenticate your requests.
> See [Secret-based authentication](./secret-based-authentication#deploy-vllm-app-with-authentication) for details on passing auth secrets to your app.
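
If you prefer not to install the `openai` package, the same request can be built with the standard library. The sketch below only constructs the request; `build_chat_request` is a hypothetical helper, and `https://your-app-url/v1` is the same placeholder used above.

```python
import json
import urllib.request


def build_chat_request(base_url, model, prompt, api_key=None):
    """Build a POST request for the OpenAI-style chat completions route."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        # Servers started with --api-key expect a bearer token.
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps({
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/chat/completions",
        data=body,
        headers=headers,
        method="POST",
    )


req = build_chat_request("https://your-app-url/v1", "qwen3-0.6b", "Hello, how are you?")
print(req.get_method(), req.full_url)

# To send it (requires a deployed app):
# with urllib.request.urlopen(req) as resp:
#     reply = json.load(resp)
#     print(reply["choices"][0]["message"]["content"])
```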

## Multi-GPU inference (Tensor Parallelism)

For larger models, use multiple GPUs with tensor parallelism:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-vllm>=2.0.0b45",
# ]
# ///

"""vLLM app with multi-GPU tensor parallelism."""

from flyteplugins.vllm import VLLMAppEnvironment
import flyte

# {{docs-fragment multi-gpu}}
vllm_app = VLLMAppEnvironment(
    name="multi-gpu-llm-app",
    model_hf_path="meta-llama/Llama-2-70b-hf",
    model_id="llama-2-70b",
    resources=flyte.Resources(
        cpu="8",
        memory="32Gi",
        gpu="L40s:4",  # 4 GPUs for tensor parallelism
        disk="100Gi",
    ),
    extra_args=[
        "--tensor-parallel-size", "4",  # Use 4 GPUs
        "--max-model-len", "4096",
        "--gpu-memory-utilization", "0.9",
    ],
    requires_auth=False,
)
# {{/docs-fragment multi-gpu}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config()
    app = flyte.serve(vllm_app)
    print(f"Deployed vLLM app: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/vllm/vllm_multi_gpu.py*

The `--tensor-parallel-size` value should match the number of GPUs specified in `resources`.
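
A quick consistency check can catch a mismatch before deployment. The helpers below are hypothetical and not part of Flyte or vLLM. They assume the GPU string uses the `<type>:<count>` form shown above and treat a missing count as 1, which is an assumption of this sketch.

```python
def gpu_count(gpu: str) -> int:
    """Parse the count from a "<type>:<count>" string such as "L40s:4"."""
    _, _, count = gpu.partition(":")
    # Assumption for this sketch: no explicit count means a single GPU.
    return int(count) if count else 1


def tensor_parallel_size(extra_args: list[str]) -> int:
    """Read --tensor-parallel-size from an argument list (vLLM defaults to 1)."""
    if "--tensor-parallel-size" in extra_args:
        idx = extra_args.index("--tensor-parallel-size")
        return int(extra_args[idx + 1])
    return 1


gpu = "L40s:4"
extra_args = ["--tensor-parallel-size", "4", "--max-model-len", "4096"]
assert tensor_parallel_size(extra_args) == gpu_count(gpu), (
    "GPU count and tensor parallel size differ"
)
print("GPU count matches tensor parallel size")
```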

## Model sharding with prefetch

You can prefetch and shard models for multi-GPU inference:

```python
# Prefetch with sharding configuration
run = flyte.prefetch.hf_model(
    repo="meta-llama/Llama-2-70b-hf",
    accelerator="L40s:4",
    shard_config=flyte.prefetch.ShardConfig(
        engine="vllm",
        args=flyte.prefetch.VLLMShardArgs(
            tensor_parallel_size=4,
            dtype="auto",
            trust_remote_code=True,
        ),
    ),
)
run.wait()

# Use the sharded model
vllm_app = VLLMAppEnvironment(
    name="sharded-llm-app",
    model_path=flyte.app.RunOutput(type="directory", run_name=run.name),
    model_id="llama-2-70b",
    resources=flyte.Resources(cpu="8", memory="32Gi", gpu="L40s:4", disk="100Gi"),
    extra_args=["--tensor-parallel-size", "4"],
    stream_model=True,
)
```

See [Prefetching models](https://www.union.ai/docs/v2/union/user-guide/serve-and-deploy-apps/prefetching-models) for more details on sharding.

## Autoscaling

vLLM apps work well with autoscaling:

```python
vllm_app = VLLMAppEnvironment(
    name="autoscaling-llm-app",
    model_hf_path="Qwen/Qwen3-0.6B",
    model_id="qwen3-0.6b",
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1"),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),  # Scale to zero when idle
        scaledown_after=600,  # 10 minutes idle before scaling down
    ),
    # ...
)
```

## Best practices

1. **Use prefetching**: Prefetch models for faster deployment and better reproducibility
2. **Enable streaming**: Use `stream_model=True` to reduce startup time and disk usage
3. **Right-size GPUs**: Match GPU memory to model size
4. **Configure memory utilization**: Use `--gpu-memory-utilization` to control memory usage
5. **Use tensor parallelism**: For large models, use multiple GPUs with `--tensor-parallel-size`
6. **Set autoscaling**: Choose a `scaledown_after` value that balances cost against cold-start latency
7. **Limit context length**: Use `--max-model-len` to cap the context window and reduce memory usage
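
For right-sizing GPUs, a rough lower bound on memory is the parameter count times the bytes per parameter. The sketch below covers weights only. It ignores the KV cache, activations, and runtime overhead, so real requirements are higher. `weight_memory_gib` is a hypothetical helper name.

```python
def weight_memory_gib(num_params: float, bytes_per_param: int = 2) -> float:
    """Approximate memory for model weights alone, in GiB."""
    return num_params * bytes_per_param / 1024**3


llama_70b = weight_memory_gib(70e9)   # 70B parameters, 16-bit weights
qwen_06b = weight_memory_gib(0.6e9)   # 0.6B parameters, 16-bit weights

print(f"70B at 16-bit: ~{llama_70b:.0f} GiB")
print(f"0.6B at 16-bit: ~{qwen_06b:.1f} GiB")
```

By this estimate, a 70B model needs roughly 130 GiB for weights alone. That is more than a single 48 GB L40S provides, which is why the multi-GPU example above requests four GPUs.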

## Troubleshooting

**Model loading fails:**
- Verify GPU memory is sufficient for the model
- Check that the model path or HuggingFace path is correct
- Review container logs for detailed error messages

**Out of memory errors:**
- Reduce `--max-model-len`
- Lower `--gpu-memory-utilization`
- Use a smaller model or more GPUs

**Slow startup:**
- Enable `stream_model=True` for faster loading
- Prefetch models before deployment
- Use faster storage backends

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-apps/sglang-app ===

# SGLang app

SGLang is a fast structured generation library for large language models (LLMs). Flyte provides `SGLangAppEnvironment` for deploying SGLang model servers.

## Installation

First, install the SGLang plugin:

```bash
pip install flyteplugins-sglang
```

## Basic SGLang app

Here's a simple example serving a HuggingFace model:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-sglang>=2.0.0b45",
# ]
# ///

"""A simple SGLang app example."""

from flyteplugins.sglang import SGLangAppEnvironment
import flyte

# {{docs-fragment basic-sglang-app}}
sglang_app = SGLangAppEnvironment(
    name="my-sglang-app",
    model_hf_path="Qwen/Qwen3-0.6B",  # HuggingFace model path
    model_id="qwen3-0.6b",  # Model ID exposed by SGLang
    resources=flyte.Resources(
        cpu="4",
        memory="16Gi",
        gpu="L40s:1",  # GPU required for LLM serving
        disk="10Gi",
    ),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),
        scaledown_after=300,  # Scale down after 5 minutes of inactivity
    ),
    requires_auth=False,
)
# {{/docs-fragment basic-sglang-app}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config()
    app = flyte.serve(sglang_app)
    print(f"Deployed SGLang app: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/sglang/basic_sglang.py*

## Using prefetched models

You can use models prefetched with `flyte.prefetch`:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-sglang>=2.0.0b45",
# ]
# ///

"""SGLang app using prefetched models."""

from flyteplugins.sglang import SGLangAppEnvironment
import flyte

# {{docs-fragment prefetch}}

# Use the prefetched model
sglang_app = SGLangAppEnvironment(
    name="my-sglang-app",
    model_hf_path="Qwen/Qwen3-0.6B",  # this is a placeholder
    model_id="qwen3-0.6b",
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1", disk="10Gi"),
    stream_model=True,  # Stream model directly from blob store to GPU
    requires_auth=False,
)

if __name__ == "__main__":
    flyte.init_from_config()

    # Prefetch the model first
    run = flyte.prefetch.hf_model(repo="Qwen/Qwen3-0.6B")
    run.wait()

    app = flyte.serve(
        sglang_app.clone_with(
            sglang_app.name,
            model_hf_path=None,
            model_path=flyte.app.RunOutput(type="directory", run_name=run.name),
        )
    )
    print(f"Deployed SGLang app: {app.url}")
# {{/docs-fragment prefetch}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/sglang/sglang_with_prefetch.py*

## Model streaming

`SGLangAppEnvironment` supports streaming models directly from blob storage to GPU memory, reducing startup time.
When `stream_model=True` and `model_path` is set to a `flyte.io.Dir` or a `RunOutput` that points to a path in
object storage:

- Model weights stream directly from storage to GPU
- Faster startup time (no full download required)
- Lower disk space requirements

> [!NOTE]
> The contents of the model directory must be compatible with the SGLang-supported formats, e.g. the HuggingFace model
> serialization format.

## Custom SGLang arguments

Use `extra_args` to pass additional arguments to SGLang:

```python
sglang_app = SGLangAppEnvironment(
    name="custom-sglang-app",
    model_hf_path="Qwen/Qwen3-0.6B",
    model_id="qwen3-0.6b",
    extra_args=[
        "--context-length", "8192",  # Maximum context length
        "--mem-fraction-static", "0.8",  # Memory fraction for static allocation
        "--trust-remote-code",  # Trust remote code in models
    ],
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1"),
    # ...
)
```

See the [SGLang server arguments documentation](https://docs.sglang.io/advanced_features/server_arguments.html) for all available options.

## Using the OpenAI-compatible API

Once deployed, your SGLang app exposes an OpenAI-compatible API:

```python
from openai import OpenAI

client = OpenAI(
    base_url="https://your-app-url/v1",  # SGLang endpoint
    api_key="your-api-key",  # If you passed an --api-key argument
)

response = client.chat.completions.create(
    model="qwen3-0.6b",  # Your model_id
    messages=[
        {"role": "user", "content": "Hello, how are you?"}
    ],
)

print(response.choices[0].message.content)
```

> [!TIP]
> If you started the server with an `--api-key` argument, pass the same key as the client's `api_key` parameter to authenticate your requests.
> See [Secret-based authentication](./secret-based-authentication#deploy-sglang-app-with-authentication) for details on passing auth secrets to your app.

## Multi-GPU inference (Tensor Parallelism)

For larger models, use multiple GPUs with tensor parallelism:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "flyteplugins-sglang>=2.0.0b45",
# ]
# ///

"""SGLang app with multi-GPU tensor parallelism."""

from flyteplugins.sglang import SGLangAppEnvironment
import flyte

# {{docs-fragment multi-gpu}}
sglang_app = SGLangAppEnvironment(
    name="multi-gpu-sglang-app",
    model_hf_path="meta-llama/Llama-2-70b-hf",
    model_id="llama-2-70b",
    resources=flyte.Resources(
        cpu="8",
        memory="32Gi",
        gpu="L40s:4",  # 4 GPUs for tensor parallelism
        disk="100Gi",
    ),
    extra_args=[
        "--tp", "4",  # Tensor parallelism size (4 GPUs)
        "--context-length", "4096",
        "--mem-fraction-static", "0.9",
    ],
    requires_auth=False,
)
# {{/docs-fragment multi-gpu}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config()
    app = flyte.serve(sglang_app)
    print(f"Deployed SGLang app: {app.url}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/sglang/sglang_multi_gpu.py*

The tensor parallelism size (`--tp`) should match the number of GPUs specified in resources.

## Model sharding with prefetch

You can prefetch and shard models for multi-GPU inference. Note that the shard configuration in this example uses the `vllm` engine (`VLLMShardArgs`):

```python
# Prefetch with sharding configuration
run = flyte.prefetch.hf_model(
    repo="meta-llama/Llama-2-70b-hf",
    accelerator="L40s:4",
    shard_config=flyte.prefetch.ShardConfig(
        engine="vllm",
        args=flyte.prefetch.VLLMShardArgs(
            tensor_parallel_size=4,
            dtype="auto",
            trust_remote_code=True,
        ),
    ),
)
run.wait()

# Use the sharded model
sglang_app = SGLangAppEnvironment(
    name="sharded-sglang-app",
    model_path=flyte.app.RunOutput(type="directory", run_name=run.name),
    model_id="llama-2-70b",
    resources=flyte.Resources(cpu="8", memory="32Gi", gpu="L40s:4", disk="100Gi"),
    extra_args=["--tp", "4"],
    stream_model=True,
)
```

See [Prefetching models](https://www.union.ai/docs/v2/union/user-guide/serve-and-deploy-apps/prefetching-models) for more details on sharding.

## Autoscaling

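Use the `scaling` parameter to control how many replicas an SGLang app runs. The example below scales between zero and one replica and scales down after 10 minutes of idle time: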

```python
sglang_app = SGLangAppEnvironment(
    name="autoscaling-sglang-app",
    model_hf_path="Qwen/Qwen3-0.6B",
    model_id="qwen3-0.6b",
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1"),
    scaling=flyte.app.Scaling(
        replicas=(0, 1),  # Scale to zero when idle
        scaledown_after=600,  # 10 minutes idle before scaling down
    ),
    # ...
)
```

## Structured generation

SGLang is well suited to structured generation, where model output is constrained to a format such as a JSON schema or a regular expression. The deployed app exposes an OpenAI-compatible API, and SGLang's structured-output features are available through that API.
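As an illustration, the sketch below builds an OpenAI-style chat completion request that constrains the output to a JSON schema through the `response_format` field, which SGLang's OpenAI-compatible server accepts. The app URL, the schema, and the `build_request` helper are hypothetical, and the model name assumes the `model_id` from the autoscaling example. Sending the request is left commented out because it needs a running app.

```python
import json
import urllib.request

# JSON schema the response must conform to (illustrative only).
CITY_SCHEMA = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "population": {"type": "integer"},
    },
    "required": ["name", "population"],
}


def build_request(app_url: str, model_id: str, prompt: str) -> urllib.request.Request:
    """Build a chat completion request with schema-constrained output."""
    payload = {
        "model": model_id,
        "messages": [{"role": "user", "content": prompt}],
        "response_format": {
            "type": "json_schema",
            "json_schema": {"name": "city", "schema": CITY_SCHEMA},
        },
    }
    return urllib.request.Request(
        url=f"{app_url.rstrip('/')}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL: replace it with the URL printed at deployment.
request = build_request("https://example.invalid", "qwen3-0.6b", "Describe Paris as JSON.")
# with urllib.request.urlopen(request) as response:
#     print(json.load(response)["choices"][0]["message"]["content"])
```

If the app was deployed with an API key, the request would presumably also need an `Authorization: Bearer <key>` header, as with other OpenAI-compatible servers.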

## Best practices

1. **Use prefetching**: Prefetch models for faster deployment and better reproducibility
2. **Enable streaming**: Use `stream_model=True` to reduce startup time and disk usage
3. **Right-size GPUs**: Match GPU memory to model size
4. **Use tensor parallelism**: For large models, use multiple GPUs with `--tp`
5. **Set autoscaling**: Choose a `scaledown_after` idle timeout that balances cost against cold-start latency
6. **Configure memory**: Use `--mem-fraction-static` to set the fraction of GPU memory reserved for model weights and the KV cache
7. **Limit context length**: Use `--context-length` to cap the context window and reduce KV cache memory usage
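For right-sizing GPUs, a rough lower bound on the memory needed is the size of the weights alone: parameter count times bytes per parameter. The sketch below applies that arithmetic to the 70B example above, assuming 16-bit weights (2 bytes per parameter) and 48 GB of memory per L40S GPU. It ignores KV cache and activation memory, so real requirements are higher. `weight_memory_gb` is a hypothetical helper, not part of Flyte or SGLang.

```python
def weight_memory_gb(num_params: float, bytes_per_param: int = 2) -> float:
    """Approximate memory for model weights in GB (1 GB = 1e9 bytes)."""
    return num_params * bytes_per_param / 1e9


weights_gb = weight_memory_gb(70e9)  # 70B parameters at 16-bit precision
total_gpu_gb = 4 * 48                # four L40S GPUs, 48 GB each (assumed)
headroom_gb = total_gpu_gb - weights_gb

print(f"weights: {weights_gb:.0f} GB, GPUs: {total_gpu_gb} GB, headroom: {headroom_gb:.0f} GB")
```

Under these assumptions the weights take about 140 GB of the 192 GB available, which is why the 70B example needs all four GPUs and leaves limited room for the KV cache.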

## Troubleshooting

**Model loading fails:**
- Verify GPU memory is sufficient for the model
- Check that the model path or HuggingFace path is correct
- Review container logs for detailed error messages

**Out of memory errors:**
- Reduce `--context-length`
- Lower `--mem-fraction-static`
- Use a smaller model or more GPUs

**Slow startup:**
- Enable `stream_model=True` for faster loading
- Prefetch models before deployment
- Use faster storage backends

