# Run and deploy tasks
> This bundle contains all pages in the Run and deploy tasks section.
> Source: https://www.union.ai/docs/v2/union/user-guide/task-deployment/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment ===

# Run and deploy tasks

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

You have seen how to configure and build the tasks that compose your project.
Now you need to decide how to execute them on your Flyte backend.

Flyte offers two distinct approaches for getting your tasks onto the backend:

**Use `flyte run` when you're iterating and experimenting:**
- Quickly test changes during development
- Try different parameters or code modifications
- Debug issues without creating permanent artifacts
- Prototype new ideas rapidly

**Use `flyte deploy` when your project is ready to be formalized:**
- Freeze a stable version of your tasks for repeated use
- Share tasks with team members or across environments
- Move from experimentation to a more structured workflow
- Create a permanent reference point (not necessarily production-ready)

This section explains both approaches and when to use each one.

## Ephemeral deployment and immediate execution

The `flyte run` CLI command and the `flyte.run()` SDK function are used to **ephemerally deploy** and **immediately execute** a task on the backend in a single step.
The task can be re-run and its execution and outputs can be observed in the **Runs list** UI, but it is not permanently added to the **Tasks list** on the backend.

Let's say you have the following file called `greeting.py`:

```python
# greeting.py

import flyte

env = flyte.TaskEnvironment(name="greeting_env")

@env.task
async def greet(message: str) -> str:
    return f"{message}!"
```

### Programmatic

You can run the task programmatically using the `flyte.run()` function:

```python
# greeting.py

import flyte

env = flyte.TaskEnvironment(name="greeting_env")

@env.task
async def greet(message: str) -> str:
    return f"{message}!"

if __name__ == "__main__":
    flyte.init_from_config()
    result = flyte.run(greet, message="Good morning!")
    print(f"Result: {result}")
```

Here we add a `__main__` block to the `greeting.py` file that initializes the Flyte SDK from the configuration file and then calls `flyte.run()` with the `greet` task and its argument.
Now you can run the `greet` task on the backend just by executing the `greeting.py` file locally as a script:

```bash
python greeting.py
```

### CLI

The general form of the command for running a task from a local file is:

```bash
flyte run <file_path> <task_name> <args>
```

So, to run the `greet` task defined in the `greeting.py` file, you would run:

```bash
flyte run greeting.py greet --message "Good morning!"
```

This command:
1. **Temporarily deploys** the task environment named `greeting_env` (held by the variable `env`) that contains the `greet` task.
2. **Executes** the `greet` function with argument `message` set to `"Good morning!"`. Note that `message` is the actual parameter name defined in the function signature.
3. **Returns** the execution results and displays them in the terminal.

For more details on how `flyte run` and `flyte.run()` work under the hood, see **Run and deploy tasks > How task run works**.

## Persistent deployment

The `flyte deploy` CLI command and the `flyte.deploy()` SDK function are used to **persistently deploy** a task environment (and all its contained tasks) to the backend.
The tasks within the deployed environment will appear in the **Tasks list** UI on the backend and can then be executed multiple times without needing to redeploy them.

### Programmatic

You can deploy programmatically using the `flyte.deploy()` function:

```python
# greeting.py

import flyte

env = flyte.TaskEnvironment(name="greeting_env")

@env.task
async def greet(message: str) -> str:
    return f"{message}!"

if __name__ == "__main__":
    flyte.init_from_config()
    deployments = flyte.deploy(env)
    print(deployments[0].summary_repr())
```

Now you can deploy the `greeting_env` task environment (and therefore the `greet()` task) just by executing the `greeting.py` file locally as a script.

```bash
python greeting.py
```

### CLI

The general form of the command for deploying a task environment from a local file is:

```bash
flyte deploy <file_path> <task_environment_variable>
```

So, using the same `greeting.py` file as before, you can deploy the `greeting_env` task environment like this:

```bash
flyte deploy greeting.py env
```

This command deploys the task environment *assigned to the variable `env`* in the `greeting.py` file, which is the `TaskEnvironment` named `greeting_env`.

Notice that you must specify the *variable* to which the `TaskEnvironment` is assigned (`env` in this case), not the name of the environment itself (`greeting_env`).

Deploying a task environment deploys all tasks defined within it. Here, that means all functions decorated with `@env.task`.
In this case there is just one: `greet()`.
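Conceptually, a `TaskEnvironment` acts as a registry: each `@env.task` decoration records the function on the environment, so deploying the environment covers every recorded task. A toy sketch of this registration pattern (illustrative only, not the Flyte implementation):

```python
# Toy sketch of the registration pattern behind @env.task.
# This is NOT the Flyte implementation, just an illustration.

class ToyTaskEnvironment:
    def __init__(self, name: str):
        self.name = name
        self.tasks: dict[str, callable] = {}

    def task(self, fn):
        """Decorator: register fn under this environment, return it unchanged."""
        self.tasks[fn.__name__] = fn
        return fn

    def deploy(self) -> list[str]:
        """Deploying the environment covers every registered task."""
        return [f"{self.name}.{name}" for name in self.tasks]

env = ToyTaskEnvironment("greeting_env")

@env.task
def greet(message: str) -> str:
    return f"{message}!"

@env.task
def farewell(message: str) -> str:
    return f"Goodbye, {message}"

print(env.deploy())  # ['greeting_env.greet', 'greeting_env.farewell']
```

Because the decorator returns the function unchanged, the tasks remain ordinary callables while the environment keeps track of everything it must deploy.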

For more details on how `flyte deploy` and `flyte.deploy()` work under the hood, see **Run and deploy tasks > How task deployment works**.

## Running already deployed tasks

If you have already deployed your task environment, you can run its tasks without redeploying by using the `flyte run` CLI command or the `flyte.run()` SDK function in a slightly different way. Alternatively, you can always initiate execution of a deployed task from the UI.

### Programmatic

You can run already-deployed tasks programmatically using the `flyte.run()` function.
For example, to run the previously deployed `greet` task from the `greeting_env` environment:

```python
# greeting.py

import flyte

env = flyte.TaskEnvironment(name="greeting_env")

@env.task
async def greet(message: str) -> str:
    return f"{message}!"

if __name__ == "__main__":
    flyte.init_from_config()
    flyte.deploy(env)
    task = flyte.remote.Task.get("greeting_env.greet", auto_version="latest")
    result = flyte.run(task, message="Good morning!")
    print(f"Result: {result}")
```

When you execute this script locally, it will:

- Deploy the `greeting_env` task environment as before.
- Retrieve the already-deployed `greet` task using `flyte.remote.Task.get()`, specifying its full task reference as a string: `"greeting_env.greet"`.
- Call `flyte.run()` with the retrieved task and its argument.

For more details on how running already-deployed tasks works, see **Run and deploy tasks > How task run works**.

### CLI

To run a permanently deployed task using the `flyte run` CLI command, use the special `deployed-task` keyword followed by the task reference in the format `{environment_name}.{task_name}`. For example, to run the previously deployed `greet` task from the `greeting_env` environment:

```bash
flyte run deployed-task greeting_env.greet --message "World"
```

Notice that now that the task environment is deployed, you refer to it by its name (`greeting_env`), not by the variable name to which it was assigned in source code (`env`).
The task environment name plus the task name (`greet`) are combined with a dot (`.`) to form the full task reference: `greeting_env.greet`.
The special `deployed-task` keyword tells the CLI that you are referring to a task that has already been deployed. In effect, it replaces the file path argument used for ephemeral runs.

When executed, this command will run the already-deployed `greet` task with argument `message` set to `"World"`. You will see the result printed in the terminal. You can also, of course, observe the execution in the **Runs list** UI.

To execute a deployed task in a different project or domain than your configured defaults, use `--run-project` and `--run-domain`:

```bash
flyte run --run-project prod-project --run-domain production deployed-task greeting_env.greet --message "World"
```

For all `flyte run` options, see **Run and deploy tasks > Run command options**.

## Configuring runs with `flyte.with_runcontext()`

Both `flyte run` and `flyte.run()` accept a range of invocation-time parameters that control where the run executes, where outputs are stored, caching behavior, and more.
Programmatically, these are set with `flyte.with_runcontext()` before calling `.run()`.
Inside a running task, `flyte.ctx()` provides read access to the same context.
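Conceptually, `with_runcontext()` returns an intermediate context object whose `.run()` applies the stored settings before launching the task. A toy sketch of this builder pattern (not the Flyte implementation; of the parameters below, only `mode` is confirmed by these docs):

```python
# Toy sketch of the with_runcontext(...).run(...) builder pattern.
# The settings handling here is illustrative, not the Flyte API.

class ToyRunContext:
    def __init__(self, **settings):
        self.settings = settings  # e.g. {"mode": "local"}

    def run(self, task, **kwargs):
        # A real implementation would dispatch to local or remote execution
        # based on self.settings; here we just record what would happen.
        return {"task": task.__name__, "kwargs": kwargs, **self.settings}

def with_runcontext(**settings) -> ToyRunContext:
    return ToyRunContext(**settings)

def greet(message: str) -> str:
    return f"{message}!"

result = with_runcontext(mode="local").run(greet, message="Good morning!")
print(result)  # {'task': 'greet', 'kwargs': {'message': 'Good morning!'}, 'mode': 'local'}
```

The pattern reads fluently: settings first, then the task and its arguments.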

For the full parameter reference, see **Run and deploy tasks > Run context**.

<!--
TODO: Add link to Flyte remote documentation when available
For details on Flyte remote functionality, see the [Flyte remote]().
-->

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/how-task-run-works ===

# How task run works

The `flyte run` command and `flyte.run()` SDK function support three primary execution modes:

1. **Ephemeral deployment + run**: Automatically prepare task environments ephemerally and execute tasks (development shortcut)
2. **Run deployed task**: Execute permanently deployed tasks without redeployment
3. **Local execution**: Run tasks on your local machine for development and testing

Additionally, you can run deployed tasks through the Flyte/Union UI for interactive execution and monitoring.

## Ephemeral deployment + run: The development shortcut

The most common development pattern combines ephemeral task preparation and execution in a single command, automatically handling the temporary deployment process when needed.

### Programmatic

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task
async def my_task(name: str) -> str:
    return f"Hello, {name}!"

if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy and run in one step
    result = flyte.run(my_task, name="World")
    print(f"Result: {result}")
    print(f"Execution URL: {result.url}")
```

### CLI

```bash
flyte run my_example.py my_task --name "World"
```

With explicit project and domain:

```bash
flyte run --project my-project --domain development my_example.py my_task --name "World"
```

With deployment options:

```bash
flyte run --version v1.0.0 --copy-style all my_example.py my_task --name "World"
```

**How it works:**
1. **Environment discovery**: Flyte loads the specified Python file and identifies task environments
2. **Ephemeral preparation**: Temporarily prepares the task environment for execution (similar to deployment but not persistent)
3. **Task execution**: Immediately runs the specified task with provided arguments in the ephemeral environment
4. **Result return**: Returns execution results and monitoring URL
5. **Cleanup**: The ephemeral environment is not stored permanently in the backend

**Benefits of ephemeral deployment + run:**
- **Development efficiency**: No separate permanent deployment step required
- **Always current**: Uses your latest code changes without polluting the backend
- **Clean development**: Ephemeral environments don't clutter your task registry
- **Integrated workflow**: Single command for complete development cycle

## Running deployed tasks

For production workflows or when you want to use stable deployed versions, you can run tasks that have been **permanently deployed** with `flyte deploy` without triggering any deployment process.

### Programmatic

```python
import flyte

flyte.init_from_config()

# Method 1: Using remote task reference
deployed_task = flyte.remote.Task.get("my_env.my_task", version="v1.0.0")
result = flyte.run(deployed_task, name="World")

# Method 2: Get latest version
deployed_task = flyte.remote.Task.get("my_env.my_task", auto_version="latest")
result = flyte.run(deployed_task, name="World")
```

### CLI

```bash
flyte run deployed-task my_env.my_task --name "World"
```

With a specific project and domain:

```bash
flyte run --project prod --domain production deployed-task my_env.my_task --batch_size 1000
```

**Task reference format:** `{environment_name}.{task_name}`
- `environment_name`: The `name` property of your `TaskEnvironment`
- `task_name`: The function name of your task

> [!NOTE]
> When you deploy a task environment with `flyte deploy`, you specify the `TaskEnvironment` by the variable to which it is assigned.
> Once deployed, you refer to it by its `name` property.

**Benefits of running deployed tasks:**
- **Performance**: No deployment overhead, faster execution startup
- **Stability**: Uses tested, stable versions of your code
- **Production safety**: Isolated from local development changes
- **Version control**: Explicit control over which code version runs

## Local execution

For development, debugging, and testing, you can run tasks locally on your machine without any backend interaction.

### Programmatic

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task
async def my_task(name: str) -> str:
    return f"Hello, {name}!"

# Method 1: No client configured (defaults to local)
result = flyte.run(my_task, name="World")

# Method 2: Explicit local mode
flyte.init_from_config()  # Client configured
result = flyte.with_runcontext(mode="local").run(my_task, name="World")
```

### CLI

```bash
flyte run --local my_example.py my_task --name "World"
```

With development data:

```bash
flyte run --local data_pipeline.py process_data --input_path "/local/data" --debug true
```

**Benefits of local execution:**
- **Rapid development**: Instant feedback without network latency
- **Debugging**: Full access to local debugging tools
- **Offline development**: Works without backend connectivity
- **Resource efficiency**: Uses local compute resources

## Running tasks through the Union UI

If you are running your Flyte code on a Union backend, the UI provides an interactive way to run deployed tasks with form-based input and real-time monitoring.

### Accessing task execution in the Union UI

1. **Navigate to tasks**: Go to your project → domain → Tasks section
2. **Select task**: Choose the task environment and specific task
3. **Launch execution**: Click "Launch" to open the execution form
4. **Provide inputs**: Fill in task parameters through the web interface
5. **Monitor progress**: Watch real-time execution progress and logs

**UI execution benefits:**
- **User-friendly**: No command-line expertise required
- **Visual monitoring**: Real-time progress visualization
- **Input validation**: Built-in parameter validation and type checking
- **Execution history**: Easy access to previous runs and results
- **Sharing**: Shareable execution URLs for collaboration

Here is a short video demonstrating task execution through the Union UI:

📺 [Watch on YouTube](https://www.youtube.com/watch?v=8jbau9yGoDg)

## Execution flow and architecture

### Fast registration architecture

Flyte v2 uses "fast registration" to enable rapid development cycles:

#### How it works

1. **Container images** contain the runtime environment and dependencies
2. **Code bundles** contain your Python source code (stored separately)
3. **At runtime**: Code bundles are downloaded and injected into running containers

#### Benefits

- **Rapid iteration**: Update code without rebuilding images
- **Resource efficiency**: Share images across multiple deployments
- **Version flexibility**: Run different code versions with same base image
- **Caching optimization**: Separate caching for images vs. code

#### When code gets injected

At task execution time, the fast registration process follows these steps:

1. **Container starts** with the base image containing runtime environment and dependencies
2. **Code bundle download**: The Flyte agent downloads your Python code bundle from storage
3. **Code extraction**: The code bundle is extracted and mounted into the running container
4. **Task execution**: Your task function executes with the injected code
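The steps above can be mimicked with a stdlib-only toy sketch (purely illustrative, not Flyte's actual mechanism): pack source code into a tarball "bundle", extract it into a fresh directory standing in for the container filesystem, then execute the injected code there.

```python
# Toy illustration of the fast-registration flow: the code bundle is
# packaged separately from the runtime image and injected at run time.
# This mimics the flow conceptually; it is NOT Flyte's mechanism.
import pathlib
import runpy
import tarfile
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    workdir = pathlib.Path(tmp)

    # 1. "Build time": write the user code and pack it into a bundle.
    src = workdir / "task_code.py"
    src.write_text("RESULT = 'Hello from injected code!'\n")
    bundle = workdir / "bundle.tar.gz"
    with tarfile.open(bundle, "w:gz") as tar:
        tar.add(src, arcname="task_code.py")

    # 2. "Container start": a fresh directory stands in for the container
    #    filesystem; the bundle is "downloaded" and extracted into it.
    container = workdir / "container"
    container.mkdir()
    with tarfile.open(bundle, "r:gz") as tar:
        tar.extractall(container)

    # 3. "Task execution": the injected code runs inside the container.
    ns = runpy.run_path(str(container / "task_code.py"))
    print(ns["RESULT"])  # Hello from injected code!
```

Because the bundle is built and shipped independently of the "image" (here, the container directory), updating the code never requires rebuilding the runtime environment.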

### Ephemeral preparation logic

When using ephemeral deploy + run mode, Flyte determines whether temporary preparation is needed:

```mermaid
graph TD
    A[flyte run command] --> B{Need preparation?}
    B -->|Yes| C[Ephemeral preparation]
    B -->|No| D[Use cached preparation]
    C --> E[Execute task]
    D --> E
    E --> F[Cleanup ephemeral environment]
```

### Execution modes comparison

| Mode | Deployment | Performance | Use Case | Code Version |
|------|------------|-------------|-----------|--------------|
| Ephemeral Deploy + Run | Ephemeral (temporary) | Medium | Development, testing | Latest local |
| Run Deployed | None (uses permanent deployment) | Fast | Production, stable runs | Deployed version |
| Local | None | Variable | Development, debugging | Local |
| UI | None | Fast | Interactive, collaboration | Deployed version |

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/interacting-with-runs ===

# Interact with runs and actions

When a task is launched, the resulting execution is called a **run**.
Because tasks typically call other tasks, a run will almost always involve multiple sub-task executions. Each such execution is called an **action**.

Through the Flyte SDK and CLI, you can interact with the run and its actions to monitor progress, retrieve results, and access data. This section explains how to work with runs and actions programmatically and through the CLI.

## Understanding runs and actions

Runs are not declared explicitly in the code of the entry point task.
Instead, they are simply a result of the task being invoked in a specific way:
* A user invoking the `flyte run` CLI command
* A user launching the task from the UI
* Other code calling `flyte.run()`
* A [Trigger](https://www.union.ai/docs/v2/union/user-guide/task-configuration/triggers)

When a task is invoked in one of these ways, it creates a run to represent the execution of that task and all its nested tasks, considered together.
Each task execution within that run is represented by an **action**.
The entry point task execution is represented by the main action (usually called `a0`), and then every nested call of one task from another creates an additional action.

```mermaid
graph TD
    A[Run] --> B[Action a0 - Main task]
    B --> C[Action a1 - Nested task]
    B --> D[Action a2 - Nested task]
    D --> E[Action a3 - Deeply nested task]
```
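The run/action bookkeeping can be sketched with a toy model (not the Flyte engine): the entry-point task execution becomes action `a0`, and every nested task call allocates the next identifier.

```python
# Toy model of how one run fans out into actions.
# Identifier assignment is illustrative; this is not the Flyte engine.

class ToyRun:
    def __init__(self):
        self.actions: list[str] = []

    def record(self, task_name: str) -> str:
        action_id = f"a{len(self.actions)}"
        self.actions.append(f"{action_id}:{task_name}")
        return action_id

run = ToyRun()

def child(run: ToyRun) -> None:
    run.record("child")

def parent(run: ToyRun) -> None:
    run.record("parent")  # a0: the main action
    child(run)            # a1
    child(run)            # a2

parent(run)
print(run.actions)  # ['a0:parent', 'a1:child', 'a2:child']
```

Had `child` been launched directly as the entry point instead, its execution would itself be the main action `a0` of a new run.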

Because what constitutes a run depends only on how a task is invoked, the same task can execute as a deeply nested action in one run and the main action in another run.
Unlike Flyte v1, there is no explicit `@workflow` construct in Flyte v2; instead, "workflows" are defined implicitly by the structure of task composition and the entry point chosen at runtime.

> [!NOTE]
> Despite there being no explicit `@workflow` decorator, you'll often see the assemblage of tasks referred to as a "workflow" in documentation and discussions. The top-most task in a run is sometimes referred to as the "parent", "driver", or "entry point" task of the "workflow".
> In these docs we will sometimes use "workflow" informally to refer to the collection of tasks (considered statically) involved in a run.

### Key concepts

- **Attempts**: Each action can have multiple attempts due to retries. Retries occur for two reasons:
  - User-configured retries for handling transient failures
  - Automatic system retries for infrastructure issues

- **Phases**: Both runs and actions progress through phases (e.g., QUEUED, RUNNING, SUCCEEDED, FAILED) until reaching a terminal state

- **Durability**: Flyte is a durable execution engine, so every input, output, failure, and attempt is recorded for each action. All data is persisted, allowing you to retrieve information about runs and actions even after completion
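The phase lifecycle can be sketched as a tiny terminal-state check (the phase names come from the list above; the helper itself is illustrative, not Flyte API):

```python
# Toy sketch of the run/action phase lifecycle.
# Phase names are from the docs; the helper is illustrative only.
from enum import Enum

class Phase(Enum):
    QUEUED = "QUEUED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

TERMINAL_PHASES = {Phase.SUCCEEDED, Phase.FAILED}

def is_done(phase: Phase) -> bool:
    """True once a run or action has reached a terminal state."""
    return phase in TERMINAL_PHASES

print(is_done(Phase.RUNNING))    # False
print(is_done(Phase.SUCCEEDED))  # True
```

A wait-style helper (like `run.wait()`) simply polls until `is_done()` holds.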

## Working with runs

Runs are created when you execute tasks using `flyte run` or `flyte.run()`. For details on running tasks, see [how task run works](./how-task-run-works). To learn about running previously deployed remote tasks, see [remote tasks](https://www.union.ai/docs/v2/union/user-guide/task-programming/remote-tasks).

### Retrieving a run

### Programmatic

Use `flyte.remote.Run.get()` to retrieve information about a run:

```python
import flyte

flyte.init_from_config()

# Get a run by name
run = flyte.remote.Run.get("my_run_name")

# Access basic information
print(run.url)        # UI URL for the run
print(run.action.phase)  # Phase of the main action
```

### CLI

Get a specific run:

```bash
flyte get run my_run_name
```

List all runs:

```bash
flyte get run
```

Use `--project` and `--domain` to scope results to a specific [project-domain pair](https://www.union.ai/docs/v2/union/user-guide/projects-and-domains).
For all available options, see the [CLI reference](https://www.union.ai/docs/v2/union/api-reference/flyte-cli).

### Watching run progress

Monitor a run as it progresses through phases:

```python
# Wait for run to complete
run = flyte.run(my_task, input_data="test")
run.wait()  # Blocks until terminal state

# Check if done
if run.action.done():
    print("Run completed!")
```

### Getting detailed run information

Use `flyte.remote.RunDetails` for comprehensive information including nested actions and metadata:

```python
run_details = flyte.remote.RunDetails.get(name="my_run_name")

# Access detailed information
print(run_details.pb2)  # Full protobuf representation
```

## Working with actions

Actions represent individual task executions within a run. Each action has a unique identifier within its parent run.

### Retrieving an action

### Programmatic

```python
# Get a specific action by run name and action name
action = flyte.remote.Action.get(
    run_name="my_run_name",
    name="a0"  # Main action
)

# Access action information
print(action.phase)       # Current phase
print(action.task_name)   # Task being executed
print(action.start_time)  # Execution start time
```

### CLI

Get a specific action:

```bash
flyte get action my_run_name a0
```

List all actions for a run:

```bash
flyte get action my_run_name
```

For all available options, see the [CLI reference](https://www.union.ai/docs/v2/union/api-reference/flyte-cli).

### Nested actions

Deeply nested actions are uniquely identified by their path under the run:

```python
# Get a nested action
nested_action = flyte.remote.Action.get(
    run_name="my_run_name",
    name="a1"  # Nested action identifier
)
```

### Getting detailed action information

Use `flyte.remote.ActionDetails` for comprehensive action information:

```python
action_details = flyte.remote.ActionDetails.get(
    run_name="my_run_name",
    name="a0"
)

# Access detailed information
print(action_details.pb2)  # Full protobuf representation
```

## Retrieving inputs and outputs

### Programmatic

Both `Run` and `Action` objects provide methods to retrieve inputs and outputs:

```python
run = flyte.remote.Run.get("my_run_name")

# Get inputs - returns ActionInputs (dict-like)
inputs = run.inputs()
print(inputs)  # {"param_name": value, ...}

# Get outputs - returns tuple
outputs = run.outputs()
print(outputs)  # (result1, result2, ...)

# Single output
single_output = outputs[0]

# No outputs are represented as (None,)
```

**Important notes:**

- **Inputs**: Returned as `flyte.remote.ActionInputs`, a dictionary with parameter names as keys and values as the actual data passed in
- **Outputs**: Always returned as `flyte.remote.ActionOutputs` tuple, even for single outputs or no outputs
- **No outputs**: Represented as `(None,)`
- **Availability**: Outputs are only available if the action completed successfully
- **Type safety**: Flyte's rich type system converts data to an intermediate representation, allowing retrieval even without the original dependencies installed

### CLI

Get inputs and outputs for a run:

```bash
flyte get io my_run_name
```

Get inputs and outputs for a specific action:

```bash
flyte get io my_run_name a1
```

For all available options, see the [CLI reference](https://www.union.ai/docs/v2/union/api-reference/flyte-cli).

### Handling failures

If an action fails, outputs are not available, but you can retrieve error information:

```python
action = flyte.remote.Action.get(run_name="my_run_name", name="a0")

if action.phase == flyte.models.ActionPhase.FAILED:
    # Outputs will raise an error
    try:
        outputs = action.outputs()
    except RuntimeError as e:
        print("Action failed, outputs not available")

    # Get error details instead
    action_details = flyte.remote.ActionDetails.get(
        run_name="my_run_name",
        name="a0"
    )
    print(action_details.pb2.error_info)
```

## Understanding data storage

Flyte handles different types of data differently, as explained in [data flow](https://www.union.ai/docs/v2/union/user-guide/run-scaling/data-flow):

- **Parameterized data** (primitives, small objects): Returned directly in inputs/outputs
- **Large data** (files, directories, DataFrames, models): Stored in cloud storage (S3, GCS, Azure Blob Storage)

When you retrieve outputs containing large data, Flyte returns references rather than the actual data. To access the actual raw data, you need proper cloud storage permissions and configuration.
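A toy sketch of the two paths (illustrative only; the threshold, URI scheme, and helper function are not Flyte's): small payloads stay inline in the output record, while large ones go to a stand-in blob store and only a reference travels in the outputs.

```python
# Toy illustration of inline vs. referenced outputs.
# Threshold, URI scheme, and storage dict are all illustrative.

INLINE_LIMIT = 64  # bytes; an arbitrary threshold for this sketch

blob_store: dict[str, bytes] = {}  # stands in for S3/GCS/ABFS

def store_output(name: str, payload: bytes) -> dict:
    if len(payload) <= INLINE_LIMIT:
        return {"name": name, "inline": payload}
    uri = f"s3://bucket/outputs/{name}"
    blob_store[uri] = payload          # "upload" to cloud storage
    return {"name": name, "ref": uri}  # output carries only the reference

small = store_output("count", b"42")
large = store_output("dataset", b"x" * 1024)

print(small)         # {'name': 'count', 'inline': b'42'}
print(large["ref"])  # s3://bucket/outputs/dataset
```

Dereferencing `large["ref"]` requires access to the blob store, which is why the credential setup in the next section is needed before large outputs can be downloaded.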

## Accessing large data from cloud storage

To download and work with files, directories, and DataFrames stored in cloud object storage, you must configure storage access with appropriate credentials.

### S3 storage access

To access data stored in Amazon S3:

**1. Set environment variables:**

```bash
export FLYTE_AWS_ACCESS_KEY_ID="your-access-key-id"
export FLYTE_AWS_SECRET_ACCESS_KEY="your-secret-access-key"
```

These are standard AWS credential environment variables that Flyte recognizes.

**2. Initialize Flyte with S3 storage configuration:**

```python
import flyte
import flyte.storage

# Auto-configure from environment variables
flyte.init_from_config(
    storage=flyte.storage.S3.auto(region="us-east-2")
)

# Or provide credentials explicitly
flyte.init_from_config(
    storage=flyte.storage.S3(
        access_key_id="your-access-key-id",
        secret_access_key="your-secret-access-key",
        region="us-east-2"
    )
)
```

**3. Access data from outputs:**

```python
import pandas as pd

run = flyte.remote.Run.get("my_run_name")
outputs = run.outputs()

# Outputs containing files, DataFrames, etc. can now be downloaded.
# Note: `await` must be used inside an async function.
dataframe = outputs[0]
df = await dataframe.open(pd.DataFrame).all()
```

### GCS storage access

To access data stored in Google Cloud Storage:

**1. Set environment variables:**

```bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
```

This is the standard Google Cloud authentication method using service account credentials.

**2. Initialize Flyte with GCS storage configuration:**

```python
import flyte
import flyte.storage

# Auto-configure from environment
flyte.init_from_config(
    storage=flyte.storage.GCS.auto()
)

# Or configure explicitly
flyte.init_from_config(
    storage=flyte.storage.GCS()
)
```

**3. Access data from outputs:**

```python
run = flyte.remote.Run.get("my_run_name")
outputs = run.outputs()

# Download data as needed
file_output = outputs[0]
# Work with file output
```

### Azure Blob Storage access

To access data stored in Azure Blob Storage (ABFS):

**1. Set environment variables:**

For storage account key authentication:
```bash
export AZURE_STORAGE_ACCOUNT_NAME="your-storage-account"
export AZURE_STORAGE_ACCOUNT_KEY="your-account-key"
```

For service principal authentication:
```bash
export AZURE_TENANT_ID="your-tenant-id"
export AZURE_CLIENT_ID="your-client-id"
export AZURE_CLIENT_SECRET="your-client-secret"
export AZURE_STORAGE_ACCOUNT_NAME="your-storage-account"
```

**2. Initialize Flyte with Azure storage configuration:**

```python
import flyte
import flyte.storage

# Auto-configure from environment variables
flyte.init_from_config(
    storage=flyte.storage.ABFS.auto()
)

# Or provide credentials explicitly
flyte.init_from_config(
    storage=flyte.storage.ABFS(
        account_name="your-storage-account",
        account_key="your-account-key"
    )
)

# Or use service principal
flyte.init_from_config(
    storage=flyte.storage.ABFS(
        account_name="your-storage-account",
        tenant_id="your-tenant-id",
        client_id="your-client-id",
        client_secret="your-client-secret"
    )
)
```

**3. Access data from outputs:**

```python
run = flyte.remote.Run.get("my_run_name")
outputs = run.outputs()

# Download data as needed
directory_output = outputs[0]
# Work with directory output
```

## Complete example

Here's a complete example showing how to launch a run and interact with it:

```python
import flyte
import flyte.storage

# Initialize with storage access
flyte.init_from_config(
    storage=flyte.storage.S3.auto(region="us-east-2")
)

# Define and run a task
env = flyte.TaskEnvironment(name="data_processing")

@env.task
async def process_data(input_value: str) -> str:
    return f"Processed: {input_value}"

# Launch the run
run = flyte.run(process_data, input_value="test_data")

# Monitor progress
print(f"Run URL: {run.url}")
run.wait()

# Check status
if run.action.done():
    print(f"Run completed with phase: {run.action.phase}")

    # Get inputs and outputs
    inputs = run.inputs()
    print(f"Inputs: {inputs}")

    outputs = run.outputs()
    print(f"Outputs: {outputs}")

    # Access the result
    result = outputs[0]
    print(f"Result: {result}")
```

## API reference

### Key classes

- `flyte.remote.Run` - Represents a run with basic information
- `flyte.remote.RunDetails` - Detailed run information including all actions
- `flyte.remote.Action` - Represents an action with basic information
- `flyte.remote.ActionDetails` - Detailed action information including error details
- `flyte.remote.ActionInputs` - Dictionary-like object containing action inputs
- `flyte.remote.ActionOutputs` - Tuple containing action outputs

### CLI commands

For complete CLI documentation and all available options, see the [Flyte CLI reference](https://www.union.ai/docs/v2/union/api-reference/flyte-cli):
- [`flyte get run`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) - Get run information
- [`flyte get action`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) - Get action information
- [`flyte get io`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) - Get inputs and outputs
- [`flyte get logs`](https://www.union.ai/docs/v2/union/api-reference/flyte-cli) - Get action logs

### Storage configuration

- `flyte.storage.S3` - Amazon S3 configuration
- `flyte.storage.GCS` - Google Cloud Storage configuration
- `flyte.storage.ABFS` - Azure Blob Storage configuration

For more details on data flow and storage, see [data flow](https://www.union.ai/docs/v2/union/user-guide/run-scaling/data-flow).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/work-with-local-data ===

# Work with local data

When running Flyte tasks that take inputs like DataFrames, files, or directories, data is passed between actions through the configured blob store. For details on how data flows through your workflows, see [data flow](https://www.union.ai/docs/v2/union/user-guide/run-scaling/data-flow).

Flyte provides several built-in types for handling data:
- `flyte.io.DataFrame` for tabular data
- `flyte.io.File` for individual files
- `flyte.io.Dir` for directories

You can also create custom type extensions for specialized data types. See [custom types](https://www.union.ai/docs/v2/union/user-guide/task-programming/handling-custom-types) for details.

## Local execution

One of the most powerful features of Flyte is the ability to work with data entirely locally, without creating a remote run. When you run tasks in local mode, all inputs, outputs, and intermediate data stay on your local machine.

```python
import flyte

env = flyte.TaskEnvironment(name="local_data")

@env.task
async def process_data(data: str) -> str:
    return f"Processed: {data}"

# Run locally - no remote storage needed
run = flyte.with_runcontext(mode="local").run(process_data, data="test")
run.wait()
print(run.outputs()[0])
```

For more details on local execution, see [how task run works](./how-task-run-works#local-execution).

## Uploading local data to remote runs

When you want to send local data to a remote task, you need to upload it first. Flyte provides a secure data uploading system that handles this automatically. The same system used for [code bundling](./packaging) can upload files, DataFrames, and directories.

To upload local data, use the Flyte type corresponding to your data with its `from_local_sync()` method.

### Uploading DataFrames

Use `flyte.io.DataFrame.from_local_sync()` to upload a local DataFrame:

```python
from typing import Annotated

import pandas as pd

import flyte
import flyte.io

img = flyte.Image.from_debian_base()
img = img.with_pip_packages("pandas", "pyarrow")

env = flyte.TaskEnvironment(
    "dataframe_usage",
    image=img,
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

@env.task
async def process_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Process a DataFrame and return the result."""
    df["processed"] = True
    return df

if __name__ == "__main__":
    flyte.init_from_config()

    # Create a local pandas DataFrame
    local_df = pd.DataFrame({
        "name": ["Alice", "Bob", "Charlie"],
        "value": [10, 20, 30]
    })

    # Upload the local DataFrame for remote execution
    uploaded_df = flyte.io.DataFrame.from_local_sync(local_df)

    # Pass to a remote task
    run = flyte.run(process_dataframe, df=uploaded_df)
    print(f"Run URL: {run.url}")
    run.wait()
    print(run.outputs()[0])
```

### Uploading files

Use `flyte.io.File.from_local_sync()` to upload a local file:

```python
import tempfile

import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="file-local")

@env.task
async def process_file(file: File) -> str:
    """Read and process a file."""
    async with file.open("rb") as f:
        content = bytes(await f.read())
        return content.decode("utf-8")

if __name__ == "__main__":
    flyte.init_from_config()

    # Create a temporary local file
    with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as temp:
        temp.write("Hello, Flyte!")
        temp_path = temp.name

    # Upload the local file for remote execution
    file = File.from_local_sync(temp_path)

    # Pass to a remote task
    run = flyte.run(process_file, file=file)
    print(f"Run URL: {run.url}")
    run.wait()
    print(run.outputs()[0])
```

### Uploading directories

Use `flyte.io.Dir.from_local_sync()` to upload a local directory:

```python
import os
import tempfile

import flyte
from flyte.io import Dir

env = flyte.TaskEnvironment(name="dir-local")

@env.task
async def process_dir(dir: Dir) -> dict[str, str]:
    """Process a directory and return file contents."""
    file_contents = {}
    async for file in dir.walk(recursive=False):
        if file.name.endswith(".py"):
            async with file.open("rb") as f:
                content = bytes(await f.read())
                file_contents[file.name] = content.decode("utf-8")[:100]
    return file_contents

if __name__ == "__main__":
    flyte.init_from_config()

    # Create a temporary directory with test files
    with tempfile.TemporaryDirectory() as temp_dir:
        for i in range(3):
            with open(os.path.join(temp_dir, f"file{i}.py"), "w") as f:
                f.write(f"print('Hello from file {i}!')")

        # Upload the local directory for remote execution
        dir = Dir.from_local_sync(temp_dir)

        # Pass to a remote task
        run = flyte.run(process_dir, dir=dir)
        print(f"Run URL: {run.url}")
        run.wait()
        print(run.outputs()[0])
```

## Passing outputs between runs

If you're passing outputs from a previous run to a new run, no upload is needed. Flyte represents data as references to storage locations, so passing them between runs works automatically:

```python
import flyte

flyte.init_from_config()

# Get outputs from a previous run
previous_run = flyte.remote.Run.get("my_previous_run")
previous_output = previous_run.outputs()[0]  # Already a Flyte reference

# Pass directly to a new run - no upload needed
# (my_task is any task that accepts this input type)
new_run = flyte.run(my_task, data=previous_output)
```

## Performance considerations

The `from_local_sync()` method uses HTTP to upload data. This is convenient but not the most performant option for large datasets.

**Best suited for:**
- Small to medium test datasets
- Development and debugging
- Quick prototyping

**For larger data uploads**, configure cloud storage access and use `flyte.storage` directly:

```python
import flyte
import flyte.storage

# Configure storage access
flyte.init_from_config(
    storage=flyte.storage.S3.auto(region="us-east-2")
)
```

For details on configuring storage access, see [interact with runs and actions](./interacting-with-runs#accessing-large-data-from-cloud-storage).

## Summary

| Scenario | Approach |
|----------|----------|
| Local development and testing | Use local execution mode |
| Small test data to remote tasks | Use `from_local_sync()` |
| Passing data between runs | Pass outputs directly (automatic) |
| Large datasets | Configure `flyte.storage` for direct cloud access |

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/run-command-options ===

# Run command options

The `flyte run` command provides the following options:

**`flyte run [OPTIONS] <PATH>|deployed-task <TASK_NAME>`**

| Option                      | Short | Type   | Default                   | Description                                            |
|-----------------------------|-------|--------|---------------------------|--------------------------------------------------------|
| `--project`                 | `-p`  | text   | *from config*             | Project to run tasks in                                |
| `--domain`                  | `-d`  | text   | *from config*             | Domain to run tasks in                                 |
| `--local`                   |       | flag   | `false`                   | Run the task locally                                   |
| `--copy-style`              |       | choice | `loaded_modules`          | Code bundling strategy: `loaded_modules`, `all`, or `none` |
| `--root-dir`                |       | path   | *current dir*             | Override source root directory                         |
| `--raw-data-path`           |       | text   |                           | Override the output location for offloaded data types. |
| `--service-account`         |       | text   |                           | Kubernetes service account.                            |
| `--name`                    |       | text   |                           | Name of the run.                                       |
| `--follow`                  | `-f`  | flag   | `false`                   | Wait and watch logs for the parent action.             |
| `--image`                   |       | text   |                           | Image to be used in the run (format: `name=uri`).      |
| `--no-sync-local-sys-paths` |       | flag   | `false`                   | Disable synchronization of local sys.path entries.      |
| `--run-project`             |       | text   | *from config*             | Execute deployed task in this project (`deployed-task` only). |
| `--run-domain`              |       | text   | *from config*             | Execute deployed task in this domain (`deployed-task` only).  |

## `--project`, `--domain`

**`flyte run --domain <DOMAIN> --project <PROJECT> <PATH>|deployed-task <TASK_NAME>`**

You can specify `--project` and `--domain` to override the defaults defined in your `config.yaml`. Without these options, the configured defaults are used:

```bash
flyte run my_example.py my_task
```

Specify a target project and domain:

```bash
flyte run --project my-project --domain development my_example.py my_task
```

## `--run-project`, `--run-domain`

**`flyte run --run-project <PROJECT> --run-domain <DOMAIN> deployed-task <TASK_REF>`**

When using the `deployed-task` subcommand, `--run-project` and `--run-domain` specify the [project-domain pair](https://www.union.ai/docs/v2/union/user-guide/projects-and-domains) in which to *execute* the task. This lets you run a deployed task in a different project or domain than the one configured in your `config.yaml`:

```bash
flyte run --run-project prod-project --run-domain production deployed-task my_env.my_task
```

If not provided, these default to the `task.project` and `task.domain` values in your configuration file. These options only apply to the `deployed-task` subcommand and are ignored for file-based runs.

## `--local`

**`flyte run --local <PATH> <TASK_NAME>`**

The `--local` option runs tasks locally instead of submitting them to the remote Flyte backend:

```bash
flyte run --local my_example.py my_task --input "test_data"
```

Compare with remote execution:

```bash
flyte run my_example.py my_task --input "test_data"
```

### When to use local execution

- **Development and testing**: Quick iteration without deployment overhead
- **Debugging**: Full access to local debugging tools and environment
- **Resource constraints**: When remote resources are unavailable or expensive
- **Data locality**: When working with large local datasets

## `--copy-style`

**`flyte run --copy-style [loaded_modules|all|none] <PATH> <TASK_NAME>`**

The `--copy-style` option controls code bundling for remote execution.
This applies to the ephemeral preparation step of the `flyte run` command and works similarly to `flyte deploy`:

Smart bundling (default) — includes only imported project modules:

```bash
flyte run --copy-style loaded_modules my_example.py my_task
```

Include all project files:

```bash
flyte run --copy-style all my_example.py my_task
```

No code bundling (task must be pre-deployed):

```bash
flyte run --copy-style none deployed-task my_deployed_task
```

### Copy style options

- **`loaded_modules` (default)**: Bundles only imported Python modules from your project
- **`all`**: Includes all files in the project directory
- **`none`**: No bundling; requires permanently deployed tasks

## `--root-dir`

**`flyte run --root-dir <DIRECTORY> <PATH> <TASK_NAME>`**

Override the source directory for code bundling and import resolution:

Run from a monorepo root with a specific root directory:

```bash
flyte run --root-dir ./services/ml ./services/ml/my_example.py my_task
```

Handle cross-directory imports:

```bash
flyte run --root-dir .. my_example.py my_workflow
```

This applies to the ephemeral preparation step of the `flyte run` command and works identically to the `flyte deploy` command's `--root-dir` option.

## `--raw-data-path`

**`flyte run --raw-data-path <PATH> <SOURCE> <TASK_NAME>`**

Override the default output location for offloaded data types (large objects, DataFrames, etc.):

Use a custom S3 location for large outputs:

```bash
flyte run --raw-data-path s3://my-bucket/custom-path/ my_example.py process_large_data
```

Use a local directory for development:

```bash
flyte run --local --raw-data-path ./output/ my_example.py my_task
```

### Use cases

- **Custom storage locations**: Direct outputs to specific S3 buckets or paths
- **Cost optimization**: Use cheaper storage tiers for temporary data
- **Access control**: Ensure outputs go to locations with appropriate permissions
- **Local development**: Store large outputs locally when testing

## `--service-account`

**`flyte run --service-account <ACCOUNT_NAME> <PATH> <TASK_NAME>`**

Specify a Kubernetes service account for task execution:

```bash
flyte run --service-account ml-service-account my_example.py train_model
flyte run --service-account data-reader-sa my_example.py load_data
```

### Use cases

- **Cloud resource access**: Service accounts with permissions for S3, GCS, etc.
- **Security isolation**: Different service accounts for different workload types
- **Compliance requirements**: Enforcing specific identity and access policies

## `--name`

**`flyte run --name <EXECUTION_NAME> <PATH> <TASK_NAME>`**

Provide a custom name for the execution run:

```bash
flyte run --name "daily-training-run-2024-12-02" my_example.py train_model
flyte run --name "experiment-lr-0.01-batch-32" my_example.py hyperparameter_sweep
```

### Benefits of custom names

- **Easy identification**: Find specific runs in the Flyte console
- **Experiment tracking**: Include key parameters or dates in names
- **Automation**: Programmatically generate meaningful names for scheduled runs
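
For scheduled runs, a name can be generated from the current date, as in this sketch (the file and task names are illustrative, and the `flyte` invocation is only echoed here rather than executed):

```shell
# Build a dated run name for a nightly job; the flyte command below is
# illustrative and printed instead of run
RUN_NAME="nightly-training-$(date +%Y-%m-%d)"
echo "flyte run --name \"$RUN_NAME\" my_example.py train_model"
```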

## `--follow`

**`flyte run --follow <PATH> <TASK_NAME>`**

Wait and watch logs for the execution in real-time:

```bash
flyte run --follow my_example.py long_running_task
```

Combine with other options:

```bash
flyte run --follow --name "training-session" my_example.py train_model
```

### Behavior

- **Log streaming**: Real-time output from task execution
- **Blocking execution**: Command waits until task completes
- **Exit codes**: Returns appropriate exit code based on task success/failure
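
Because `--follow` blocks until the task completes, its exit code can gate later steps in a script. The sketch below uses a runnable stand-in function in place of the real `flyte run --follow` invocation:

```shell
# run_task is a stand-in for: flyte run --follow my_example.py long_running_task
# (substituted here so the sketch is runnable without a Flyte backend)
run_task() { true; }

if run_task; then
  echo "task succeeded"
else
  echo "task failed"
fi
```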

## `--image`

**`flyte run --image <IMAGE_MAPPING> <PATH> <TASK_NAME>`**

Override container images during ephemeral preparation, same as the equivalent `flyte deploy` option:

Override a specific named image:

```bash
flyte run --image gpu=ghcr.io/org/gpu:v2.1 my_example.py gpu_task
```

Override the default image:

```bash
flyte run --image ghcr.io/org/custom:latest my_example.py my_task
```

Multiple image overrides:

```bash
flyte run \
  --image base=ghcr.io/org/base:v1.0 \
  --image gpu=ghcr.io/org/gpu:v2.0 \
  my_example.py multi_env_workflow
```

### Image mapping formats

- **Named mapping**: `name=uri` overrides images created with `Image.from_ref_name("name")`
- **Default mapping**: `uri` overrides the default "auto" image
- **Multiple mappings**: Use multiple `--image` flags for different image references

## `--no-sync-local-sys-paths`

**`flyte run --no-sync-local-sys-paths <PATH> <TASK_NAME>`**

Disable synchronization of local `sys.path` entries to the remote execution environment during ephemeral preparation.
Identical to the `flyte deploy` command's `--no-sync-local-sys-paths` option:

```bash
flyte run --no-sync-local-sys-paths my_example.py my_task
```

This advanced option is useful for:
- **Container isolation**: Prevent local development paths from affecting remote execution
- **Custom environments**: When containers have pre-configured Python paths
- **Security**: Avoiding exposure of local directory structures

## Task argument passing

Arguments are passed directly as function parameters:

CLI — arguments as flags:

```bash
flyte run my_file.py my_task --name "World" --count 5 --debug true
```

SDK — arguments as function parameters:

```python
result = flyte.run(my_task, name="World", count=5, debug=True)
```

## SDK options

The core `flyte run` functionality is also available programmatically through the `flyte.run()` function.
For SDK-level configuration of all run parameters (storage, caching, identity, logging, and more),
see [Run context](./run-context).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/how-task-deployment-works ===

# How task deployment works

In this section, we will take a deep dive into how the `flyte deploy` command and the `flyte.deploy()` SDK function work under the hood to deploy tasks to your Flyte backend.

When you perform a deployment, here's what happens:

## 1. Module loading and task environment discovery

In the first step, Flyte determines which files to load in order to search for task environments, based on the command line options provided:

### Single file (default)

```bash
flyte deploy my_example.py env
```

- The file `my_example.py` is executed.
- All declared `TaskEnvironment` objects in the file are instantiated,
  but only the one assigned to the variable `env` is selected for deployment.

### `--all` option

```bash
flyte deploy --all my_example.py
```
- The file `my_example.py` is executed.
- All declared `TaskEnvironment` objects in the file are instantiated and selected for deployment.
- No specific variable name is required.

### `--recursive` option

```bash
flyte deploy --recursive ./directory
```

- The directory is recursively traversed; every Python file is executed and all `TaskEnvironment` objects are instantiated.
- All `TaskEnvironment` objects across all files are selected for deployment.

## 2. Task analysis and serialization

- For every task environment selected for deployment, all of its tasks are identified.
- Task metadata is extracted: parameter types, return types, and resource requirements.
- Each task is serialized into a Flyte `TaskTemplate`.
- Dependency graphs between environments are built (see below).

## 3. Task environment dependency resolution

In many cases, a task in one environment may invoke a task in another environment, establishing a dependency between the two environments.
For example, if `env_a` has a task that calls a task in `env_b`, then `env_a` depends on `env_b`.
This means that when deploying `env_a`, `env_b` must also be deployed to ensure that all tasks can be executed correctly.

To handle this, `TaskEnvironment`s can declare dependencies on other `TaskEnvironment`s using the `depends_on` parameter.
During deployment, the system performs the following steps to resolve these dependencies:

1. Start with the specified environment(s).
2. Recursively discover all transitive dependencies.
3. Include every dependency in the deployment plan.
4. Process dependencies depth-first to ensure the correct deployment order.

```python
# Define environments with dependencies
prep_env = flyte.TaskEnvironment(name="preprocessing")
ml_env = flyte.TaskEnvironment(name="ml_training", depends_on=[prep_env])
viz_env = flyte.TaskEnvironment(name="visualization", depends_on=[ml_env])

# Deploy only viz_env - automatically includes ml_env and prep_env
deployment = flyte.deploy(viz_env, version="v2.0.0")

# Or deploy multiple environments explicitly
deployment = flyte.deploy(prep_env, ml_env, viz_env, version="v2.0.0")
```
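
The depth-first ordering can be sketched in plain Python. This is an illustration of the resolution steps above, not the SDK's internal code, and the environment names mirror the example:

```python
def resolution_order(env: str, depends_on: dict[str, list[str]]) -> list[str]:
    """Depth-first dependency resolution sketch: each environment's
    dependencies are placed before the environment that needs them.
    (Cycle detection is omitted for brevity.)"""
    order: list[str] = []

    def visit(name: str) -> None:
        for dep in depends_on.get(name, []):
            visit(dep)
        if name not in order:
            order.append(name)

    visit(env)
    return order

deps = {"visualization": ["ml_training"], "ml_training": ["preprocessing"]}
print(resolution_order("visualization", deps))
# dependencies come first, so preprocessing deploys before ml_training
```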

For detailed information about working with multiple environments, see [Multiple Environments](https://www.union.ai/docs/v2/union/user-guide/task-configuration/multiple-environments).

## 4. Code bundle creation and upload

Once the task environments and their dependencies are resolved, Flyte proceeds to package your code into a bundle based on the `copy_style` option:

### `--copy-style loaded_modules` (default)

This is the smart bundling approach that analyzes which Python modules were actually imported during the task environment discovery phase.
It examines the runtime module registry (`sys.modules`) and includes only those modules that meet specific criteria:
they must have source files located within your project directory (not in system locations like `site-packages`), and they must not be part of the Flyte SDK itself.
This selective approach results in smaller, faster-to-upload bundles that contain exactly the code needed to run your tasks, making it ideal for most development and production scenarios.
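
As an illustration only, the selection described above can be sketched like this (a simplified approximation of the criteria, not the SDK's actual implementation):

```python
import sys
from pathlib import Path

def project_modules(root: str) -> list[str]:
    """Approximate the 'loaded_modules' selection: keep imported modules
    whose source file lives under the project root, skipping installed
    packages such as anything under site-packages."""
    root_path = Path(root).resolve()
    selected = []
    for name, module in list(sys.modules.items()):
        source = getattr(module, "__file__", None)
        if source is None:
            continue  # built-ins and namespace packages have no single file
        source_path = Path(source).resolve()
        if "site-packages" in source_path.parts:
            continue  # installed dependencies are never bundled
        if root_path in source_path.parents:
            selected.append(name)
    return sorted(selected)
```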

### `--copy-style all`

This comprehensive bundling strategy takes a directory-walking approach, recursively traversing your entire project directory and including every file it encounters.
Unlike the smart bundling that only includes imported Python modules, this method captures all project files regardless of whether they were imported during discovery.
This is particularly useful for projects that use dynamic imports, load configuration files or data assets at runtime, or have dependencies that aren't captured through normal Python import mechanisms.
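
The directory walk itself is simple to sketch (illustrative only, not the SDK's implementation):

```python
from pathlib import Path

def all_project_files(root: str) -> list[str]:
    """Sketch of the 'all' strategy: collect every file under the project
    root, regardless of whether it was imported during discovery."""
    root_path = Path(root)
    return sorted(
        str(p.relative_to(root_path))
        for p in root_path.rglob("*")
        if p.is_file()
    )
```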

### `--copy-style none`

This option completely skips code bundle creation, meaning no source code is packaged or uploaded to cloud storage.
When using this approach, you must provide an explicit version parameter since there's no code bundle to generate a version from.
This strategy is designed for scenarios where your code is already baked into custom container images, eliminating the need for separate code injection during task execution.
It results in the fastest deployment times but requires more complex image management workflows.

### `--root-dir` option

By default, Flyte uses your current working directory as the root for code bundling.
You can override this with `--root-dir` to specify a different base directory, which is particularly useful for monorepos or when deploying from subdirectories. The setting works with any copy style: `loaded_modules` looks for imported modules relative to the root directory, and `all` walks the directory tree starting from the root. See the [Deploy command options](./deploy-command-options#--root-dir) for detailed usage examples.

After the code bundle is created (if applicable), it is uploaded to a cloud storage location (like S3 or GCS) accessible by your Flyte backend. It is now ready to be run.

## 5. Image building

If your `TaskEnvironment` specifies [custom images](https://www.union.ai/docs/v2/union/user-guide/task-configuration/container-images), Flyte builds and pushes container images before deploying tasks.
The build process varies based on your configuration and backend type:

### Local image building

When `image.builder` is set to `local` in [your `config.yaml`](https://www.union.ai/docs/v2/union/user-guide/connecting-to-a-cluster), images are built on your local machine using Docker. This approach:
- Requires Docker to be installed and running on your development machine
- Uses Docker BuildKit to build images from generated Dockerfiles or your custom Dockerfile
- Pushes built images to the container registry specified in your `Image` configuration
- Is the only option available for Flyte OSS instances

### Remote image building

When `image.builder` is set to `remote` in [your `config.yaml`](https://www.union.ai/docs/v2/union/user-guide/connecting-to-a-cluster), images are built on cloud infrastructure. This approach:
- Builds images using Union's ImageBuilder service (currently only available for Union backends, not OSS Flyte)
- Requires no local Docker installation or configuration
- Can push to Union's internal registry or external registries you specify
- Provides faster, more consistent builds by leveraging cloud resources

> [!NOTE]
> Remote building is currently exclusive to Union backends. OSS Flyte installations must use `local`.

## Understanding option relationships

It's important to understand how the various deployment options work together.
The **discovery options** (`--recursive` and `--all`) operate independently of the **bundling options** (`--copy-style`),
giving you flexibility in how you structure your deployments.

Environment discovery determines which files Flyte will examine to find `TaskEnvironment` objects,
while code bundling controls what gets packaged and uploaded for execution.
You can freely combine these approaches.
For example, discovering environments recursively across your entire project while using smart bundling to include only the necessary code modules.

When multiple environments are discovered, they all share the same code bundle, which is efficient for related services or components that use common dependencies:

```bash
flyte deploy --recursive --copy-style loaded_modules ./project
```

> [!NOTE]
> All discovered environments share the same code bundle.

For a full overview of all deployment options, see [Deploy command options](./deploy-command-options).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/deploy-command-options ===

# Deploy command options

The `flyte deploy` command provides extensive configuration options:

**`flyte deploy [OPTIONS] <PATH> [TASK_ENV_VARIABLE]`**

| Option                      | Short | Type   | Default                   | Description                                       |
|-----------------------------|-------|--------|---------------------------|---------------------------------------------------|
| `--project`                 | `-p`  | text   | *from config*             | Project to deploy to                              |
| `--domain`                  | `-d`  | text   | *from config*             | Domain to deploy to                               |
| `--version`                 |       | text   | *auto-generated*          | Explicit version tag for deployment               |
| `--dry-run`/`--dryrun`      |       | flag   | `false`                   | Preview deployment without executing              |
| `--all`                     |       | flag   | `false`                   | Deploy all environments in specified path         |
| `--recursive`               | `-r`  | flag   | `false`                   | Deploy environments recursively in subdirectories |
| `--copy-style`              |       | choice | `loaded_modules`          | Code bundling strategy: `loaded_modules`, `all`, or `none` |
| `--root-dir`                |       | path   | *current dir*             | Override source root directory                    |
| `--image`                   |       | text   |                           | Image URI mappings (format: `name=uri`)           |
| `--ignore-load-errors`      | `-i`  | flag   | `false`                   | Continue deployment despite module load failures  |
| `--no-sync-local-sys-paths` |       | flag   | `false`                   | Disable local `sys.path` synchronization          |

## `--project`, `--domain`

**`flyte deploy --domain <DOMAIN> --project <PROJECT> <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

You can specify `--project` and `--domain` to override the defaults defined in your `config.yaml`. Without these options, the configured defaults are used:

```bash
flyte deploy my_example.py env
```

Specify a target project and domain:

```bash
flyte deploy --project my-project --domain development my_example.py env
```

## `--version`

**`flyte deploy --version <VERSION> <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--version` option controls how deployed tasks are tagged and identified in the Flyte backend:

Auto-generated version (default):

```bash
flyte deploy my_example.py env
```

Explicit version:

```bash
flyte deploy --version v1.0.0 my_example.py env
```

> [!NOTE]
> An explicit version is required when using `--copy-style none`, since there is no code bundle to generate a hash from.

```bash
flyte deploy --copy-style none --version v1.0.0 my_example.py env
```

### When versions are used

- **Explicit versioning**: Provides human-readable task identification (e.g., `v1.0.0`, `prod-2024-12-01`)
- **Auto-generated versions**: When no version is specified, Flyte creates an MD5 hash from the code bundle, environment configuration, and image cache
- **Version requirement**: `copy-style none` mandates explicit versions since there's no code bundle to hash
- **Task referencing**: Versions enable precise task references in `flyte run deployed-task` and workflow invocations
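
The auto-generated versioning can be illustrated with a hedged sketch. Flyte hashes the code bundle, environment configuration, and image cache; the exact byte layout fed to the hash here is an assumption for illustration:

```python
import hashlib

def auto_version(code_bundle: bytes, env_config: bytes) -> str:
    """Illustrative sketch: derive a deterministic version tag by MD5-hashing
    the deployment inputs, so identical inputs always produce the same tag."""
    return hashlib.md5(code_bundle + env_config).hexdigest()
```

Because the version is a pure function of the inputs, redeploying unchanged code reuses the same version tag.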

## `--dry-run`

**`flyte deploy --dry-run <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--dry-run` option allows you to preview what would be deployed without actually performing the deployment:

```bash
flyte deploy --dry-run my_example.py env
```

## `--all` and `--recursive`

**`flyte deploy --all <SOURCE_FILE>`**

**`flyte deploy --recursive <DIRECTORY_PATH>`**

Control which environments get discovered and deployed:

**Single environment (default):**

```bash
flyte deploy my_example.py env
```

**All environments in file:**

```bash
flyte deploy --all my_example.py
```

**Recursive directory deployment:**

```bash
flyte deploy --recursive ./src
```

Combine with comprehensive bundling:

```bash
flyte deploy --recursive --copy-style all ./project
```

## `--copy-style`

**`flyte deploy --copy-style [loaded_modules|all|none] <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--copy-style` option controls what gets packaged:

### `--copy-style loaded_modules` (default)

```bash
flyte deploy --copy-style loaded_modules my_example.py env
```

- **Includes**: Only imported Python modules from your project
- **Excludes**: Site-packages, system modules, Flyte SDK
- **Best for**: Most projects (optimal size and speed)

### `--copy-style all`

```bash
flyte deploy --copy-style all my_example.py env
```

- **Includes**: All files in project directory
- **Best for**: Projects with dynamic imports or data files

### `--copy-style none`

```bash
flyte deploy --copy-style none --version v1.0.0 my_example.py env
```

- **Requires**: Explicit version parameter
- **Best for**: Pre-built container images with baked-in code

## `--root-dir`

**`flyte deploy --root-dir <DIRECTORY> <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--root-dir` option overrides the default source directory that Flyte uses as the base for code bundling and import resolution.
This is particularly useful for monorepos and projects with complex directory structures.

### Default behavior (without `--root-dir`)

- Flyte uses the current working directory as the root
- Code bundling starts from this directory
- Import paths are resolved relative to this location

### Common use cases

**Monorepos:**
Deploy a service from the monorepo root:

```bash
flyte deploy --root-dir ./services/ml ./services/ml/my_example.py env
```

Deploy from anywhere in the monorepo:

```bash
cd ./docs/
flyte deploy --root-dir ../services/ml ../services/ml/my_example.py env
```

**Cross-directory imports:**
When a workflow imports modules from sibling directories (e.g., `project/workflows/my_example.py` imports `project/src/utils.py`):

```bash
cd project/workflows/
flyte deploy --root-dir .. my_example.py env
```

**Working directory independence:**
```bash
flyte deploy --root-dir /path/to/project /path/to/project/my_example.py env
```

### How it works

1. **Code bundling**: Files are collected starting from `--root-dir` instead of the current working directory
2. **Import resolution**: Python imports are resolved relative to the specified root directory
3. **Path consistency**: Ensures the same directory structure in local and remote execution environments
4. **Dependency packaging**: Captures all necessary modules that may be located outside the workflow file's immediate directory

### Example with complex project structure
```
my-project/
├── services/
│   ├── ml/
│   │   └── my_example.py     # imports shared.utils
│   └── api/
└── shared/
    └── utils.py
```

```bash
flyte deploy --root-dir ./my-project ./my-project/services/ml/my_example.py env
```

This ensures that both `services/ml/` and `shared/` directories are included in the code bundle, allowing the workflow to successfully import `shared.utils` during remote execution.

## `--image`

**`flyte deploy --image <IMAGE_MAPPING> <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--image` option allows you to override image URIs at deployment time without modifying your code. Format: `imagename=imageuri`

### Named image mappings

```bash
flyte deploy --image base=ghcr.io/org/base:v1.0 my_example.py env
```

Multiple named image mappings:

```bash
flyte deploy \
  --image base=ghcr.io/org/base:v1.0 \
  --image gpu=ghcr.io/org/gpu:v2.0 \
  my_example.py env
```

### Default image mapping

```bash
flyte deploy --image ghcr.io/org/default:latest my_example.py env
```

### How it works

- Named mappings (e.g., `base=URI`) override images created with `Image.from_ref_name("base")`.
- Unnamed mappings (e.g., just `URI`) override the default "auto" image.
- Multiple `--image` flags can be specified.
- Mappings are resolved during the image building phase of deployment.
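
The mapping rules above can be sketched as follows (an illustration, not the CLI's actual parser; the `"auto"` key stands for the default image described earlier):

```python
def parse_image_flags(flags: list[str]) -> dict[str, str]:
    """Sketch of how repeated --image values resolve: 'name=uri' entries
    become named mappings; a bare URI overrides the default 'auto' image."""
    mappings: dict[str, str] = {}
    for flag in flags:
        if "=" in flag:
            name, uri = flag.split("=", 1)
            mappings[name] = uri
        else:
            mappings["auto"] = flag
    return mappings
```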

## `--ignore-load-errors`

**`flyte deploy --ignore-load-errors <SOURCE_PATH> <TASK_ENV_VARIABLE>`**

The `--ignore-load-errors` option allows the deployment process to continue even if some modules fail to load during the environment discovery phase. This is particularly useful for large projects or monorepos where certain modules may have missing dependencies or other issues that prevent them from being imported successfully.

```bash
flyte deploy --recursive --ignore-load-errors ./large-project
```

## `--no-sync-local-sys-paths`

**`flyte deploy --no-sync-local-sys-paths <SOURCE_FILE> <TASK_ENV_VARIABLE>`**

The `--no-sync-local-sys-paths` option disables the automatic synchronization of local `sys.path` entries to the remote container environment. This is an advanced option for specific deployment scenarios.

### Default behavior (path synchronization enabled)

- Flyte captures local `sys.path` entries that are under the root directory
- These paths are passed to the remote container via the `_F_SYS_PATH` environment variable
- At runtime, the remote container adds these paths to its `sys.path`, maintaining the same import environment
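The runtime half of this mechanism amounts to replaying a list of captured paths into the interpreter's search path. The sketch below illustrates the principle only (it is not Flyte's implementation, and the `":"` separator is an assumption for illustration):

```python
import sys

def sync_paths(env_value: str, search_path: list[str]) -> list[str]:
    """Append each captured path to the interpreter's search path,
    skipping blanks and entries that are already present.
    The ":" separator here is an assumption, not Flyte's actual format."""
    for p in env_value.split(":"):
        if p and p not in search_path:
            search_path.append(p)
    return search_path

# Replaying a captured value into the current search path:
sync_paths("/app/my_project:/app/my_project/utils", sys.path)
```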

### When to disable path synchronization

```bash
flyte deploy --no-sync-local-sys-paths my_example.py env
```

### Use cases for disabling

- **Custom container images**: When your container already has the correct `sys.path` configuration
- **Conflicting path structures**: When local development paths would interfere with container paths
- **Security concerns**: When you don't want to expose local development directory structures
- **Minimal environments**: When you want precise control over what gets added to the container's Python path

### How it works

- **Enabled (default)**: Local paths like `./my_project/utils` get synchronized and added to remote `sys.path`
- **Disabled**: Only the container's native `sys.path` is used, along with the deployed code bundle

Most users should leave path synchronization enabled unless they have specific requirements for container path isolation or are using pre-configured container environments.

## SDK deployment options

The core deployment functionality is available programmatically through the `flyte.deploy()` function, though some CLI-specific options are not applicable:

```python
import flyte

env = flyte.TaskEnvironment(name="my_env")

@env.task
async def process_data(data: str) -> str:
    return f"Processed: {data}"

if __name__ == "__main__":
    flyte.init_from_config()

    # Comprehensive deployment configuration
    deployment = flyte.deploy(
        env,                          # Environment to deploy
        dryrun=False,                 # Set to True for dry run
        version="v1.2.0",             # Explicit version tag
        copy_style="loaded_modules"   # Code bundling strategy
    )
    print(f"Deployment successful: {deployment[0].summary_repr()}")
```

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/packaging ===

# Code packaging for remote execution

When you run Flyte tasks remotely, your code needs to be available in the execution environment. Flyte SDK provides two main approaches for packaging your code:

1. **Code bundling** - Bundle code dynamically at runtime
2. **Container-based deployment** - Embed code directly in container images

## Quick comparison

| Aspect | Code bundling | Container-based |
|--------|---------------|-----------------|
| **Speed** | Fast (no image rebuild) | Slower (requires image build) |
| **Best for** | Rapid development, iteration | Production, immutable deployments |
| **Code changes** | Immediate effect | Requires image rebuild |
| **Setup** | Automatic by default | Manual configuration needed |
| **Reproducibility** | Excellent (hash-based versioning) | Excellent (immutable images) |
| **Rollback** | Requires version control | Tag-based, straightforward |

---

## Code bundling

**Default approach** - Automatically bundles and uploads your code to remote storage at runtime.

### How it works

When you run `flyte run` or call `flyte.run()`, Flyte automatically:

1. **Scans loaded modules** from your codebase
2. **Creates a tarball** (gzipped, without timestamps for consistent hashing)
3. **Uploads to blob storage** (S3, GCS, Azure Blob)
4. **Deduplicates** based on content hashes
5. **Downloads in containers** at runtime

This process happens transparently - every container downloads and extracts the code bundle before execution.

> [!NOTE]
> Code bundling is optimized for speed:
> - Bundles are created without timestamps for consistent hashing
> - Identical code produces identical hashes, enabling deduplication
> - Only modified code triggers new uploads
> - Containers cache downloaded bundles
>
> **Reproducibility:** Flyte automatically versions code bundles based on content hash. The same code always produces the same hash, guaranteeing reproducibility without manual versioning. However, version control is still recommended for rollback capabilities.
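The deduplication step rests on a simple property: archives built with fixed metadata hash identically whenever their contents match. A standard-library sketch of the idea (not Flyte's actual bundler):

```python
import hashlib
import io
import tarfile

def bundle_hash(files: dict[str, bytes]) -> str:
    """Build an in-memory tarball with zeroed timestamps and sorted
    entries, then hash the bytes. Same content -> same hash."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name in sorted(files):
            info = tarfile.TarInfo(name=name)
            info.size = len(files[name])
            info.mtime = 0  # fixed timestamp keeps the bytes reproducible
            tar.addfile(info, io.BytesIO(files[name]))
    return hashlib.sha256(buf.getvalue()).hexdigest()

# Identical content produces an identical hash; any edit changes it.
a = bundle_hash({"app.py": b"print('hello')"})
b = bundle_hash({"app.py": b"print('hello')"})
c = bundle_hash({"app.py": b"print('hi')"})
```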

### Automatic code bundling

**Default behavior** - Bundles all loaded modules automatically.

#### What gets bundled

Flyte includes modules that are:
- ✅ **Loaded when environment is parsed** (imported at module level)
- ✅ **Part of your codebase** (not system packages)
- ✅ **Within your project directory**
- ❌ **NOT lazily loaded** (imported inside functions)
- ❌ **NOT system-installed packages** (e.g., from site-packages)
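A rough approximation of this scan, using only the standard library (Flyte's real implementation differs, but the criteria are the same: loaded, has a source file, lives under your project root):

```python
import pathlib
import sys

def project_modules(root: pathlib.Path) -> list[str]:
    """Names of currently loaded modules whose source file lives under
    `root` -- roughly what a 'loaded modules' scan picks up. Modules
    without a __file__ (builtins) and anything outside root are skipped."""
    names = []
    for name, mod in list(sys.modules.items()):
        f = getattr(mod, "__file__", None)
        if f and pathlib.Path(f).resolve().is_relative_to(root.resolve()):
            names.append(name)
    return sorted(names)
```

Lazily imported modules never appear in `sys.modules` at scan time, which is why imports inside function bodies are missed.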

#### Example: Basic automatic bundling

```python
# app.py
import flyte
from my_module import helper  # ✅ Bundled automatically

env = flyte.TaskEnvironment(
    name="default",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "numpy")
)

@env.task
def process_data(x: int) -> int:
    # This import won't be bundled (lazy load)
    from another_module import util  # ❌ Not bundled automatically
    return helper.transform(x)

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(process_data, x=42)
    print(run.url)
```

When you run this:
```bash
flyte run app.py process_data --x 42
```

Flyte automatically:
1. Bundles `app.py` and `my_module.py`
2. Preserves the directory structure
3. Uploads to blob storage
4. Makes it available in the remote container

#### Project structure example

```
my_project/
├── app.py              # Main entry point
├── tasks/
│   ├── __init__.py
│   ├── data_tasks.py   # Flyte tasks
│   └── ml_tasks.py
└── utils/
    ├── __init__.py
    ├── preprocessing.py # Business logic
    └── models.py
```

```python
# app.py
import flyte
from tasks.data_tasks import load_data    # ✅ Bundled
from tasks.ml_tasks import train_model    # ✅ Bundled
# utils modules imported in tasks are also bundled

env = flyte.TaskEnvironment(name="pipeline_env")

@env.task
def pipeline(dataset: str) -> float:
    data = load_data(dataset)
    accuracy = train_model(data)
    return accuracy

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(pipeline, dataset="train.csv")
```

**All modules are bundled with their directory structure preserved.**

### Manual code bundling

Control exactly what gets bundled by configuring the copy style.

#### Copy styles

Three options available:

1. **`"auto"`** (default) - Bundle loaded modules only
2. **`"all"`** - Bundle everything in the working directory
3. **`"none"`** - Skip bundling entirely (requires code in container)

#### Using `copy_style="all"`

Bundle all files under your project directory:

```python
import flyte

flyte.init_from_config()

# Bundle everything in current directory
run = flyte.with_runcontext(copy_style="all").run(
    my_task,
    input_data="sample.csv"
)
```

Or via CLI:
```bash
flyte run --copy-style=all app.py my_task --input-data sample.csv
```

**Use when:**
- You have data files or configuration that tasks need
- You use dynamic imports or lazy loading
- You want to ensure all project files are available

#### Using `copy_style="none"`

Skip code bundling (see **Run and deploy tasks > Code packaging for remote execution > Container-based deployment**):

```python
run = flyte.with_runcontext(copy_style="none").run(my_task, x=10)
```

### Controlling the root directory

The `root_dir` parameter controls which directory serves as the bundling root.

#### Why root directory matters

1. **Determines what gets bundled** - All code paths are relative to root_dir
2. **Preserves import structure** - Python imports must match the bundle structure
3. **Affects path resolution** - Files and modules are located relative to root_dir

#### Setting root directory

##### Via CLI

```bash
flyte run --root-dir /path/to/project app.py my_task
```

##### Programmatically

```python
import pathlib
import flyte

flyte.init_from_config(
    root_dir=pathlib.Path(__file__).parent
)
```

#### Root directory use cases

##### Use case 1: Multi-module project

```
project/
├── src/
│   ├── workflows/
│   │   └── pipeline.py
│   └── utils/
│       └── helpers.py
└── config.yaml
```

```python
# src/workflows/pipeline.py
import pathlib
import flyte
from utils.helpers import process  # Import resolved from the project root

# Set root to project root (not src/)
flyte.init_from_config(
    root_dir=pathlib.Path(__file__).parent.parent.parent
)

env = flyte.TaskEnvironment(name="pipeline")

@env.task
def my_task():
    return process()
```

**Root set to `project/` so imports like `from utils.helpers` work correctly.**

##### Use case 2: Shared utilities

```
workspace/
├── shared/
│   └── common.py
└── project/
    └── app.py
```

```python
# project/app.py
import flyte
import pathlib
from shared.common import shared_function  # Import from parent directory

# Set root to workspace/ to include shared/
flyte.init_from_config(
    root_dir=pathlib.Path(__file__).parent.parent
)
```

##### Use case 3: Monorepo

```
monorepo/
├── libs/
│   ├── data/
│   └── models/
└── services/
    └── ml_service/
        └── workflows.py
```

```python
# services/ml_service/workflows.py
import flyte
import pathlib
from libs.data import loader  # Import from monorepo root
from libs.models import predictor

# Set root to monorepo/ to include libs/
flyte.init_from_config(
    root_dir=pathlib.Path(__file__).parent.parent.parent
)
```

#### Root directory best practices

1. **Set root_dir at project initialization** before importing any task modules
2. **Use absolute paths** with `pathlib.Path(__file__).parent` navigation
3. **Match your import structure** - if imports are relative to project root, set root_dir to project root
4. **Keep consistent** - use the same root_dir for both `flyte run` and `flyte.init()`
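The `parent` chains used throughout these examples are plain `pathlib` navigation; counting parents against your layout is all that's needed:

```python
import pathlib

# For a file at project/src/workflows/pipeline.py:
f = pathlib.PurePosixPath("/repo/project/src/workflows/pipeline.py")

one_up = f.parent                   # /repo/project/src/workflows
three_up = f.parent.parent.parent   # /repo/project -- the right root_dir
                                    # for imports written against project/
```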

### Code bundling examples

#### Example: Standard Python package

```
my_package/
├── pyproject.toml
├── src/
│   └── my_package/
│       ├── __init__.py
│       ├── main.py
│       ├── data/
│       │   ├── loader.py
│       │   └── processor.py
│       └── models/
│           └── analyzer.py
```

```python
# src/my_package/main.py
import flyte
import pathlib
from my_package.data.loader import fetch_data
from my_package.data.processor import clean_data
from my_package.models.analyzer import analyze

env = flyte.TaskEnvironment(
    name="pipeline",
    image=flyte.Image.from_debian_base().with_uv_project(
        pyproject_file=pathlib.Path(__file__).parent.parent.parent / "pyproject.toml"
    )
)

@env.task
async def fetch_task(url: str) -> dict:
    return await fetch_data(url)

@env.task
def process_task(raw_data: dict) -> list[dict]:
    return clean_data(raw_data)

@env.task
def analyze_task(data: list[dict]) -> str:
    return analyze(data)

if __name__ == "__main__":
    import flyte.git

    # Set root to project root for proper imports
    flyte.init_from_config(
        flyte.git.config_from_root(),
        root_dir=pathlib.Path(__file__).parent.parent.parent
    )

    # All modules bundled automatically
    run = flyte.run(analyze_task, data=[{"value": 1}, {"value": 2}])
    print(f"Run URL: {run.url}")
```

**Run with:**
```bash
cd my_package
flyte run src/my_package/main.py analyze_task --data '[{"value": 1}]'
```

#### Example: Dynamic environment based on domain

```python
# environment_picker.py
import flyte

def create_env():
    """Create different environments based on domain."""
    if flyte.current_domain() == "development":
        return flyte.TaskEnvironment(
            name="dev",
            image=flyte.Image.from_debian_base(),
            env_vars={"ENV": "dev", "DEBUG": "true"}
        )
    elif flyte.current_domain() == "staging":
        return flyte.TaskEnvironment(
            name="staging",
            image=flyte.Image.from_debian_base(),
            env_vars={"ENV": "staging", "DEBUG": "false"}
        )
    else:  # production
        return flyte.TaskEnvironment(
            name="prod",
            image=flyte.Image.from_debian_base(),
            env_vars={"ENV": "production", "DEBUG": "false"},
            resources=flyte.Resources(cpu="2", memory="4Gi")
        )

env = create_env()

@env.task
async def process(n: int) -> int:
    import os
    print(f"Running in {os.getenv('ENV')} environment")
    return n * 2

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(process, n=5)
    print(run.url)
```

**Why this works:**
- `flyte.current_domain()` returns the correct domain when Flyte re-imports your modules remotely
- Environment configuration is deterministic and reproducible
- Code automatically bundled with domain-specific settings

> [!NOTE]
> `flyte.current_domain()` only works after `flyte.init()` is called:
> - ✅ Works with `flyte run` and `flyte deploy` (auto-initialize)
> - ✅ Works in `if __name__ == "__main__"` after explicit `flyte.init()`
> - ❌ Does NOT work at module level without initialization

### When to use code bundling

✅ **Use code bundling when:**
- Rapid development and iteration
- Frequently changing code
- Multiple developers testing changes
- Jupyter notebook workflows
- Quick prototyping and experimentation

❌ **Consider container-based instead when:**
- Need easy rollback to previous versions (container tags are simpler than finding git commits)
- Working with air-gapped environments (no blob storage access)
- Code changes require coordinated dependency updates

---

## Container-based deployment

**Advanced approach** - Embed code directly in container images for immutable deployments.

### How it works

Instead of bundling code at runtime:

1. **Build container image** with code copied inside
2. **Disable code bundling** with `copy_style="none"`
3. **Container has everything** needed at runtime

**Trade-off:** Every code change requires a new image build (slower), but provides complete reproducibility.

### Configuration

Three key steps:

#### 1. Set `copy_style="none"`

Disable runtime code bundling:

```python
flyte.with_runcontext(copy_style="none").run(my_task, n=10)
```

Or via CLI:
```bash
flyte run --copy-style=none app.py my_task --n 10
```

#### 2. Copy code into the image

Use `Image.with_source_file()` or `Image.with_source_folder()`:

```python
import pathlib
import flyte

env = flyte.TaskEnvironment(
    name="embedded",
    image=flyte.Image.from_debian_base().with_source_folder(
        src=pathlib.Path(__file__).parent,
        copy_contents_only=True
    )
)
```

#### 3. Set the correct `root_dir`

Match your image copy configuration:

```python
flyte.init_from_config(
    root_dir=pathlib.Path(__file__).parent
)
```

### Image source copying methods

#### `with_source_file()` - Copy individual files

Copy a single file into the container:

```python
image = flyte.Image.from_debian_base().with_source_file(
    src=pathlib.Path(__file__),
    dst="/app/main.py"
)
```

**Use for:**
- Single-file workflows
- Copying configuration files
- Adding scripts to existing images

#### `with_source_folder()` - Copy directories

Copy entire directories into the container:

```python
image = flyte.Image.from_debian_base().with_source_folder(
    src=pathlib.Path(__file__).parent,
    dst="/app",
    copy_contents_only=False  # Copy folder itself
)
```

**Parameters:**
- `src`: Source directory path
- `dst`: Destination path in container (optional, defaults to workdir)
- `copy_contents_only`: If `True`, copies folder contents; if `False`, copies folder itself

##### `copy_contents_only=True` (Recommended)

Copies only the contents of the source folder:

```python
# Project structure:
# my_project/
#   ├── app.py
#   └── utils.py

image = flyte.Image.from_debian_base().with_source_folder(
    src=pathlib.Path(__file__).parent,
    copy_contents_only=True
)

# Container will have:
# /app/app.py
# /app/utils.py

# Set root_dir to match:
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
```

##### `copy_contents_only=False`

Copies the folder itself with its name:

```python
# Project structure:
# workspace/
#   └── my_project/
#       ├── app.py
#       └── utils.py

image = flyte.Image.from_debian_base().with_source_folder(
    src=pathlib.Path(__file__).parent,  # Points to my_project/
    copy_contents_only=False
)

# Container will have:
# /app/my_project/app.py
# /app/my_project/utils.py

# Set root_dir to parent to match:
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)
```

### Complete container-based example

```python
# full_build.py
import pathlib
import flyte
from dep import helper  # Local module

# Configure environment with source copying
env = flyte.TaskEnvironment(
    name="full_build",
    image=flyte.Image.from_debian_base()
        .with_pip_packages("numpy", "pandas")
        .with_source_folder(
            src=pathlib.Path(__file__).parent,
            copy_contents_only=True
        )
)

@env.task
def square(x: int) -> int:
    return x ** helper.get_exponent()

@env.task
def main(n: int) -> list[int]:
    return list(flyte.map(square, range(n)))

if __name__ == "__main__":
    import flyte.git

    # Initialize with matching root_dir
    flyte.init_from_config(
        flyte.git.config_from_root(),
        root_dir=pathlib.Path(__file__).parent
    )

    # Run with copy_style="none" and explicit version
    run = flyte.with_runcontext(
        copy_style="none",
        version="v1.0.0"  # Explicit version for image tagging
    ).run(main, n=10)

    print(f"Run URL: {run.url}")
    run.wait()
```

**Project structure:**
```
project/
├── full_build.py
├── dep.py          # Local dependency
└── .flyte/
    └── config.yaml
```

**Run with:**
```bash
python full_build.py
```

This will:
1. Build a container image with `full_build.py` and `dep.py` embedded
2. Tag it as `v1.0.0`
3. Push to registry
4. Execute remotely without code bundling

### Using externally built images

When containers are built outside of Flyte (e.g., in CI/CD), use `Image.from_ref_name()`:

#### Step 1: Build your image externally

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Copy your code
COPY src/ /app/

# Install dependencies
RUN pip install flyte pandas numpy

# Ensure flyte executable is available
RUN flyte --help
```

Build and push the image:

```bash
docker build -t myregistry.com/my-app:v1.2.3 .
docker push myregistry.com/my-app:v1.2.3
```

#### Step 2: Reference image by name

```python
# app.py
import flyte

env = flyte.TaskEnvironment(
    name="external",
    image=flyte.Image.from_ref_name("my-app-image")  # Reference name
)

@env.task
def process(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    flyte.init_from_config()

    # Pass actual image URI at deploy/run time
    run = flyte.with_runcontext(
        copy_style="none",
        images={"my-app-image": "myregistry.com/my-app:v1.2.3"}
    ).run(process, x=10)
```

Or via CLI:
```bash
flyte run \
  --copy-style=none \
  --image my-app-image=myregistry.com/my-app:v1.2.3 \
  app.py process --x 10
```

**For deployment:**
```bash
flyte deploy \
  --image my-app-image=myregistry.com/my-app:v1.2.3 \
  app.py env
```

#### Why use reference names?

1. **Decouples code from image URIs** - Change images without modifying code
2. **Supports multiple environments** - Different images for dev/staging/prod
3. **Integrates with CI/CD** - Build images in pipelines, reference in code
4. **Enables image reuse** - Multiple tasks can reference the same image

#### Example: Multi-environment deployment

```python
import flyte
import os

# Code references image by name
env = flyte.TaskEnvironment(
    name="api",
    image=flyte.Image.from_ref_name("api-service")
)

@env.task
def api_call(endpoint: str) -> dict:
    # Implementation
    return {"status": "success"}

if __name__ == "__main__":
    flyte.init_from_config()

    # Determine image based on environment
    environment = os.getenv("ENV", "dev")
    image_uri = {
        "dev": "myregistry.com/api-service:dev",
        "staging": "myregistry.com/api-service:staging",
        "prod": "myregistry.com/api-service:v1.2.3"
    }[environment]

    run = flyte.with_runcontext(
        copy_style="none",
        images={"api-service": image_uri}
    ).run(api_call, endpoint="/health")
```

### Container-based best practices

1. **Always set explicit versions** when using `copy_style="none"`:
   ```python
   flyte.with_runcontext(copy_style="none", version="v1.0.0")
   ```

2. **Match `root_dir` to `copy_contents_only`**:
   - `copy_contents_only=True` → `root_dir=Path(__file__).parent`
   - `copy_contents_only=False` → `root_dir=Path(__file__).parent.parent`

3. **Ensure `flyte` executable is in container** - Add to PATH or install flyte package

4. **Use `.dockerignore`** to exclude unnecessary files:
   ```
   # .dockerignore
   __pycache__/
   *.pyc
   .git/
   .venv/
   *.egg-info/
   ```

5. **Test containers locally** before deploying:
   ```bash
   docker run -it myimage:latest /bin/bash
   python -c "import mymodule"  # Verify imports work
   ```

### When to use container-based deployment

✅ **Use container-based when:**
- Deploying to production
- Need immutable, reproducible environments
- Working with complex system dependencies
- Deploying to air-gapped or restricted environments
- CI/CD pipelines with automated builds
- Code changes are infrequent

❌ **Don't use container-based when:**
- Rapid development and frequent code changes
- Quick prototyping
- Interactive development (Jupyter notebooks)
- Learning and experimentation

---

## Choosing the right approach

### Decision tree

```
Are you iterating quickly on code?
├─ Yes → Use Code Bundling (Default)
│         (Development, prototyping, notebooks)
│         Both approaches are fully reproducible via hash/tag
└─ No  → Do you need easy version rollback?
          ├─ Yes → Use Container-based
          │         (Production, CI/CD, straightforward tag-based rollback)
          └─ No  → Either works
                    (Code bundling is simpler, container-based for air-gapped)
```

### Hybrid approach

You can use different approaches for different tasks:

```python
import flyte
import pathlib

# Fast iteration for development tasks
dev_env = flyte.TaskEnvironment(
    name="dev",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas")
    # Code bundling (default)
)

# Immutable containers for production tasks
prod_env = flyte.TaskEnvironment(
    name="prod",
    image=flyte.Image.from_debian_base()
        .with_pip_packages("pandas")
        .with_source_folder(pathlib.Path(__file__).parent, copy_contents_only=True)
    # Requires copy_style="none"
)

@dev_env.task
def experimental_task(x: int) -> int:
    # Rapid development with code bundling
    return x * 2

@prod_env.task
def stable_task(x: int) -> int:
    # Production with embedded code
    return x ** 2

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)

    # Use code bundling for dev task
    dev_run = flyte.run(experimental_task, x=5)

    # Use container-based for prod task
    prod_run = flyte.with_runcontext(
        copy_style="none",
        version="v1.0.0"
    ).run(stable_task, x=5)
```

---

## Troubleshooting

### Import errors

**Problem:** `ModuleNotFoundError` when task executes remotely

**Solutions:**

1. **Check loaded modules** - Ensure modules are imported at module level:
   ```python
   # ✅ Good - bundled automatically
   from mymodule import helper

   @env.task
   def my_task():
       return helper.process()
   ```

   ```python
   # ❌ Bad - not bundled (lazy load)
   @env.task
   def my_task():
       from mymodule import helper
       return helper.process()
   ```

2. **Verify `root_dir`** matches your import structure:
   ```python
   # If imports are: from mypackage.utils import foo
   # Then root_dir should be parent of mypackage/
   flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)
   ```

3. **Use `copy_style="all"`** to bundle everything:
   ```bash
   flyte run --copy-style=all app.py my_task
   ```

### Code changes not reflected

**Problem:** Remote execution uses old code despite local changes

> [!NOTE]
> This is rare with code bundling - Flyte automatically versions based on content hash, so code changes should be detected automatically. This issue typically occurs with caching problems or when using `copy_style="none"`.

**Solutions:**

1. **Use explicit version bump** (mainly for container-based deployments):
   ```python
   run = flyte.with_runcontext(version="v2").run(my_task)
   ```

2. **Check if `copy_style="none"`** is set - this requires image rebuild:
   ```python
   # If using copy_style="none", rebuild image
   run = flyte.with_runcontext(
       copy_style="none",
       version="v2"  # Bump version to force rebuild
   ).run(my_task)
   ```

### Files missing in container

**Problem:** Task can't find data files or configs

**Solutions:**

1. **Use `copy_style="all"`** to bundle all files:
   ```bash
   flyte run --copy-style=all app.py my_task
   ```

2. **Copy files explicitly in image**:
   ```python
   image = flyte.Image.from_debian_base().with_source_file(
       src=pathlib.Path("config.yaml"),
       dst="/app/config.yaml"
   )
   ```

3. **Store data in remote storage** instead of bundling:
   ```python
   @env.task
   def my_task():
       # Read from S3/GCS instead of local files
       import flyte.io
       data = flyte.io.File("s3://bucket/data.csv").open().read()
   ```

### Container build failures

**Problem:** Image build fails with `copy_style="none"`

**Solutions:**

1. **Check `root_dir` matches `copy_contents_only`**:
   ```python
   # copy_contents_only=True
   image = Image.from_debian_base().with_source_folder(
       src=Path(__file__).parent,
       copy_contents_only=True
   )
   flyte.init(root_dir=Path(__file__).parent)  # Match!
   ```

2. **Ensure `flyte` executable available**:
   ```python
   image = Image.from_debian_base()  # Has flyte pre-installed
   ```

3. **Check file permissions** in source directory:
   ```bash
   chmod -R +r project/
   ```

### Version conflicts

**Problem:** Multiple versions of same image causing confusion

**Solutions:**

1. **Use explicit versions**:
   ```python
   run = flyte.with_runcontext(
       copy_style="none",
       version="v1.2.3"  # Explicit, not auto-generated
   ).run(my_task)
   ```

2. **Clean old images**:
   ```bash
   docker image prune -a
   ```

3. **Use semantic versioning** for clarity:
   ```python
   version = "v1.0.0"  # Major.Minor.Patch
   ```

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/invoke-webhook ===

# Running tasks via webhooks

On Union, you can deploy apps (see [Apps documentation](../build-apps/_index)) that can run any deployed Flyte task. These apps can be REST API services, built with frameworks like FastAPI, that accept HTTP requests and run tasks on behalf of the caller.

A key feature of this approach is **passthrough authentication** - the app can carry forward the identity of the caller and use their credentials to run the task. This ensures proper authorization and audit trails, as tasks are executed with the permissions of the actual user making the request.

## How passthrough authentication works

When you deploy a webhook service on Union:

1. The caller sends an HTTP request with their authentication token (typically in the `Authorization` header)
2. Your webhook app extracts the authentication headers from the request
3. The app forwards these headers to the Flyte control plane when running the task
4. The task executes with the caller's identity and permissions

This is different from using a service API key, where all tasks would run with the same service account permissions regardless of who made the request.
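Steps 2-3 above reduce to picking the caller's credential headers out of the incoming request and replaying them toward the control plane. A framework-free sketch of that selection (the header names follow this page; the function itself is illustrative, not part of the SDK):

```python
def forward_auth_headers(request_headers: dict[str, str]) -> list[tuple[str, str]]:
    """Select the headers that carry the caller's identity so they can
    be forwarded along with the task-run request."""
    wanted = {"authorization", "cookie"}
    return [
        (name.lower(), value)
        for name, value in request_headers.items()
        if name.lower() in wanted
    ]

# Only credential headers survive; everything else is dropped.
forward_auth_headers({"Authorization": "Bearer abc", "Accept": "application/json"})
```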

## Setting up passthrough authentication

### Initialize with `flyte.init_passthrough()`

To enable passthrough authentication, initialize your app using `flyte.init_passthrough()`:

```python
import flyte

# Initialize Flyte with passthrough authentication
await flyte.init_passthrough.aio(
    endpoint="dns:///your-endpoint.hosted.unionai.cloud",
    project="my-project",      # Optional: default project
    domain="development",       # Optional: default domain
)
```

The `init_passthrough()` function configures the Flyte SDK to accept authentication metadata from the request context rather than using a static token or interactive authentication flow.

**Parameters:**

- `endpoint`: **Required**. The Flyte control plane endpoint URL
- `project`: Optional. Default project to use if not specified per request
- `domain`: Optional. Default domain to use if not specified per request
- `org`: Optional. Organization name
- `insecure`: Optional. Whether to use an insecure connection (default: `False`)

> [!IMPORTANT]
> The `endpoint` parameter is required when using passthrough authentication. Unlike other authentication modes, passthrough cannot infer the endpoint from environment variables or config files since it needs explicit initialization.

### Passing authentication metadata

Once initialized, you need to provide the caller's authentication headers when making requests to the Flyte control plane. There are two approaches:

#### Option 1: Using FastAPI middleware (recommended if using FastAPI)

For FastAPI applications, Flyte provides a convenient middleware that automatically extracts authentication headers from incoming requests and sets them in the Flyte context:

```python
from fastapi import FastAPI

import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIPassthroughAuthMiddleware

app = FastAPI()

# Add the middleware - automatically handles auth for all endpoints
app.add_middleware(
    FastAPIPassthroughAuthMiddleware,
    excluded_paths={"/health"}  # Optional: skip auth for specific paths
)

@app.post("/run-task")
async def run_task():
    # No need to manually extract headers!
    # The middleware automatically sets auth context
    task = remote.Task.get(project="my-project", domain="development", name="my_task")
    run = await flyte.run.aio(task, x=42)
    return {"run_url": run.url}
```

**Middleware features:**

- **Automatic header extraction**: Extracts `Authorization` and `Cookie` headers by default
- **Path exclusions**: Skip auth for specific endpoints like `/health` or `/metrics`
- **Custom extractors**: Add custom header extraction logic
- **Thread-safe**: Properly isolates authentication per request using context variables

**Middleware parameters:**

- `excluded_paths`: Set of URL paths that bypass authentication extraction
- `header_extractors`: Custom list of header extractor functions (optional)

**Custom header extractors:**

```python
from flyte.app.extras import FastAPIPassthroughAuthMiddleware

app.add_middleware(
    FastAPIPassthroughAuthMiddleware,
    header_extractors=[
        FastAPIPassthroughAuthMiddleware.extract_authorization_header,
        FastAPIPassthroughAuthMiddleware.extract_custom_header("x-api-key"),
    ],
    excluded_paths={"/health", "/metrics"},
)
```

#### Option 2: Using the `auth_metadata()` context manager (any script, web serving framework)

The `flyte.remote.auth_metadata()` context manager allows you to explicitly set authentication headers for a block of code:

```python
import flyte
import flyte.remote as remote
from fastapi import Request

@app.post("/run-task")
async def run_task(request: Request):
    # Extract authentication from the request
    auth_header = request.headers.get("authorization")

    # Use auth_metadata to forward the caller's credentials
    with remote.auth_metadata(("authorization", auth_header)):
        # Get and run the task with the caller's identity
        task = remote.Task.get(project="my-project", domain="development", name="my_task")
        run = await flyte.run.aio(task, x=42)
        return {"run_url": run.url}
```

The `auth_metadata()` context manager accepts one or more tuples of `(header_name, header_value)`:

```python
with remote.auth_metadata(
    ("authorization", auth_header),
    ("cookie", cookie_header),
):
    # All Flyte API calls within this block use these headers
    ...
```
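When the header values come from an incoming request, you can collect only the ones that are actually present before entering the context manager. A minimal helper sketch (the function name is illustrative, not part of the SDK):

```python
def collect_auth_headers(headers: dict[str, str]) -> list[tuple[str, str]]:
    """Return (name, value) pairs for the auth-related headers that are present."""
    wanted = ("authorization", "cookie")
    return [(name, headers[name]) for name in wanted if name in headers]

# A request with an Authorization header but no Cookie header
pairs = collect_auth_headers({"authorization": "Bearer abc123", "accept": "*/*"})
# pairs -> [("authorization", "Bearer abc123")]
```

Calling `remote.auth_metadata(*pairs)` then forwards only the headers the caller actually sent.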

## Complete example

Here's a complete FastAPI webhook service that runs Flyte tasks with passthrough authentication:

```python
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from starlette import status

import flyte
import flyte.errors
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment, FastAPIPassthroughAuthMiddleware

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte with passthrough auth on startup."""
    endpoint = os.getenv("FLYTE_ENDPOINT")
    if not endpoint:
        raise RuntimeError("FLYTE_ENDPOINT environment variable not set")

    await flyte.init_passthrough.aio(
        endpoint=endpoint,
        project=os.getenv("FLYTE_INTERNAL_EXECUTION_PROJECT"),
        domain=os.getenv("FLYTE_INTERNAL_EXECUTION_DOMAIN"),
    )
    yield

app = FastAPI(
    title="Flyte Webhook Runner",
    description="A webhook service that runs Flyte tasks",
    lifespan=lifespan,
)

# Add passthrough auth middleware
app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"})

@app.get("/health")
async def health_check():
    """Health check endpoint (no auth required)."""
    return {"status": "healthy"}

@app.get("/me")
async def get_current_user():
    """Get information about the authenticated user."""
    user = await remote.User.get.aio()
    return {
        "subject": user.subject(),
        "name": user.name(),
    }

@app.post("/run-task/{project}/{domain}/{name}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    inputs: dict,
    version: str | None = None,
):
    """
    Run a Flyte task with the caller's credentials.

    Args:
        project: Flyte project name
        domain: Flyte domain (e.g., development, staging, production)
        name: Task name
        inputs: Dictionary of input parameters for the task
        version: Task version (optional, defaults to "latest")

    Returns:
        Dictionary containing the run information
    """
    try:
        # Get the task
        task = remote.Task.get(
            project=project,
            domain=domain,
            name=name,
            version=version,
            auto_version="latest" if version is None else None,
        )

        # Run the task with the caller's identity
        run = await flyte.run.aio(task, **inputs)

        return {"url": run.url, "name": run.name}

    except flyte.errors.RemoteTaskNotFoundError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Task {name} (version {version or 'latest'}) not found in {project}/{domain}",
        )
    except flyte.errors.RemoteTaskUsageError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )

# Configure the app deployment
image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn")

app_env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    description="A webhook service that runs Flyte tasks with passthrough auth",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,  # Platform handles auth at gateway
    env_vars={
        "FLYTE_ENDPOINT": "your-endpoint.hosted.unionai.cloud",
    },
)
```

For a complete working example, see [`examples/apps/run_webhook.py`](https://github.com/unionai/flyte-sdk/blob/main/examples/apps/run_webhook.py) in the Flyte SDK repository.

## Calling the webhook

Once deployed, you can call your webhook using standard HTTP tools:

Get your API key:

```bash
flyte get api-key my-webhook-key
```

Call the webhook to run a task:

```bash
curl -X POST \
  -H "Authorization: Bearer <your-api-key>" \
  -H "Content-Type: application/json" \
  -d '{"x": 42, "y": "hello"}' \
  https://your-app.apps.unionai.cloud/run-task/my-project/development/my_task
```

The task will execute with the permissions associated with the API key used in the request.
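The same call can be made from Python. A minimal sketch using only the standard library; the app URL and API key are placeholders to substitute with your own values:

```python
import json
import urllib.request

def build_webhook_request(base_url: str, project: str, domain: str,
                          name: str, api_key: str, inputs: dict) -> urllib.request.Request:
    """Build a POST request for the /run-task endpoint shown above."""
    url = f"{base_url}/run-task/{project}/{domain}/{name}"
    return urllib.request.Request(
        url,
        data=json.dumps(inputs).encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Placeholder endpoint and key -- substitute your own values
req = build_webhook_request(
    "https://your-app.apps.unionai.cloud", "my-project", "development",
    "my_task", "<your-api-key>", {"x": 42, "y": "hello"},
)
# Send with: urllib.request.urlopen(req)
```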

## Best practices

1. **Always set an endpoint**: The `endpoint` parameter is required for `init_passthrough()`

2. **Use middleware for FastAPI**: The `FastAPIPassthroughAuthMiddleware` eliminates boilerplate and ensures consistent auth handling

3. **Exclude public endpoints**: Use `excluded_paths` to skip auth for health checks and public endpoints

4. **Set default project/domain**: If most requests target the same project/domain, set them during initialization to simplify your endpoint handlers

5. **Handle errors gracefully**: Catch `flyte.errors.RemoteTaskNotFoundError` or `flyte.errors.RemoteTaskUsageError` and other exceptions to return appropriate HTTP status codes

6. **Validate inputs**: Always validate task inputs before passing them to `flyte.run()`

7. **Use the caller's identity**: Passthrough auth ensures proper authorization and audit trails - avoid using static service credentials when possible
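Input validation (practice 6) can be as lightweight as checking the submitted keys against the task's expected parameters before dispatching. A dependency-free sketch; the parameter schema here is hypothetical:

```python
def validate_inputs(inputs: dict, expected: dict[str, type]) -> list[str]:
    """Return a list of validation errors; an empty list means the inputs are acceptable."""
    errors = []
    for key, typ in expected.items():
        if key not in inputs:
            errors.append(f"missing required input: {key}")
        elif not isinstance(inputs[key], typ):
            errors.append(f"input {key!r} should be {typ.__name__}, got {type(inputs[key]).__name__}")
    for key in inputs:
        if key not in expected:
            errors.append(f"unexpected input: {key}")
    return errors

# Hypothetical schema for a task taking x: int and y: str
errors = validate_inputs({"x": 42, "z": "oops"}, {"x": int, "y": str})
# errors -> ["missing required input: y", "unexpected input: z"]
```

In a webhook handler, a non-empty error list would map naturally to an HTTP 400 response before `flyte.run()` is ever called.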

## Troubleshooting

### "FLYTE_ENDPOINT environment variable not set"

Ensure you set the `FLYTE_ENDPOINT` environment variable in your app configuration, or pass it explicitly to `init_passthrough()`.

### "Authentication credentials required"

The middleware returns this error when no authentication headers are found. Ensure:
- The client includes an `Authorization` header with a valid token
- The endpoint is not in the `excluded_paths` set
- Header extractors are configured correctly

### "Task not found"

Verify:
- The task exists in the specified project/domain
- The task name is correct (use the fully qualified name: `package.module.task_name`)
- The caller has permission to view the task

### Tasks run with wrong permissions

If tasks aren't respecting the caller's permissions:
- Verify `init_passthrough()` is called with `auth_type="Passthrough"`
- Ensure auth headers are being extracted and forwarded correctly
- Check that the middleware is added before route handlers

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/deployment-patterns ===

# Deployment patterns

Once you understand the basics of task deployment, you can leverage various deployment patterns to handle different project structures, dependency management approaches, and deployment requirements. This section covers the most common patterns with practical examples.

## Overview of deployment patterns

Flyte supports multiple deployment patterns to accommodate different project structures and requirements:

1. **Simple file deployment** - Single file with tasks and environments
2. **Custom Dockerfile deployment** - Full control over container environment
3. **PyProject package deployment** - Structured Python packages with dependencies and async tasks
4. **Package structure deployment** - Organized packages with shared environments
5. **Full build deployment** - Complete code embedding in containers
6. **Python path deployment** - Multi-directory project structures
7. **Dynamic environment deployment** - Environment selection based on domain context

Each pattern serves specific use cases and can be combined as needed for complex projects.

## Simple file deployment

The simplest deployment pattern involves defining both your tasks and task environment in a single Python file. This pattern works well for:

- Prototyping and experimentation
- Simple tasks with minimal dependencies
- Educational examples and tutorials

### Example structure

```python
import flyte

env = flyte.TaskEnvironment(name="simple_env")

@env.task
async def my_task(name: str) -> str:
    return f"Hello, {name}!"

if __name__ == "__main__":
    flyte.init_from_config()
    flyte.deploy(env)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/simple_file.py*

### Deployment commands

Deploy the environment:

```bash
flyte deploy my_example.py env
```

Run the task ephemerally:

```bash
flyte run my_example.py my_task --name "World"
```

### When to use

- Quick prototypes and experiments
- Single-purpose scripts
- Learning Flyte basics
- Tasks with no external dependencies

## Custom Dockerfile deployment

When you need full control over the container environment, you can specify a custom Dockerfile. This pattern is ideal for:

- Complex system dependencies
- Specific OS or runtime requirements
- Custom base images
- Multi-stage builds

### Example structure

```dockerfile
# syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.8 AS uv
FROM python:3.12-slim-bookworm

USER root

# Copy in uv so that later commands don't have to mount it in
COPY --from=uv /uv /usr/bin/uv

# Configure default envs
ENV UV_COMPILE_BYTECODE=1 \
    UV_LINK_MODE=copy \
    VIRTUALENV=/opt/venv \
    UV_PYTHON=/opt/venv/bin/python \
    PATH="/opt/venv/bin:$PATH"

# Create a virtualenv with the user specified python version
RUN uv venv /opt/venv --python=3.12

WORKDIR /root

# Install dependencies
COPY requirements.txt .
RUN uv pip install --pre -r /root/requirements.txt
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dockerfile/Dockerfile*

```python
from pathlib import Path

import flyte

env = flyte.TaskEnvironment(
    name="docker_env",
    image=flyte.Image.from_dockerfile(
        # relative paths in Python depend on the current working directory, so anchor to this file
        Path(__file__).parent / "Dockerfile",
        registry="ghcr.io/flyteorg",
        name="docker_env_image",
    ),
)

@env.task
def main(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    import flyte.git

    flyte.init_from_config(flyte.git.config_from_root())

    run = flyte.run(main, x=10)
    print(run.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dockerfile/dockerfile_env.py*

### Alternative: Dockerfile in different directory

You can also reference Dockerfiles from subdirectories:

```python
from pathlib import Path

import flyte

env = flyte.TaskEnvironment(
    name="docker_env_in_dir",
    image=flyte.Image.from_dockerfile(
        # relative paths in Python depend on the current working directory, so anchor to this file
        Path(__file__).parent.parent / "Dockerfile.workdir",
        registry="ghcr.io/flyteorg",
        name="docker_env_image",
    ),
)

@env.task
def main(x: int) -> int:
    return x * 2

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main, x=10)
    print(run.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dockerfile/src/docker_env_in_dir.py*

```dockerfile
# syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.8 AS uv
FROM python:3.12-slim-bookworm

USER root

# Copy in uv so that later commands don't have to mount it in
COPY --from=uv /uv /usr/bin/uv

# Configure default envs
ENV UV_COMPILE_BYTECODE=1 \
    UV_LINK_MODE=copy \
    VIRTUALENV=/opt/venv \
    UV_PYTHON=/opt/venv/bin/python \
    PATH="/opt/venv/bin:$PATH"

# Create a virtualenv with the user specified python version
RUN uv venv /opt/venv --python=3.12

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN uv pip install --pre -r /app/requirements.txt
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dockerfile/Dockerfile.workdir*

### Key considerations

- **Path handling**: Use `Path(__file__).parent` for relative Dockerfile paths
  ```python
  # relative paths in Python depend on the current working directory, so anchor to this file
  Path(__file__).parent / "Dockerfile"
  ```
- **Registry configuration**: Specify a registry for image storage
- **Build context**: The directory containing the Dockerfile becomes the build context
- **Flyte installation**: Ensure Flyte is installed in the container and available on `$PATH`
  ```dockerfile
  # Install Flyte in your Dockerfile
  RUN pip install flyte
  ```
- **Dependencies**: Include all application requirements in the Dockerfile or requirements.txt

### When to use

- Need specific system packages or tools
- Custom base image requirements
- Complex installation procedures
- Multi-stage build optimization

## PyProject package deployment

For structured Python projects with proper package management, use the PyProject pattern. This approach demonstrates a **realistic Python project structure** that provides:

- Proper dependency management with `pyproject.toml` and external packages like `httpx`
- Clean separation of business logic and Flyte tasks across multiple modules
- Professional project structure with `src/` layout
- Async task execution with API calls and data processing
- Entrypoint patterns for both command-line and programmatic execution

### Example structure

```
pyproject_package/
├── pyproject.toml          # Project metadata and dependencies
├── README.md              # Documentation
└── src/
    └── pyproject_package/
        ├── __init__.py     # Package initialization
        ├── main.py         # Entrypoint script
        ├── data/
        │   ├── __init__.py
        │   ├── loader.py   # Data loading utilities (no Flyte)
        │   └── processor.py # Data processing utilities (no Flyte)
        ├── models/
        │   ├── __init__.py
        │   └── analyzer.py # Analysis utilities (no Flyte)
        └── tasks/
            ├── __init__.py
            └── tasks.py    # Flyte task definitions
```

### Business logic modules

The business logic is completely separate from Flyte and can be used independently:

#### Data Loading (`data/loader.py`)
```python
import json
from pathlib import Path
from typing import Any

import httpx

async def fetch_data_from_api(url: str) -> list[dict[str, Any]]:
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

def load_local_data(file_path: str | Path) -> dict[str, Any]:
    path = Path(file_path)

    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    with path.open("r") as f:
        return json.load(f)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/src/pyproject_package/data/loader.py*

#### Data Processing (`data/processor.py`)
```python
import asyncio
from typing import Any

from pydantic import BaseModel, Field, field_validator

class DataItem(BaseModel):
    id: int = Field(gt=0, description="Item ID must be positive")
    value: float = Field(description="Item value")
    category: str = Field(min_length=1, description="Item category")

    @field_validator("category")
    @classmethod
    def category_must_be_lowercase(cls, v: str) -> str:
        return v.lower()

def clean_data(raw_data: dict[str, Any]) -> dict[str, Any]:
    # Remove None values
    cleaned = {k: v for k, v in raw_data.items() if v is not None}

    # Validate items if present
    if "items" in cleaned:
        validated_items = []
        for item in cleaned["items"]:
            try:
                validated = DataItem(**item)
                validated_items.append(validated.model_dump())
            except Exception as e:
                print(f"Skipping invalid item {item}: {e}")
                continue
        cleaned["items"] = validated_items

    return cleaned

def transform_data(data: dict[str, Any]) -> list[dict[str, Any]]:
    items = data.get("items", [])

    # Add computed fields
    transformed = []
    for item in items:
        transformed_item = {
            **item,
            "value_squared": item["value"] ** 2,
            "category_upper": item["category"].upper(),
        }
        transformed.append(transformed_item)

    return transformed

async def aggregate_data(items: list[dict[str, Any]]) -> dict[str, Any]:
    # Simulate async processing
    await asyncio.sleep(0.1)

    aggregated: dict[str, dict[str, Any]] = {}

    for item in items:
        category = item["category"]

        if category not in aggregated:
            aggregated[category] = {
                "count": 0,
                "total_value": 0.0,
                "values": [],
            }

        aggregated[category]["count"] += 1
        aggregated[category]["total_value"] += item["value"]
        aggregated[category]["values"].append(item["value"])

    # Calculate averages
    for category, v in aggregated.items():
        total = v["total_value"]
        count = v["count"]
        v["average_value"] = total / count if count > 0 else 0.0

    return {"categories": aggregated, "total_items": len(items)}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/src/pyproject_package/data/processor.py*

#### Analysis (`models/analyzer.py`)
```python
from typing import Any

import numpy as np

def calculate_statistics(data: list[dict[str, Any]]) -> dict[str, Any]:
    if not data:
        return {
            "count": 0,
            "mean": 0.0,
            "median": 0.0,
            "std_dev": 0.0,
            "min": 0.0,
            "max": 0.0,
        }

    values = np.array([item["value"] for item in data])

    stats = {
        "count": len(values),
        "mean": float(np.mean(values)),
        "median": float(np.median(values)),
        "std_dev": float(np.std(values)),
        "min": float(np.min(values)),
        "max": float(np.max(values)),
        "percentile_25": float(np.percentile(values, 25)),
        "percentile_75": float(np.percentile(values, 75)),
    }

    return stats

def generate_report(stats: dict[str, Any]) -> str:
    report_lines = [
        "=" * 60,
        "DATA ANALYSIS REPORT",
        "=" * 60,
    ]

    # Basic statistics section
    if "basic" in stats:
        basic = stats["basic"]
        report_lines.extend(
            [
                "",
                "BASIC STATISTICS:",
                f"  Count:       {basic.get('count', 0)}",
                f"  Mean:        {basic.get('mean', 0.0):.2f}",
                f"  Median:      {basic.get('median', 0.0):.2f}",
                f"  Std Dev:     {basic.get('std_dev', 0.0):.2f}",
                f"  Min:         {basic.get('min', 0.0):.2f}",
                f"  Max:         {basic.get('max', 0.0):.2f}",
                f"  25th %ile:   {basic.get('percentile_25', 0.0):.2f}",
                f"  75th %ile:   {basic.get('percentile_75', 0.0):.2f}",
            ]
        )

    # Category aggregations section
    if "aggregated" in stats and "categories" in stats["aggregated"]:
        categories = stats["aggregated"]["categories"]
        total_items = stats["aggregated"].get("total_items", 0)

        report_lines.extend(
            [
                "",
                "CATEGORY BREAKDOWN:",
                f"  Total Items: {total_items}",
                "",
            ]
        )

        for category, cat_stats in sorted(categories.items()):
            report_lines.extend(
                [
                    f"  Category: {category.upper()}",
                    f"    Count:         {cat_stats.get('count', 0)}",
                    f"    Total Value:   {cat_stats.get('total_value', 0.0):.2f}",
                    f"    Average Value: {cat_stats.get('average_value', 0.0):.2f}",
                    "",
                ]
            )

    report_lines.append("=" * 60)

    return "\n".join(report_lines)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/src/pyproject_package/models/analyzer.py*

These modules demonstrate:
- **No Flyte dependencies** - can be tested and used independently
- **Pydantic models** for data validation with custom validators
- **Async patterns** with proper context managers and error handling
- **NumPy integration** for statistical calculations
- **Professional error handling** with timeouts and validation

### Flyte orchestration layer

The Flyte tasks orchestrate the business logic with proper async execution:

```python
import pathlib
from typing import Any

import flyte
from pyproject_package.data import loader, processor
from pyproject_package.models import analyzer

UV_PROJECT_ROOT = pathlib.Path(__file__).parent.parent.parent.parent

env = flyte.TaskEnvironment(
    name="data_pipeline",
    image=flyte.Image.from_debian_base().with_uv_project(pyproject_file=UV_PROJECT_ROOT / "pyproject.toml"),
    resources=flyte.Resources(memory="512Mi", cpu="500m"),
)

@env.task
async def fetch_task(url: str) -> list[dict[str, Any]]:
    print(f"Fetching data from: {url}")
    data = await loader.fetch_data_from_api(url)
    print(f"Fetched {len(data)} top-level keys")
    return data

@env.task
async def process_task(raw_data: dict[str, Any]) -> list[dict[str, Any]]:
    print("Cleaning data...")
    cleaned = processor.clean_data(raw_data)

    print("Transforming data...")
    transformed = processor.transform_data(cleaned)

    print(f"Processed {len(transformed)} items")
    return transformed

@env.task
async def analyze_task(processed_data: list[dict[str, Any]]) -> str:
    print("Aggregating data...")
    aggregated = await processor.aggregate_data(processed_data)

    print("Calculating statistics...")
    stats = analyzer.calculate_statistics(processed_data)

    print("Generating report...")
    report = analyzer.generate_report({"basic": stats, "aggregated": aggregated})

    print("\n" + report)
    return report

@env.task
async def pipeline(api_url: str) -> str:
    # Chain tasks together
    raw_data = await fetch_task(url=api_url)
    processed_data = await process_task(raw_data=raw_data[0])
    report = await analyze_task(processed_data=processed_data)

    return report
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/src/pyproject_package/tasks/tasks.py*

### Entrypoint configuration

The main entrypoint demonstrates proper initialization and execution patterns:

```python
import pathlib

import flyte
from pyproject_package.tasks.tasks import pipeline

def main():
    # Initialize Flyte connection
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)

    # Example API URL with mock data
    # In a real scenario, this would be a real API endpoint
    example_url = "https://jsonplaceholder.typicode.com/posts"

    # jsonplaceholder serves stable mock data, so the example
    # works reliably without a dedicated backend API
    print("Starting data pipeline...")
    print(f"Target API: {example_url}")

    # Run the pipeline remotely and wait for it to complete
    run = flyte.run(pipeline, api_url=example_url)
    print(f"\nRun Name: {run.name}")
    print(f"Run URL: {run.url}")
    run.wait()

if __name__ == "__main__":
    main()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/src/pyproject_package/main.py*

### Dependencies and configuration

```toml
[project]
name = "pyproject-package"
version = "0.1.0"
description = "Example Python package with Flyte tasks and modular business logic"
readme = "README.md"
authors = [
    { name = "Ketan Umare", email = "kumare3@users.noreply.github.com" }
]
requires-python = ">=3.10"
dependencies = [
    "flyte>=2.0.0b52",
    "httpx>=0.27.0",
    "numpy>=1.26.0",
    "pydantic>=2.0.0",
]

[project.scripts]
run-pipeline = "pyproject_package.main:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pyproject_package/pyproject.toml*

### Key features

- **Async task chains**: Tasks can be chained together with proper async/await patterns
- **External dependencies**: Demonstrates integration with external libraries (`httpx`, `numpy`, `pydantic`)
- **uv integration**: Uses `.with_uv_project()` for dependency management
- **Resource specification**: Shows how to set memory and CPU requirements
- **Proper error handling**: Includes timeout and error handling in API calls

### Key learning points

1. **Separation of concerns**: Business logic (`data/`, `models/`) separate from orchestration (`main.py`)
2. **Reusable code**: Non-Flyte modules can be tested independently and reused
3. **Async support**: Demonstrates async Flyte tasks for I/O-bound operations
4. **Dependency management**: Shows how external packages integrate with Flyte
5. **Realistic structure**: Mirrors real-world Python project organization
6. **Entrypoint script**: Shows how to create runnable entry points

### Usage patterns

**Run locally:**
```bash
python -m pyproject_package.main
```

**Deploy to Flyte:**
```bash
flyte deploy .
```

**Run remotely:**
```bash
python -m pyproject_package.main  # Uses remote execution
```

### What this example demonstrates

- Multiple files and modules in a package
- Async Flyte tasks with external API calls
- Separation of business logic from orchestration
- External dependencies (`httpx`, `numpy`, `pydantic`)
- **Data validation with Pydantic models** for robust data processing
- **Professional error handling** with try/except for data validation
- **Timeout configuration** for external API calls (`timeout=10.0`)
- **Async context managers** for proper resource management (`async with httpx.AsyncClient()`)
- Entrypoint script pattern with `project.scripts`
- Realistic project structure with `src/` layout
- Task chaining and data flow
- How non-Flyte code integrates with Flyte tasks

### When to use

- Production-ready, maintainable projects
- Projects requiring external API integration
- Complex data processing pipelines
- Team development with proper separation of concerns
- Applications needing async execution patterns

## Package structure deployment

For organizing Flyte workflows in a package structure with shared task environments and utilities, use this pattern. It's particularly useful for:

- Multiple workflows that share common environments and utilities
- Organized code structure with clear module boundaries
- Projects where you want to reuse task environments across workflows

### Example structure

```
lib/
├── __init__.py
└── workflows/
    ├── __init__.py
    ├── workflow1.py    # First workflow
    ├── workflow2.py    # Second workflow
    ├── env.py          # Shared task environment
    └── utils.py        # Shared utilities
```

### Key concepts

- **Shared environments**: Define task environments in `env.py` and import across workflows
- **Utility modules**: Common functions and utilities shared between workflows
- **Root directory handling**: Use `--root-dir` flag for proper Python path configuration
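As a sketch of what the shared pieces might look like (the names follow the structure above but are illustrative, not taken from a real example):

```python
# lib/workflows/env.py -- shared task environment imported by both workflows
import flyte

env = flyte.TaskEnvironment(
    name="shared_env",
    resources=flyte.Resources(cpu=1, memory="512Mi"),
)
```

```python
# lib/workflows/workflow1.py -- reuses the shared environment
from lib.workflows.env import env

@env.task
async def process_workflow(data: str) -> str:
    return data.upper()
```

Because both workflows import `env` from the same module, resource and image settings stay consistent across the package.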

### Running with root directory

When running workflows with a package structure, specify the root directory:

```bash
flyte run --root-dir . lib/workflows/workflow1.py process_workflow
flyte run --root-dir . lib/workflows/workflow2.py math_workflow --n 6
```

### How `--root-dir` works

The `--root-dir` flag automatically configures the Python path (`sys.path`) to ensure:

1. **Local execution**: Package imports work correctly when running locally
2. **Consistent behavior**: Same Python path configuration locally and at runtime
3. **No manual PYTHONPATH**: Eliminates need to manually export environment variables
4. **Runtime packaging**: Flyte packages and copies code correctly to execution environment
5. **Runtime consistency**: The same package structure is preserved in the runtime container

### Alternative: Using a Python project

For larger projects, create a proper Python project with `pyproject.toml`:

```toml
# pyproject.toml
[project]
name = "lib"
version = "0.1.0"

[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
```

Then install in editable mode:

```bash
pip install -e .
```

After installation, you can run workflows without `--root-dir`:

```bash
flyte run lib/workflows/workflow1.py process_workflow
```

However, for deployment and remote execution, still use `--root-dir` for consistency:

```bash
flyte run --root-dir . lib/workflows/workflow1.py process_workflow
flyte deploy --root-dir . lib/workflows/workflow1.py
```

### When to use

- Multiple related workflows in one project
- Shared task environments and utilities
- Team projects with multiple contributors
- Applications requiring organized code structure
- Projects that benefit from proper Python packaging

## Full build deployment

When you need complete reproducibility and want to embed all code directly in the container image, use the full build pattern. This disables Flyte's fast deployment system in favor of traditional container builds.

### Overview

By default, Flyte uses a fast deployment system that:
- Creates a tar archive of your files
- Skips the full image build and push process
- Provides faster iteration during development

However, sometimes you need to **completely embed your code into the container image** for:
- Full reproducibility with immutable container images
- Environments where fast deployment isn't available
- Production deployments with all dependencies baked in
- Air-gapped or restricted deployment environments

### Key configuration

```python
import pathlib

from dep import foo

import flyte

env = flyte.TaskEnvironment(
    name="full_build",
    image=flyte.Image.from_debian_base().with_source_folder(
        pathlib.Path(__file__).parent,
        copy_contents_only=True  # Avoid nested folders
    ),
)

@env.task
def square(x: int) -> int:
    return x ** foo()

@env.task
def main(n: int) -> list[int]:
    return list(flyte.map(square, range(n)))

if __name__ == "__main__":
    # copy_contents_only=True requires root_dir=parent, False requires root_dir=parent.parent
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    run = flyte.with_runcontext(copy_style="none", version="x").run(main, n=10)
    print(run.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/full_build/main.py*

### Local dependency example

The main.py file imports from a local dependency that gets included in the build:

```python
def foo() -> int:
    return 1
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/full_build/dep.py*

### Critical configuration components

1. **Set `copy_style` to `"none"`**:
   ```python
   flyte.with_runcontext(copy_style="none", version="x").run(main, n=10)
   ```
   This disables Flyte's fast deployment system and forces a full container build.

2. **Set a custom version**:
   ```python
   flyte.with_runcontext(copy_style="none", version="x").run(main, n=10)
   ```
   The `version` parameter should be set to a desired value (not auto-generated) for consistent image tagging.

3. **Configure image source copying**:
   ```python
   image=flyte.Image.from_debian_base().with_source_folder(
       pathlib.Path(__file__).parent,
       copy_contents_only=True
   )
   ```
   Use `.with_source_folder()` to specify what code to copy into the container.

4. **Set `root_dir` correctly**:
   ```python
   flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
   ```
   - If `copy_contents_only=True`: Set `root_dir` to the source folder (contents are copied)
   - If `copy_contents_only=False`: Set `root_dir` to parent directory (folder is copied)

### Configuration options

#### Option A: Copy Folder Structure
```python
# Copies the entire folder structure into the container
image=flyte.Image.from_debian_base().with_source_folder(
    pathlib.Path(__file__).parent,
    copy_contents_only=False  # Default
)

# When copy_contents_only=False, set root_dir to parent.parent
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)
```

#### Option B: Copy Contents Only (Recommended)
```python
# Copies only the contents of the folder (flattens structure)
# This is useful when you want to avoid nested folders, for example when all your code is in the root of the repo
image=flyte.Image.from_debian_base().with_source_folder(
    pathlib.Path(__file__).parent,
    copy_contents_only=True
)

# When copy_contents_only=True, set root_dir to parent
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
```

### Version management best practices

When using `copy_style="none"`, always specify an explicit version:
- Use semantic versioning: `"v1.0.0"`, `"v1.1.0"`
- Use build numbers: `"build-123"`
- Use git commits: `"abc123"`

Avoid auto-generated versions to ensure reproducible deployments.
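
As one option, a small helper can derive the version from the current git commit. This is a sketch (the `git_version` helper and its fallback value are illustrative, not part of the Flyte API); its result could then be passed as the `version` argument to `flyte.with_runcontext()`:

```python
import subprocess

def git_version(fallback: str = "dev") -> str:
    """Derive an explicit version tag from the current git commit.

    Falls back to a fixed string when git (or a repository) is unavailable,
    so the version is always explicit rather than auto-generated.
    """
    try:
        sha = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"],
            stderr=subprocess.DEVNULL,
        ).decode().strip()
        return f"git-{sha}"
    except (subprocess.CalledProcessError, FileNotFoundError):
        return fallback
```

Tying the version to the commit makes the container tag traceable back to the exact source that produced it.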

### Performance considerations

- **Full builds take longer** than fast deployment
- **Container images will be larger** as they include all source code
- **Better for production** where immutability is important
- **Use during development** when testing the full deployment pipeline

### When to use

✅ **Use full build when:**
- Deploying to production environments
- Need immutable, reproducible container images
- Working with complex dependency structures
- Deploying to air-gapped or restricted environments
- Building CI/CD pipelines

❌ **Don't use full build when:**
- Rapid development and iteration
- Working with frequently changing code
- Development environments where speed matters
- Simple workflows without complex dependencies

### Troubleshooting

**Common issues:**
1. **Import errors**: Check your `root_dir` configuration matches `copy_contents_only`
2. **Missing files**: Ensure all dependencies are in the source folder
3. **Version conflicts**: Use explicit, unique version strings
4. **Build failures**: Check that the base image has all required system dependencies

**Debug tips:**
- Add print statements to verify file paths in containers
- Use `docker run -it <image> /bin/bash` to inspect built images
- Check Flyte logs for build errors and warnings
- Verify that relative imports work correctly in the container context
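
For the first debug tip, here is a flyte-free sketch of a helper you might call from inside a task to see which files actually landed in the container (the helper name is hypothetical):

```python
import pathlib

def list_container_files(root: str = ".") -> list[str]:
    """Print and return all files visible under the given directory.

    Calling this from inside a task (or in an interactive shell in the
    image) helps verify that the source copy step included what you expect.
    """
    files = sorted(str(p) for p in pathlib.Path(root).rglob("*") if p.is_file())
    for f in files:
        print(f)
    return files
```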

## Python path deployment

For projects where workflows are separated from business logic across multiple directories, use the Python path pattern with proper `root_dir` configuration.

### Example structure

```
pythonpath/
├── workflows/
│   └── workflow.py      # Flyte workflow definitions
├── src/
│   └── my_module.py     # Business logic modules
├── run.sh               # Execute from project root
└── run_inside_folder.sh # Execute from workflows/ directory
```

### Implementation

```python
import pathlib

from src.my_module import env, say_hello

import flyte

env = flyte.TaskEnvironment(
    name="workflow_env",
    depends_on=[env],
)

@env.task
async def greet(name: str) -> str:
    return await say_hello(name)

if __name__ == "__main__":
    current_dir = pathlib.Path(__file__).parent
    flyte.init_from_config(root_dir=current_dir.parent)
    r = flyte.run(greet, name="World")
    print(r.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pythonpath/workflows/workflow.py*

```python
import flyte

env = flyte.TaskEnvironment(
    name="my_module",
)

@env.task
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/pythonpath/src/my_module.py*

### Task environment dependencies

Note how the workflow imports both the task environment and the task function:

```python
from src.my_module import env, say_hello

env = flyte.TaskEnvironment(
    name="workflow_env",
    depends_on=[env],  # Depends on the imported environment
)
```

This pattern allows sharing task environments across modules while maintaining proper dependency relationships.

### Key considerations

- **Import resolution**: `root_dir` enables proper module imports across directories
- **File packaging**: Flyte packages all files starting from `root_dir`
- **Execution flexibility**: Works regardless of where you execute the script
- **PYTHONPATH handling**: Different behavior for CLI vs direct Python execution

### CLI vs Direct Python execution

#### Using Flyte CLI with `--root-dir` (Recommended)

When using `flyte run` with `--root-dir`, you don't need to export PYTHONPATH:

```bash
flyte run --root-dir . workflows/workflow.py greet --name "World"
```

The CLI automatically:
- Adds the `--root-dir` location to `sys.path`
- Resolves all imports correctly
- Packages files from the root directory for remote execution

#### Using Python directly

When running Python scripts directly, you must set PYTHONPATH manually:

```bash
PYTHONPATH=.:$PYTHONPATH python workflows/workflow.py
```

This is because:
- Python doesn't automatically know about your project structure
- You need to explicitly tell Python where to find your modules
- The `root_dir` parameter handles remote packaging, not local path resolution
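
If you prefer not to export `PYTHONPATH`, a workflow script can instead prepend the project root to `sys.path` before its imports. A minimal sketch, assuming the `workflows/` layout shown above; note this handles local import resolution only, while `root_dir` still controls what gets packaged for remote execution:

```python
import pathlib
import sys

# Prepend the project root (the parent of this workflows/ directory) to
# sys.path so that `from src.my_module import ...` resolves without
# exporting PYTHONPATH.
project_root = pathlib.Path(__file__).resolve().parent.parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))
```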

### Best practices

1. **Always set `root_dir`** when workflows import from multiple directories
2. **Use pathlib** for cross-platform path handling
3. **Set `root_dir` to your project root** to ensure all dependencies are captured
4. **Test both execution patterns** to ensure deployment works from any directory

### Common pitfalls

- **Forgetting `root_dir`**: Results in import errors during remote execution
- **Wrong `root_dir` path**: May package too many or too few files
- **Not setting PYTHONPATH when using Python directly**: Set `PYTHONPATH` manually, or use `flyte run --root-dir .` instead
- **Mixing execution methods**: If you use `flyte run --root-dir .`, you don't need PYTHONPATH

### When to use

- Legacy projects with established directory structures
- Separation of concerns between workflows and business logic
- Multiple workflow definitions sharing common modules
- Projects with complex import hierarchies

**Note:** This pattern is an escape hatch for larger projects where code organization requires separating workflows from business logic. Ideally, structure projects with `pyproject.toml` for cleaner dependency management.

## Dynamic environment deployment

For environments that need to change based on deployment context (development vs production), use dynamic environment selection based on Flyte domains.

### Domain-based environment selection

Use `flyte.current_domain()` to deterministically create different task environments based on the deployment domain:

```python
# NOTE: flyte.init() invocation at the module level is strictly discouraged.
# At runtime, Flyte controls initialization and configuration files are not present.

import os

import flyte

def create_env():
    if flyte.current_domain() == "development":
        return flyte.TaskEnvironment(name="dev", image=flyte.Image.from_debian_base(), env_vars={"MY_ENV": "dev"})
    return flyte.TaskEnvironment(name="prod", image=flyte.Image.from_debian_base(), env_vars={"MY_ENV": "prod"})

env = create_env()

@env.task
async def my_task(n: int) -> int:
    print(f"Environment Variable MY_ENV = {os.environ['MY_ENV']}", flush=True)
    return n + 1

@env.task
async def entrypoint(n: int) -> int:
    print(f"Environment Variable MY_ENV = {os.environ['MY_ENV']}", flush=True)
    return await my_task(n)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dynamic_environments/environment_picker.py*

### Why this pattern works

**Environment reproducibility in local and remote clusters is critical.** Flyte re-instantiates modules in remote clusters, so `current_domain()` will be set correctly based on where the code executes.

✅ **Do use `flyte.current_domain()`** - Flyte automatically sets this based on the execution context

❌ **Don't use environment variables directly** - They won't yield correct results unless manually passed to the downstream system

### How it works

1. Flyte sets the domain context when initializing
2. `current_domain()` returns the domain string (e.g., "development", "staging", "production")
3. Your code deterministically configures resources based on this domain
4. When Flyte executes remotely, it re-instantiates modules with the correct domain context
5. The same environment configuration logic runs consistently everywhere
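
Stripped of the Flyte specifics, the selection logic is just a deterministic mapping from domain to configuration. A flyte-free sketch (the configuration values are illustrative):

```python
# Deterministic per-domain configuration, mirroring the
# `if development ... else prod` shape of create_env() above.
DOMAIN_CONFIG = {
    "development": {"name": "dev", "env_vars": {"MY_ENV": "dev"}},
    "production": {"name": "prod", "env_vars": {"MY_ENV": "prod"}},
}

def pick_config(domain: str) -> dict:
    # Unknown domains fall back to production settings, matching the
    # example's default (non-development) branch.
    return DOMAIN_CONFIG.get(domain, DOMAIN_CONFIG["production"])
```

Because the mapping is a pure function of the domain string, re-instantiating the module remotely produces exactly the same environment.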

### Important constraints

`flyte.current_domain()` only works **after** `flyte.init()` is called:

- ✅ Works with `flyte run` and `flyte deploy` CLI commands (they init automatically)
- ✅ Works when called from `if __name__ == "__main__"` after explicit `flyte.init()`
- ❌ Does NOT work at module level without initialization

**Critical:** `flyte.init()` invocation at the module level is **strictly discouraged**. At runtime, Flyte controls initialization, and configuration files are not present.

### Alternative: Environment variable approach

For cases where you need to pass domain information as environment variables to the container runtime, use this approach:

```python
import os

import flyte

def create_env(domain: str):
    # Pass domain as environment variable so tasks can see which domain they're running in
    if domain == "development":
        return flyte.TaskEnvironment(name="dev", image=flyte.Image.from_debian_base(), env_vars={"DOMAIN_NAME": domain})
    return flyte.TaskEnvironment(name="prod", image=flyte.Image.from_debian_base(), env_vars={"DOMAIN_NAME": domain})

env = create_env(os.getenv("DOMAIN_NAME", "development"))

@env.task
async def my_task(n: int) -> int:
    print(f"Environment Variable DOMAIN_NAME = {os.environ['DOMAIN_NAME']}", flush=True)
    return n + 1

@env.task
async def entrypoint(n: int) -> int:
    print(f"Environment Variable DOMAIN_NAME = {os.environ['DOMAIN_NAME']}", flush=True)
    return await my_task(n)

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(entrypoint, n=5)
    print(r.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dynamic_environments_with_envvars/environment_picker.py*

#### Key differences from domain-based approach

- **Environment variable access**: The domain name is available inside tasks via `os.environ['DOMAIN_NAME']`
- **External control**: Can be controlled via system environment variables before execution
- **Runtime visibility**: Tasks can inspect which environment they're running in during execution
- **Default fallback**: Uses `"development"` as default when `DOMAIN_NAME` is not set

#### Usage with environment variables

Set the environment variable and run:

```bash
export DOMAIN_NAME=production
flyte run environment_picker.py entrypoint --n 5
```

Or set it inline:

```bash
DOMAIN_NAME=development flyte run environment_picker.py entrypoint --n 5
```

#### When to use environment variables vs domain-based

**Use environment variables when:**
- Tasks need runtime access to environment information
- External systems set environment configuration
- You need flexibility to override environment externally
- Debugging requires visibility into environment selection

**Use domain-based approach when:**
- Environment selection should be automatic based on Flyte domain
- You want tighter integration with Flyte's domain system
- No need for runtime environment inspection within tasks

You can vary multiple aspects based on context:

- **Base images**: Different images for dev vs prod
- **Environment variables**: Configuration per environment
- **Resource requirements**: Different CPU/memory per domain
- **Dependencies**: Different package versions
- **Registry settings**: Different container registries

### Usage patterns

```bash
flyte run environment_picker.py entrypoint --n 5
flyte deploy environment_picker.py
```

For programmatic usage, ensure proper initialization:

```python
import flyte

# Initialize before importing the task module so that
# flyte.current_domain() is available when create_env() runs.
flyte.init_from_config()
from environment_picker import entrypoint

if __name__ == "__main__":
    r = flyte.run(entrypoint, n=5)
    print(r.url)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/deployment-patterns/dynamic_environments/main.py*

### When to use dynamic environments

**General use cases:**
- Multi-environment deployments (dev/staging/prod)
- Different resource requirements per environment
- Environment-specific dependencies or settings
- Context-sensitive configuration needs

**Domain-based approach for:**
- Automatic environment selection tied to Flyte domains
- Simpler configuration without external environment variables
- Integration with Flyte's built-in domain system

**Environment variable approach for:**
- Runtime visibility into environment selection within tasks
- External control over environment configuration
- Debugging and logging environment-specific behavior
- Integration with external deployment systems that set environment variables

## Best practices

### Project organization

1. **Separate concerns**: Keep business logic separate from Flyte task definitions
2. **Use proper imports**: Structure projects for clean import patterns
3. **Version control**: Include all necessary files in version control
4. **Documentation**: Document deployment requirements and patterns

### Image management

1. **Registry configuration**: Use consistent registry settings across environments
2. **Image tagging**: Use meaningful tags for production deployments
3. **Base image selection**: Choose appropriate base images for your needs
4. **Dependency management**: Keep container images lightweight but complete

### Configuration management

1. **Root directory**: Set `root_dir` appropriately for your project structure
2. **Path handling**: Use `pathlib.Path` for cross-platform compatibility
3. **Environment variables**: Use environment-specific configurations
4. **Secrets management**: Handle sensitive data appropriately

### Development workflow

1. **Local testing**: Test tasks locally before deployment
2. **Incremental development**: Use `flyte run` for quick iterations
3. **Production deployment**: Use `flyte deploy` for permanent deployments
4. **Monitoring**: Monitor deployed tasks and environments

## Choosing the right pattern

| Pattern | Use Case | Complexity | Best For |
|---------|----------|------------|----------|
| Simple file | Quick prototypes, learning | Low | Single tasks, experiments |
| Custom Dockerfile | System dependencies, custom environments | Medium | Complex dependencies |
| PyProject package | Professional projects, async pipelines | Medium-High | Production applications |
| Package structure | Multiple workflows, shared utilities | Medium | Organized team projects |
| Full build | Production, reproducibility | High | Immutable deployments |
| Python path | Legacy structures, separated concerns | Medium | Existing codebases |
| Dynamic environment | Multi-environment, domain-aware deployments | Medium | Context-aware deployments |

Start with simpler patterns and evolve to more complex ones as your requirements grow. Many projects will combine multiple patterns as they scale and mature.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/task-deployment/run-context ===

# Run context

Every Flyte run has a **run context** — a set of invocation-time parameters that control where the run executes, where its outputs are stored, how caching behaves, and more.

There are two sides to run context:

- **Write side**: `flyte.with_runcontext()` — set run parameters before the run starts (programmatic) or via CLI flags.
- **Read side**: `flyte.ctx()` — access run parameters inside a running task.

## Configuring a run with `flyte.with_runcontext()`

`flyte.with_runcontext()` returns a runner object. Call `.run(task, ...)` on it to start the run with the specified context:

```python
import flyte

env = flyte.TaskEnvironment("run-context-example")

@env.task
async def process(n: int) -> int:
    return n * 2

@env.task
async def root() -> int:
    return await process(21)

if __name__ == "__main__":
    flyte.init_from_config()
    flyte.with_runcontext(
        name="my-run",
        project="my-project",
        domain="development",
    ).run(root)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/run-context/run_context.py*

All parameters are optional. Unset parameters inherit from the configuration file (`config.yaml`) or system defaults.

### Execution target

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `mode` | `"local"` \| `"remote"` \| `"hybrid"` | *from config* | Where the run executes. `"remote"` runs on the Flyte backend; `"local"` runs in-process. |
| `project` | `str` | *from config* | Project to run in. |
| `domain` | `str` | *from config* | Domain to run in (e.g. `"development"`, `"production"`). |
| `name` | `str` | *auto-generated* | Custom name for the run, visible in the UI. |
| `version` | `str` | *from code bundle* | Version string for the ephemeral task deployment. |
| `queue` | `str` | *from config* | Cluster queue to schedule tasks on. |
| `interruptible` | `bool` | *per-task setting* | Override the interruptible setting for all tasks in the run. `True` allows spot/preemptible instances; `False` forces non-interruptible instances. |

### Storage

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `raw_data_path` | `str` | *from config* | Storage prefix for offloaded data types ([Files](https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories), [Dirs](https://www.union.ai/docs/v2/union/user-guide/task-programming/files-and-directories), [DataFrames](https://www.union.ai/docs/v2/union/user-guide/task-programming/dataframes), checkpoints). Accepts `s3://`, `gs://`, or local paths. |
| `run_base_dir` | `str` | *auto-generated* | Base directory for run metadata passed between tasks. Distinct from `raw_data_path`. |

To direct all task outputs to a specific bucket for a run:

```python
if __name__ == "__main__":
    flyte.init_from_config()
    flyte.with_runcontext(
        # Store all task outputs in a dedicated S3 prefix for this run
        raw_data_path="s3://my-bucket/runs/experiment-42/",
    ).run(root)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/run-context/run_context.py*

The equivalent CLI flag is `--raw-data-path`. See [Run command options](./run-command-options#--raw-data-path) for CLI usage.

### Caching

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `overwrite_cache` | `bool` | `False` | Re-execute all tasks even if a cached result exists, and overwrite the cache with new results. |
| `disable_run_cache` | `bool` | `False` | Skip cache lookups and writes entirely for this run. |
| `cache_lookup_scope` | `"global"` \| ... | `"global"` | Scope for cache lookups. |

### Identity and resources

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `service_account` | `str` | *from config* | Kubernetes service account for task pods. |
| `env_vars` | `Dict[str, str]` | `None` | Additional environment variables to inject into task containers. |
| `labels` | `Dict[str, str]` | `None` | Kubernetes labels to apply to task pods. |
| `annotations` | `Dict[str, str]` | `None` | Kubernetes annotations to apply to task pods. |

### Logging

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `log_level` | `int` | *from config* | Python log level (e.g. `logging.DEBUG`). |
| `log_format` | `"console"` \| ... | `"console"` | Log output format. |
| `reset_root_logger` | `bool` | `False` | If `True`, preserve the root logger unchanged. |

### Code bundling

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `copy_style` | `"loaded_modules"` \| `"all"` \| `"none"` | `"loaded_modules"` | Code bundling strategy. See [Run command options](./run-command-options#--copy-style). |
| `dry_run` | `bool` | `False` | Build and upload the code bundle without executing the run. |
| `copy_bundle_to` | `Path` | `None` | When `dry_run=True`, copy the bundle to this local path. |
| `interactive_mode` | `bool` | *auto-detected* | Override interactive mode detection (set automatically for Jupyter notebooks). |
| `preserve_original_types` | `bool` | `False` | Keep native DataFrame types (e.g. `pd.DataFrame`) rather than converting to `flyte.io.DataFrame` when deserializing outputs. |

### Context propagation

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `custom_context` | `Dict[str, str]` | `None` | Metadata propagated through the entire task hierarchy. Readable inside any task via `flyte.ctx().custom_context`. See [Custom context](https://www.union.ai/docs/v2/union/user-guide/task-programming/custom-context). |

---

## Reading context inside a task with `flyte.ctx()`

Inside a running task, `flyte.ctx()` returns a `TaskContext` object with information about the current execution. Outside of a task, it returns `None`.

```python
@env.task
async def inspect_context() -> str:
    ctx = flyte.ctx()
    action = ctx.action
    return (
        f"run={action.run_name}, "
        f"action={action.name}, "
        f"mode={ctx.mode}, "
        f"in_cluster={ctx.is_in_cluster()}"
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/run-context/run_context.py*

### `TaskContext` fields

| Field | Type | Description |
|-------|------|-------------|
| `action` | `ActionID` | Identity of this specific action (task invocation) within the run. |
| `mode` | `"local"` \| `"remote"` \| `"hybrid"` | Execution mode of the current run. |
| `version` | `str` | Version of the deployed task code bundle. |
| `raw_data_path` | `str` | Storage prefix where offloaded outputs are written. |
| `run_base_dir` | `str` | Base directory for run metadata. |
| `custom_context` | `Dict[str, str]` | Propagated context metadata from `with_runcontext()`. |
| `disable_run_cache` | `bool` | Whether run caching is disabled for this run. |
| `is_in_cluster()` | method | Returns `True` when `mode == "remote"`. Useful for branching local/remote behavior. |
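
As an illustration of why `is_in_cluster()` is useful, here is a flyte-free sketch of the local/remote branching pattern (the function name and paths are hypothetical):

```python
def choose_data_path(in_cluster: bool) -> str:
    """Pick a data source based on execution context.

    Mirrors the pattern of branching on flyte.ctx().is_in_cluster():
    remote runs read from object storage, local runs read a local file.
    """
    return "s3://my-bucket/data.parquet" if in_cluster else "/tmp/data.parquet"
```

Inside a task, you would pass `flyte.ctx().is_in_cluster()` as the argument.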

### `ActionID` fields

The `ctx.action` object identifies this specific task invocation:

| Field | Type | Description |
|-------|------|-------------|
| `name` | `str` | Unique identifier for this action. |
| `run_name` | `str` | Name of the parent run (defaults to `name` if not set). |
| `project` | `str \| None` | Project the action runs in. |
| `domain` | `str \| None` | Domain the action runs in. |
| `org` | `str \| None` | Organization. |

### Naming external resources

`ctx.action.run_name` is useful for tying external tool runs (experiment trackers, dashboards) to the corresponding Flyte run:

```python
import wandb  # type: ignore[import]

@env.task
async def train_model(epochs: int) -> float:
    ctx = flyte.ctx()
    # Use run_name to tie the W&B run to this Flyte run
    run = wandb.init(
        project="my-project",
        name=ctx.action.run_name,
        config={"epochs": epochs},
    )
    # ... training logic ...
    loss = 0.42
    run.log({"loss": loss})
    run.finish()
    return loss
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-deployment/run-context/run_context.py*

This ensures that when you look up a run in Weights & Biases (or any other tool), its name matches what you see in the Flyte UI.

