# Basic project: RAG

This example demonstrates a two-stage RAG (Retrieval-Augmented Generation) pattern:
an offline embedding pipeline that processes and stores quotes, followed by an online
serving application that enables semantic search.

## Concepts covered

- `TaskEnvironment` for defining task execution environments
- `Dir` artifacts for passing directories between tasks
- `AppEnvironment` for serving applications
- `Parameter` and `RunOutput` for connecting apps to task outputs
- Semantic search with sentence-transformers and ChromaDB

## Part 1: The embedding pipeline

The embedding pipeline fetches quotes from a public API, creates vector embeddings
using sentence-transformers, and stores them in a ChromaDB database.

### Setting up the environment

The `TaskEnvironment` defines the execution environment for all tasks in the pipeline.
It specifies the container image, required packages, and resource allocations:

```python
import flyte

# Define the embedding environment
embedding_env = flyte.TaskEnvironment(
    name="quote-embedding",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "sentence-transformers>=2.2.0",
        "chromadb>=0.4.0",
        "requests>=2.31.0",
    ),
    resources=flyte.Resources(cpu=2, memory="4Gi"),
    cache="auto",
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/embed.py*

The environment uses:
- `Image.from_debian_base()` to create a container with Python 3.12
- `with_pip_packages()` to install sentence-transformers and ChromaDB
- `Resources` to request 2 CPUs and 4 GiB of memory (`"4Gi"` is a binary unit: 4 × 1024³ bytes)
- `cache="auto"` to enable automatic caching of task outputs
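The `"4Gi"` string uses a binary suffix, which differs from the decimal `"4G"`. The helper below is a hypothetical sketch, not part of Flyte. It assumes Flyte's memory strings follow Kubernetes resource-quantity conventions, which the `"4Gi"` notation suggests, and shows how the two suffix families translate to bytes:

```python
# Hypothetical helper, not part of Flyte: converts a Kubernetes-style memory
# quantity into bytes, assuming Flyte's memory strings follow that convention.
# Binary suffixes (Ki, Mi, Gi, Ti) are powers of 1024; decimal suffixes
# (k, M, G, T) are powers of 1000.
BINARY_SUFFIXES = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}
DECIMAL_SUFFIXES = {"k": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4}


def memory_to_bytes(quantity: str) -> int:
    """Return the number of bytes a memory quantity string represents."""
    for suffix, factor in BINARY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    for suffix, factor in DECIMAL_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)  # no suffix: plain bytes


print(memory_to_bytes("4Gi"))  # 4294967296
print(memory_to_bytes("4G"))   # 4000000000
```

The difference is about 7%, which can matter when a process sits close to its memory limit.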

### Fetching data

The `fetch_quotes` task retrieves 100 quotes from the public DummyJSON quotes API:

```python
@embedding_env.task
async def fetch_quotes() -> list[dict]:
    """
    Fetch quotes from a public quotes API.

    Returns:
        List of quote dictionaries with 'id', 'quote', and 'author' fields.
    """
    import requests

    print("Fetching quotes from API...")
    # A timeout keeps the task from hanging indefinitely if the API is unreachable
    response = requests.get("https://dummyjson.com/quotes?limit=100", timeout=30)
    response.raise_for_status()

    data = response.json()
    quotes = data.get("quotes", [])

    print(f"Fetched {len(quotes)} quotes")
    return quotes
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/embed.py*

This task demonstrates:
- Async task definition with `async def`
- Returning structured data (`list[dict]`) from a task
- Using the `@embedding_env.task` decorator to associate the task with its environment
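Downstream, `embed_quotes` reads `q["id"]`, `q["quote"]`, and `q["author"]` from every record, so a malformed record would only fail later inside the embedding step. The sketch below is a hypothetical validation helper (not part of the original example) that checks those fields up front. It assumes only what the task above already relies on: the quotes live under a top-level `"quotes"` key. The sample payload is hand-written, not real API output.

```python
from typing import Any

# Fields that embed_quotes reads from every record.
REQUIRED_FIELDS = ("id", "quote", "author")


def extract_quotes(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Pull the quote list out of an API payload and check each record's fields."""
    quotes = payload.get("quotes", [])
    for q in quotes:
        missing = [f for f in REQUIRED_FIELDS if f not in q]
        if missing:
            raise ValueError(f"quote record is missing fields: {missing}")
    return quotes


# Hand-written sample in the assumed response shape.
sample = {
    "quotes": [
        {"id": 1, "quote": "An example quote.", "author": "Example Author"},
        {"id": 2, "quote": "Another example quote.", "author": "Another Author"},
    ]
}
print(len(extract_quotes(sample)))  # 2
```

Failing inside `fetch_quotes` surfaces a bad API response at the step that caused it, rather than as a `KeyError` in the embedding task.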

### Creating embeddings

The `embed_quotes` task creates vector embeddings and stores them in ChromaDB:

```python
from flyte.io import Dir


@embedding_env.task
async def embed_quotes(quotes: list[dict]) -> Dir:
    """
    Create embeddings for quotes and store them in ChromaDB.

    Args:
        quotes: List of quote dictionaries with 'id', 'quote', and 'author' fields.

    Returns:
        Directory containing the ChromaDB database.
    """
    import tempfile

    import chromadb
    from sentence_transformers import SentenceTransformer

    print("Loading embedding model...")
    model = SentenceTransformer("all-MiniLM-L6-v2")

    # Create ChromaDB in a temporary directory
    db_dir = tempfile.mkdtemp()
    print(f"Creating ChromaDB at {db_dir}...")

    client = chromadb.PersistentClient(path=db_dir)
    collection = client.create_collection(
        name="quotes",
        metadata={"hnsw:space": "cosine"},
    )

    # Prepare data for insertion
    texts = [q["quote"] for q in quotes]
    ids = [str(q["id"]) for q in quotes]
    metadatas = [{"author": q["author"], "quote": q["quote"]} for q in quotes]

    print(f"Embedding {len(texts)} quotes...")
    embeddings = model.encode(texts, show_progress_bar=True)

    # Add to collection
    collection.add(
        ids=ids,
        embeddings=embeddings.tolist(),
        metadatas=metadatas,
        documents=texts,
    )

    print(f"Stored {len(quotes)} quotes in ChromaDB")
    return await Dir.from_local(db_dir)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/embed.py*

Key points:
- Uses the `all-MiniLM-L6-v2` model from sentence-transformers (runs on CPU)
- Creates a persistent ChromaDB database with cosine similarity
- Returns a `Dir` artifact that captures the entire database directory
- The `await Dir.from_local()` call uploads the directory to artifact storage
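The `metadata={"hnsw:space": "cosine"}` setting tells ChromaDB to rank results by cosine distance, which ChromaDB defines as 1 minus the cosine similarity. The plain-Python sketch below (an illustration, not ChromaDB's implementation) shows that relationship and the range of values it produces:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors, in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Distance used by a cosine-space index: 1 - cosine similarity, in [0, 2]."""
    return 1.0 - cosine_similarity(a, b)


print(cosine_distance([1.0, 0.0], [1.0, 0.0]))   # 0.0 (same direction)
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))   # 1.0 (orthogonal)
print(cosine_distance([1.0, 0.0], [-1.0, 0.0]))  # 2.0 (opposite)
```

This is why the search app later computes `similarity = 1 - distance` to turn ChromaDB's distances back into similarities.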

### Orchestrating the pipeline

The top-level `embedding_pipeline` task calls the two tasks in sequence, passing the fetched quotes to `embed_quotes`:

```python
@embedding_env.task
async def embedding_pipeline() -> Dir:
    """
    Main pipeline that fetches quotes and creates embeddings.

    Returns:
        Directory containing the ChromaDB database with quote embeddings.
    """
    print("Starting embedding pipeline...")

    # Fetch quotes from API
    quotes = await fetch_quotes()

    # Create embeddings and store in ChromaDB
    db_dir = await embed_quotes(quotes)

    print("Embedding pipeline complete!")
    return db_dir
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/embed.py*
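The control flow of `embedding_pipeline` is ordinary `async`/`await`. The sketch below uses plain `asyncio` coroutines as hypothetical stand-ins for the two tasks (the `fake_*` names are illustrative only) to show the data dependency: `embed_quotes` cannot start until `fetch_quotes` has returned. It does not model how Flyte schedules or runs the tasks remotely.

```python
import asyncio


# Hypothetical stand-ins for the Flyte tasks: plain coroutines with the same
# shape (fetch returns records, embed turns records into a "database" name).
async def fake_fetch_quotes() -> list[dict]:
    await asyncio.sleep(0)  # simulate waiting on I/O
    return [{"id": 1, "quote": "An example quote.", "author": "Example Author"}]


async def fake_embed_quotes(quotes: list[dict]) -> str:
    await asyncio.sleep(0)
    return f"db-with-{len(quotes)}-quotes"


async def fake_pipeline() -> str:
    # Each await completes before the next line runs, so embedding only
    # starts once the fetched quotes are available.
    quotes = await fake_fetch_quotes()
    return await fake_embed_quotes(quotes)


print(asyncio.run(fake_pipeline()))  # db-with-1-quotes
```

If a pipeline had independent steps, `asyncio.gather` is the usual way to await them concurrently. Here the second step consumes the first step's output, so sequential awaits are the right fit.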

### Running the pipeline

To run the embedding pipeline:

```python
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(embedding_pipeline)
    print(f"Embedding run URL: {run.url}")
    run.wait()
    print(f"Embedding complete! Database directory: {run.outputs()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/embed.py*

```bash
uv run embed.py
```

The pipeline will:
1. Fetch 100 quotes from the API
2. Create embeddings using sentence-transformers
3. Store everything in a ChromaDB database
4. Return the database as a `Dir` artifact

## Part 2: The serving application

The serving application provides a Streamlit web interface for searching quotes
using the embeddings created by the pipeline.

### App environment configuration

The `AppEnvironment` defines how the application runs, including its container image, start command, port, resources, and inputs:

```python
# Define the app environment
env = AppEnvironment(
    name="quote-search-app",
    description="Semantic search over quotes using embeddings",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit>=1.41.0",
        "sentence-transformers>=2.2.0",
        "chromadb>=0.4.0",
    ),
    args=["streamlit", "run", "app.py", "--server.port", "8080"],
    port=8080,
    resources=flyte.Resources(cpu=2, memory="4Gi"),
    parameters=[
        Parameter(
            name="quotes_db",
            value=RunOutput(task_name="quote-embedding.embedding_pipeline", type="directory"),
            download=True,
            env_var="QUOTES_DB_PATH",
        ),
    ],
    include=["app.py"],
    requires_auth=False,
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/serve.py*

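Key configuration:
- `args` specifies the command that starts the Streamlit app
- `port=8080` exposes the application on port 8080, matching the `--server.port` flag in `args`
- `parameters` defines inputs to the app:
  - `RunOutput` connects to the output of the `embedding_pipeline` task, referenced as `<environment name>.<task name>` (`quote-embedding.embedding_pipeline`)
  - `download=True` downloads the directory into the app container's local filesystem
  - `env_var="QUOTES_DB_PATH"` exposes the local path of the downloaded directory through the `QUOTES_DB_PATH` environment variable
- `include=["app.py"]` bundles the Streamlit app file with the deployment
- `requires_auth=False` lets users open the app without authenticating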
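Because the app only learns where the database lives through `QUOTES_DB_PATH`, it helps to fail fast when that variable is missing or does not point at a directory. The helper below is a hypothetical sketch (the Streamlit app itself uses `st.error` and `st.stop` instead); it assumes only that the variable holds a local directory path, as `download=True` together with `env_var` implies.

```python
import os
import tempfile
from pathlib import Path


def resolve_db_path(env_var: str = "QUOTES_DB_PATH") -> Path:
    """Return the database directory named by env_var, failing fast if it is unusable."""
    raw = os.environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set")
    path = Path(raw)
    if not path.is_dir():
        raise RuntimeError(f"{env_var}={path} is not a directory")
    return path


# Usage: simulate the platform setting the variable to a downloaded directory.
with tempfile.TemporaryDirectory() as downloaded_dir:
    os.environ["QUOTES_DB_PATH"] = downloaded_dir
    print(resolve_db_path() == Path(downloaded_dir))  # True
```

Checking for a directory, not just a non-empty string, catches the case where the variable is set but the download did not produce the expected folder.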

### The Streamlit application

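The app reads the database location from the `QUOTES_DB_PATH` environment variable and opens the ChromaDB collection. `@st.cache_resource` keeps the collection and the embedding model in memory, so they are not reloaded every time Streamlit reruns the script: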

```python
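import os

import chromadb
import streamlit as st
from sentence_transformers import SentenceTransformer


# Load the database and embedding model once; st.cache_resource reuses them across reruns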
@st.cache_resource
def load_db():
    db_path = os.environ.get("QUOTES_DB_PATH")
    if not db_path:
        st.error("QUOTES_DB_PATH environment variable not set")
        st.stop()

    client = chromadb.PersistentClient(path=db_path)
    collection = client.get_collection("quotes")
    model = SentenceTransformer("all-MiniLM-L6-v2")
    return collection, model

collection, model = load_db()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/app.py*

The search interface provides a text input, a slider for the number of results, and two buttons:

```python
# Search interface
query = st.text_input("Enter your search query:", placeholder="e.g., love, wisdom, success")
top_k = st.slider("Number of results:", min_value=1, max_value=20, value=5)

col1, col2 = st.columns([1, 1])
with col1:
    search_button = st.button("Search", type="primary", use_container_width=True)
with col2:
    random_button = st.button("Random Quote", use_container_width=True)

st.divider()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/app.py*

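When the user clicks **Search** with a non-empty query, the app encodes the query with the same `all-MiniLM-L6-v2` model used in the pipeline and asks ChromaDB for the `top_k` nearest quotes. Because the collection uses cosine distance, `1 - distance` gives the cosine similarity: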

```python
if search_button and query:
    # Encode query and search
    query_embedding = model.encode([query])[0].tolist()
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
    )

    if results["documents"] and results["documents"][0]:
        for i, (doc, metadata, distance) in enumerate(
            zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
        ):
            similarity = 1 - distance  # Convert distance to similarity
            st.markdown(f'**{i+1}.** "{doc}"')
            st.caption(f"— {metadata['author']} | Similarity: {similarity:.2%}")
            st.write("")
    else:
        st.info("No results found.")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/app.py*
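ChromaDB answers `collection.query` with an HNSW index, which finds approximate nearest neighbours. As a mental model, or to check results on a small collection, the sketch below performs an exact brute-force version of the same search: it returns the `k` closest vectors, nearest first, with cosine distances that convert to similarities via `1 - distance`. The function names and toy vectors are illustrative assumptions, not part of the app.

```python
import heapq
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity, matching a cosine-space index."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def top_k(query: list[float], vectors: list[list[float]], k: int) -> list[tuple[int, float]]:
    """Return (index, distance) pairs for the k vectors closest to query, nearest first."""
    scored = ((i, cosine_distance(query, v)) for i, v in enumerate(vectors))
    return heapq.nsmallest(k, scored, key=lambda pair: pair[1])


vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
for idx, dist in top_k([1.0, 0.0], vectors, k=2):
    print(idx, f"similarity={1 - dist:.2%}")
# 0 similarity=100.00%
# 2 similarity=70.71%
```

An exact scan costs one distance computation per stored vector, which is trivial for about 100 quotes; an approximate index like HNSW pays off as the collection grows.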

When the user clicks **Random Quote**, the app fetches up to 100 stored quotes and displays one at random:

```python
elif random_button:
    # Get a random quote from the collection
    all_data = collection.get(limit=100)
    if all_data["documents"]:
        idx = random.randint(0, len(all_data["documents"]) - 1)
        quote = all_data["documents"][idx]
        author = all_data["metadatas"][idx]["author"]
        st.markdown(f'**"{quote}"**')
        st.caption(f"— {author}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/app.py*
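The random pick can also be written with `random.choice` over the zipped documents and metadata, which keeps each quote paired with its author and avoids the `randint(0, len - 1)` index arithmetic. The helper below is a hypothetical sketch; it assumes only the `"documents"` and `"metadatas"` keys that the snippet above already reads, and it takes a `random.Random` instance so the pick can be made reproducible in tests.

```python
import random
from typing import Optional


def pick_random_quote(data: dict, rng: random.Random) -> Optional[tuple[str, str]]:
    """Pick one (quote, author) pair from a dict shaped like the collection.get() result above."""
    # Zip documents with their metadata so each quote stays paired with its author.
    pairs = list(zip(data.get("documents") or [], data.get("metadatas") or []))
    if not pairs:
        return None
    quote, metadata = rng.choice(pairs)
    return quote, metadata["author"]


# Usage with a hand-written sample; a seeded Random makes the pick reproducible.
data = {"documents": ["An example quote."], "metadatas": [{"author": "Example Author"}]}
print(pick_random_quote(data, random.Random(0)))  # ('An example quote.', 'Example Author')
```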

### Deploying the app

To deploy the quote search application:

```python
if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy the quote search app
    print("Deploying quote search app...")
    deployment = flyte.serve(env)
    print(f"App deployed at: {deployment.url}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/basic-project/serve.py*

```bash
uv run serve.py
```

The app is deployed and connected to the embedding pipeline's output through the
`RunOutput` parameter. Run the embedding pipeline first, so that a completed
`embedding_pipeline` run exists for `RunOutput` to resolve.

## Key takeaways

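1. **Two-stage RAG pattern**: Separate offline embedding creation from online serving.
   The corpus is embedded once in a batch task, so the serving app only loads the
   prebuilt database and encodes one query per search instead of re-embedding every quote.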

2. **Dir artifacts**: Use `Dir` to pass entire directories (like databases) between
   tasks and to serving applications.

3. **RunOutput**: Connect applications to task outputs declaratively, enabling
   automatic data flow between pipelines and apps.

4. **CPU-friendly embeddings**: The `all-MiniLM-L6-v2` model runs efficiently on CPU,
   making this pattern accessible without GPU resources.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/basic-project.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/basic-project/
