# Human-in-the-loop

Human-in-the-loop (HITL) workflows pause execution at a defined point, wait for a human to provide input or approval, and then continue based on that response. Common use cases include content moderation gates, model output review, anomaly confirmation, and manual approval steps before costly or irreversible operations.

The `flyteplugins-hitl` package provides an event-based API for this pattern. When an event is created, Flyte automatically serves a small FastAPI web app with a form where a human can submit input. The workflow then resumes with the submitted value.

```bash
pip install flyteplugins-hitl
```

Key characteristics:

- Supports `int`, `float`, `str`, and `bool` input types
- Crash-resilient: uses durable sleep so polling survives task restarts
- Configurable timeout and poll interval
- The web form is accessible from the task's report in the Flyte UI

## Setup

The task environment must declare `hitl.env` as a dependency. This makes the HITL web app available during task execution:

```
import flyte
import flyteplugins.hitl as hitl

# The task environment must declare hitl.env as a dependency.
# This makes the HITL web app available during task execution.
env = flyte.TaskEnvironment(
    name="hitl-workflow",
    image=flyte.Image.from_debian_base(name="hitl").with_pip_packages(
        "flyteplugins-hitl>=2.0.0",
        "fastapi",
        "uvicorn",
        "python-multipart",
    ),
    resources=flyte.Resources(cpu="1", memory="512Mi"),
    depends_on=[hitl.env],
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Automated task

An automated task runs first and produces a result that requires human review:

```
@env.task(report=True)
async def analyze_data(dataset: str) -> dict:
    """Automated task that produces a result requiring human review."""
    # Simulate analysis
    result = {
        "dataset": dataset,
        "row_count": 142857,
        "anomalies_detected": 3,
        "confidence": 0.87,
    }
    await flyte.report.replace.aio(
        f"Analysis complete: {result['anomalies_detected']} anomalies detected "
        f"(confidence: {result['confidence']:.0%})"
    )
    await flyte.report.flush.aio()
    return result
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Requesting human input

Use `hitl.new_event()` to pause and wait for a human response. The `prompt` is shown on the web form. The `data_type` controls what type the submitted value is converted to before being returned:

```
@env.task(report=True)
async def request_human_review(analysis: dict) -> bool:
    """Pause and ask a human whether to proceed with the flagged records."""
    event = await hitl.new_event.aio(
        "review_decision",
        data_type=bool,
        scope="run",
        prompt=(
            f"Analysis found {analysis['anomalies_detected']} anomalies "
            f"with {analysis['confidence']:.0%} confidence. "
            "Approve for downstream processing? (true/false)"
        ),
    )
    approved: bool = await event.wait.aio()
    return approved
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

When this task runs, Flyte:

1. Serves the HITL web app (if not already running)
2. Creates an event and writes a pending request to object storage
3. Displays a link to the web form in the task report
4. Polls for a response using durable sleep
5. Returns the submitted value once input is received

## Wiring it together

The main task orchestrates the automated step and the HITL gate:

```
@env.task(report=True)
async def main(dataset: str = "s3://my-bucket/data.parquet") -> str:
    analysis = await analyze_data(dataset=dataset)

    approved = await request_human_review(analysis=analysis)

    if approved:
        return "Processing approved — continuing pipeline."
    else:
        return "Processing rejected by reviewer — pipeline halted."

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
    print(r.outputs())
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/task-programming/human-in-the-loop/hitl.py*

## Event options

`hitl.new_event()` accepts the following parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | `str` | — | Descriptive name shown in logs and the UI |
| `data_type` | `type` | — | Expected input type: `int`, `float`, `str`, or `bool` |
| `scope` | `str` | `"run"` | Scope of the event. Currently only `"run"` is supported |
| `prompt` | `str` | `"Please provide a value"` | Message shown on the web form |
| `timeout_seconds` | `int` | `3600` | Maximum time to wait before raising `TimeoutError` |
| `poll_interval_seconds` | `int` | `5` | How often to check for a response |

## Submitting input programmatically

In addition to the web form, input can be submitted via the event's JSON API endpoint. This is useful for automated testing or integration with external approval systems:

```bash
curl -X POST https://<hitl-app-endpoint>/submit/json \
  -H "Content-Type: application/json" \
  -d '{
    "request_id": "<request_id>",
    "response_path": "<response_path>",
    "value": "true",
    "data_type": "bool"
  }'
```

The `request_id` and `response_path` are shown in the task report alongside the form URL.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/task-programming/human-in-the-loop.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/task-programming/human-in-the-loop/
