# Multi-agent trading simulation

> [!NOTE]
> Code available [here](https://github.com/unionai/unionai-examples/tree/main/v2/tutorials/trading_agents); based on work by [TauricResearch](https://github.com/TauricResearch/TradingAgents).

This example walks you through building a multi-agent trading simulation, modeling how agents within a firm might interact, strategize, and make trades collaboratively.

![Trading agents execution visualization](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/tutorials/trading-agents/execution.png)
_Trading agents execution visualization_

## TL;DR

- You'll build a trading firm made up of agents that analyze, argue, and act, modeled with Python functions.
- You'll use the Flyte SDK to orchestrate this world — giving you visibility, retries, caching, and durability.
- You'll learn how to plug in tools, structure conversations, and track decisions across agents.
- You'll see how agents debate, use context, generate reports, and retain memory via vector DBs.

## What is an agent, anyway?

Agentic workflows are a rising pattern for complex problem-solving with LLMs. Think of agents as:

- An LLM (like GPT-4 or Mistral)
- A loop that keeps them thinking until a goal is met
- A set of optional tools they can call (APIs, search, calculators, etc.)
- Enough tokens to reason about the problem at hand

That's it.

You define tools, bind them to an agent, and let it run, reasoning step-by-step, optionally using those tools, until it finishes.

## What's different here?

We're not building yet another agent framework. You're free to use LangChain, custom code, or whatever setup you like.

What we're giving you is the missing piece: a way to run these workflows **reliably, observably, and at scale, with zero rewrites.**

With Flyte, you get:

- Prompt + tool traceability and full state retention
- Built-in retries, caching, and failure recovery
- A native way to plug in your agents; no magic syntax required

## How it works: step-by-step walkthrough

This simulation is powered by a Flyte task that orchestrates multiple intelligent agents working together to analyze a company's stock and make informed trading decisions.

![Trading agents schema](https://raw.githubusercontent.com/unionai/unionai-docs-static/main/images/tutorials/trading-agents/schema.png)
_Trading agents schema_

### Entry point

Everything begins with a top-level Flyte task called `main`, which serves as the entry point to the workflow.

```
# /// script
# requires-python = "==3.13"
# dependencies = [
#     "flyte>=2.0.0b52",
#     "akshare==1.16.98",
#     "backtrader==1.9.78.123",
#     "boto3==1.39.9",
#     "chainlit==2.5.5",
#     "eodhd==1.0.32",
#     "feedparser==6.0.11",
#     "finnhub-python==2.4.23",
#     "langchain-experimental==0.3.4",
#     "langchain-openai==0.3.23",
#     "pandas==2.3.0",
#     "parsel==1.10.0",
#     "praw==7.8.1",
#     "pytz==2025.2",
#     "questionary==2.1.0",
#     "redis==6.2.0",
#     "requests==2.32.4",
#     "stockstats==0.6.5",
#     "tqdm==4.67.1",
#     "tushare==1.4.21",
#     "typing-extensions==4.14.0",
#     "yfinance==0.2.63",
# ]
# main = "main"
# params = ""
# ///
import asyncio
from copy import deepcopy

import agents
import agents.analysts
from agents.managers import create_research_manager, create_risk_manager
from agents.researchers import create_bear_researcher, create_bull_researcher
from agents.risk_debators import (
    create_neutral_debator,
    create_risky_debator,
    create_safe_debator,
)
from agents.trader import create_trader
from agents.utils.utils import AgentState
from flyte_env import DEEP_THINKING_LLM, QUICK_THINKING_LLM, env, flyte
from langchain_openai import ChatOpenAI
from reflection import (
    reflect_bear_researcher,
    reflect_bull_researcher,
    reflect_research_manager,
    reflect_risk_manager,
    reflect_trader,
)

@env.task
async def process_signal(full_signal: str, QUICK_THINKING_LLM: str) -> str:
    """Process a full trading signal to extract the core decision."""

    messages = [
        {
            "role": "system",
            "content": """You are an efficient assistant designed to analyze paragraphs or
financial reports provided by a group of analysts.
Your task is to extract the investment decision: SELL, BUY, or HOLD.
Provide only the extracted decision (SELL, BUY, or HOLD) as your output,
without adding any additional text or information.""",
        },
        {"role": "human", "content": full_signal},
    ]

    return ChatOpenAI(model=QUICK_THINKING_LLM).invoke(messages).content

async def run_analyst(analyst_name, state, online_tools):
    # Create a copy of the state for isolation
    run_fn = getattr(agents.analysts, f"create_{analyst_name}_analyst")

    # Run the analyst's chain
    result_state = await run_fn(QUICK_THINKING_LLM, state, online_tools)

    # Determine the report key
    report_key = (
        "sentiment_report"
        if analyst_name == "social_media"
        else f"{analyst_name}_report"
    )
    report_value = getattr(result_state, report_key)

    return result_state.messages[1:], report_key, report_value

# {{docs-fragment main}}
@env.task
async def main(
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> tuple[str, AgentState]:
    if not selected_analysts:
        raise ValueError(
            "No analysts selected. Please select at least one analyst from market, fundamentals, news, or social_media."
        )

    state = AgentState(
        messages=[{"role": "human", "content": company_name}],
        company_of_interest=company_name,
        trade_date=str(trade_date),
    )

    # Run all analysts concurrently
    results = await asyncio.gather(
        *[
            run_analyst(analyst, deepcopy(state), online_tools)
            for analyst in selected_analysts
        ]
    )

    # Flatten and append all resulting messages into the shared state
    for messages, report_attr, report in results:
        state.messages.extend(messages)
        setattr(state, report_attr, report)

    # Bull/Bear debate loop
    state = await create_bull_researcher(QUICK_THINKING_LLM, state)  # Start with bull
    while state.investment_debate_state.count < 2 * max_debate_rounds:
        current = state.investment_debate_state.current_response
        if current.startswith("Bull"):
            state = await create_bear_researcher(QUICK_THINKING_LLM, state)
        else:
            state = await create_bull_researcher(QUICK_THINKING_LLM, state)

    state = await create_research_manager(DEEP_THINKING_LLM, state)
    state = await create_trader(QUICK_THINKING_LLM, state)

    # Risk debate loop
    state = await create_risky_debator(QUICK_THINKING_LLM, state)  # Start with risky
    while state.risk_debate_state.count < 3 * max_risk_discuss_rounds:
        speaker = state.risk_debate_state.latest_speaker
        if speaker == "Risky":
            state = await create_safe_debator(QUICK_THINKING_LLM, state)
        elif speaker == "Safe":
            state = await create_neutral_debator(QUICK_THINKING_LLM, state)
        else:
            state = await create_risky_debator(QUICK_THINKING_LLM, state)

    state = await create_risk_manager(DEEP_THINKING_LLM, state)
    decision = await process_signal(state.final_trade_decision, QUICK_THINKING_LLM)

    return decision, state

# {{/docs-fragment main}}

# {{docs-fragment reflect_on_decisions}}
@env.task
async def reflect_and_store(state: AgentState, returns: str) -> str:
    await asyncio.gather(
        reflect_bear_researcher(state, returns),
        reflect_bull_researcher(state, returns),
        reflect_trader(state, returns),
        reflect_risk_manager(state, returns),
        reflect_research_manager(state, returns),
    )

    return "Reflection completed."

# Run the reflection task after the main function
@env.task(cache="disable")
async def reflect_on_decisions(
    returns: str,
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> str:
    _, state = await main(
        selected_analysts,
        max_debate_rounds,
        max_risk_discuss_rounds,
        online_tools,
        company_name,
        trade_date,
    )

    return await reflect_and_store(state, returns)

# {{/docs-fragment reflect_on_decisions}}

# {{docs-fragment execute_main}}
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.url)
    run.wait()

    # run = flyte.run(reflect_on_decisions, "+3.2% gain over 5 days")
    # print(run.url)

# {{/docs-fragment execute_main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/main.py*

This task accepts several inputs:

- the list of analysts to run,
- the number of debate and risk discussion rounds,
- a flag to enable online tools,
- the company you're evaluating,
- and the target trading date.

The most interesting parameter here is the list of analysts to run. It determines which analyst agents will be invoked and shapes the overall structure of the simulation. Based on this input, the task dynamically launches agent tasks, running them in parallel.

The `main` task is written as a regular asynchronous Python function wrapped with Flyte's task decorator. No domain-specific language or orchestration glue is needed — just idiomatic Python, optionally using async for better performance. The task environment is configured once and shared across all tasks for consistency.

```
# {{docs-fragment env}}
import flyte

QUICK_THINKING_LLM = "gpt-4o-mini"
DEEP_THINKING_LLM = "o4-mini"

env = flyte.TaskEnvironment(
    name="trading-agents",
    secrets=[
        flyte.Secret(key="finnhub_api_key", as_env_var="FINNHUB_API_KEY"),
        flyte.Secret(key="openai_api_key", as_env_var="OPENAI_API_KEY"),
    ],
    image=flyte.Image.from_uv_script("main.py", name="trading-agents", pre=True),
    resources=flyte.Resources(cpu="1"),
    cache="auto",
)

# {{/docs-fragment env}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/flyte_env.py*

### Analyst agents

Each analyst agent comes equipped with a set of tools and a carefully designed prompt tailored to its specific domain. These tools are modular Flyte tasks — for example, downloading financial reports or computing technical indicators — and benefit from Flyte's built-in caching to avoid redundant computation.

```
from datetime import datetime

import pandas as pd
import tools.interface as interface
import yfinance as yf
from flyte_env import env

from flyte.io import File

@env.task
async def get_reddit_news(
    curr_date: str,  # Date you want to get news for in yyyy-mm-dd format
) -> str:
    """
    Retrieve global news from Reddit within a specified time frame.
    Args:
        curr_date (str): Date you want to get news for in yyyy-mm-dd format
    Returns:
        str: A formatted dataframe containing the latest global news
        from Reddit in the specified time frame.
    """

    global_news_result = interface.get_reddit_global_news(curr_date, 7, 5)

    return global_news_result

@env.task
async def get_finnhub_news(
    ticker: str,  # Search query of a company, e.g. 'AAPL, TSM, etc.
    start_date: str,  # Start date in yyyy-mm-dd format
    end_date: str,  # End date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the latest news about a given stock from Finnhub within a date range
    Args:
        ticker (str): Ticker of a company. e.g. AAPL, TSM
        start_date (str): Start date in yyyy-mm-dd format
        end_date (str): End date in yyyy-mm-dd format
    Returns:
        str: A formatted dataframe containing news about the company
        within the date range from start_date to end_date
    """

    end_date_str = end_date

    end_date = datetime.strptime(end_date, "%Y-%m-%d")
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    look_back_days = (end_date - start_date).days

    finnhub_news_result = interface.get_finnhub_news(
        ticker, end_date_str, look_back_days
    )

    return finnhub_news_result

@env.task
async def get_reddit_stock_info(
    ticker: str,  # Ticker of a company. e.g. AAPL, TSM
    curr_date: str,  # Current date you want to get news for
) -> str:
    """
    Retrieve the latest news about a given stock from Reddit, given the current date.
    Args:
        ticker (str): Ticker of a company. e.g. AAPL, TSM
        curr_date (str): current date in yyyy-mm-dd format to get news for
    Returns:
        str: A formatted dataframe containing the latest news about the company on the given date
    """

    stock_news_results = interface.get_reddit_company_news(ticker, curr_date, 7, 5)

    return stock_news_results

@env.task
async def get_YFin_data(
    symbol: str,  # ticker symbol of the company
    start_date: str,  # Start date in yyyy-mm-dd format
    end_date: str,  # End date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
    Args:
        symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
        start_date (str): Start date in yyyy-mm-dd format
        end_date (str): End date in yyyy-mm-dd format
    Returns:
        str: A formatted dataframe containing the stock price data
        for the specified ticker symbol in the specified date range.
    """

    result_data = interface.get_YFin_data(symbol, start_date, end_date)

    return result_data

@env.task
async def get_YFin_data_online(
    symbol: str,  # ticker symbol of the company
    start_date: str,  # Start date in yyyy-mm-dd format
    end_date: str,  # End date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the stock price data for a given ticker symbol from Yahoo Finance.
    Args:
        symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
        start_date (str): Start date in yyyy-mm-dd format
        end_date (str): End date in yyyy-mm-dd format
    Returns:
        str: A formatted dataframe containing the stock price data
        for the specified ticker symbol in the specified date range.
    """

    result_data = interface.get_YFin_data_online(symbol, start_date, end_date)

    return result_data

@env.task
async def cache_market_data(symbol: str, start_date: str, end_date: str) -> File:
    data_file = f"{symbol}-YFin-data-{start_date}-{end_date}.csv"

    data = yf.download(
        symbol,
        start=start_date,
        end=end_date,
        multi_level_index=False,
        progress=False,
        auto_adjust=True,
    )
    data = data.reset_index()
    data.to_csv(data_file, index=False)

    return await File.from_local(data_file)

@env.task
async def get_stockstats_indicators_report(
    symbol: str,  # ticker symbol of the company
    indicator: str,  # technical indicator to get the analysis and report of
    curr_date: str,  # The current trading date you are trading on, YYYY-mm-dd
    look_back_days: int = 30,  # how many days to look back
) -> str:
    """
    Retrieve stock stats indicators for a given ticker symbol and indicator.
    Args:
        symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
        indicator (str): Technical indicator to get the analysis and report of
        curr_date (str): The current trading date you are trading on, YYYY-mm-dd
        look_back_days (int): How many days to look back, default is 30
    Returns:
        str: A formatted dataframe containing the stock stats indicators
        for the specified ticker symbol and indicator.
    """

    today_date = pd.Timestamp.today()

    end_date = today_date
    start_date = today_date - pd.DateOffset(years=15)
    start_date = start_date.strftime("%Y-%m-%d")
    end_date = end_date.strftime("%Y-%m-%d")

    data_file = await cache_market_data(symbol, start_date, end_date)
    local_data_file = await data_file.download()

    result_stockstats = interface.get_stock_stats_indicators_window(
        symbol, indicator, curr_date, look_back_days, False, local_data_file
    )

    return result_stockstats

# {{docs-fragment get_stockstats_indicators_report_online}}
@env.task
async def get_stockstats_indicators_report_online(
    symbol: str,  # ticker symbol of the company
    indicator: str,  # technical indicator to get the analysis and report of
    curr_date: str,  # The current trading date you are trading on, YYYY-mm-dd"
    look_back_days: int = 30,  # "how many days to look back"
) -> str:
    """
    Retrieve stock stats indicators for a given ticker symbol and indicator.
    Args:
        symbol (str): Ticker symbol of the company, e.g. AAPL, TSM
        indicator (str): Technical indicator to get the analysis and report of
        curr_date (str): The current trading date you are trading on, YYYY-mm-dd
        look_back_days (int): How many days to look back, default is 30
    Returns:
        str: A formatted dataframe containing the stock stats indicators
        for the specified ticker symbol and indicator.
    """

    today_date = pd.Timestamp.today()

    end_date = today_date
    start_date = today_date - pd.DateOffset(years=15)
    start_date = start_date.strftime("%Y-%m-%d")
    end_date = end_date.strftime("%Y-%m-%d")

    data_file = await cache_market_data(symbol, start_date, end_date)
    local_data_file = await data_file.download()

    result_stockstats = interface.get_stock_stats_indicators_window(
        symbol, indicator, curr_date, look_back_days, True, local_data_file
    )

    return result_stockstats

# {{/docs-fragment get_stockstats_indicators_report_online}}

@env.task
async def get_finnhub_company_insider_sentiment(
    ticker: str,  # ticker symbol for the company
    curr_date: str,  # current date of you are trading at, yyyy-mm-dd
) -> str:
    """
    Retrieve insider sentiment information about a company (retrieved
    from public SEC information) for the past 30 days
    Args:
        ticker (str): ticker symbol of the company
        curr_date (str): current date you are trading at, yyyy-mm-dd
    Returns:
        str: a report of the sentiment in the past 30 days starting at curr_date
    """

    data_sentiment = interface.get_finnhub_company_insider_sentiment(
        ticker, curr_date, 30
    )

    return data_sentiment

@env.task
async def get_finnhub_company_insider_transactions(
    ticker: str,  # ticker symbol
    curr_date: str,  # current date you are trading at, yyyy-mm-dd
) -> str:
    """
    Retrieve insider transaction information about a company
    (retrieved from public SEC information) for the past 30 days
    Args:
        ticker (str): ticker symbol of the company
        curr_date (str): current date you are trading at, yyyy-mm-dd
    Returns:
        str: a report of the company's insider transactions/trading information in the past 30 days
    """

    data_trans = interface.get_finnhub_company_insider_transactions(
        ticker, curr_date, 30
    )

    return data_trans

@env.task
async def get_simfin_balance_sheet(
    ticker: str,  # ticker symbol
    freq: str,  # reporting frequency of the company's financial history: annual/quarterly
    curr_date: str,  # current date you are trading at, yyyy-mm-dd
):
    """
    Retrieve the most recent balance sheet of a company
    Args:
        ticker (str): ticker symbol of the company
        freq (str): reporting frequency of the company's financial history: annual / quarterly
        curr_date (str): current date you are trading at, yyyy-mm-dd
    Returns:
        str: a report of the company's most recent balance sheet
    """

    data_balance_sheet = interface.get_simfin_balance_sheet(ticker, freq, curr_date)

    return data_balance_sheet

@env.task
async def get_simfin_cashflow(
    ticker: str,  # ticker symbol
    freq: str,  # reporting frequency of the company's financial history: annual/quarterly
    curr_date: str,  # current date you are trading at, yyyy-mm-dd
) -> str:
    """
    Retrieve the most recent cash flow statement of a company
    Args:
        ticker (str): ticker symbol of the company
        freq (str): reporting frequency of the company's financial history: annual / quarterly
        curr_date (str): current date you are trading at, yyyy-mm-dd
    Returns:
            str: a report of the company's most recent cash flow statement
    """

    data_cashflow = interface.get_simfin_cashflow(ticker, freq, curr_date)

    return data_cashflow

@env.task
async def get_simfin_income_stmt(
    ticker: str,  # ticker symbol
    freq: str,  # reporting frequency of the company's financial history: annual/quarterly
    curr_date: str,  # current date you are trading at, yyyy-mm-dd
) -> str:
    """
    Retrieve the most recent income statement of a company
    Args:
        ticker (str): ticker symbol of the company
        freq (str): reporting frequency of the company's financial history: annual / quarterly
        curr_date (str): current date you are trading at, yyyy-mm-dd
    Returns:
            str: a report of the company's most recent income statement
    """

    data_income_stmt = interface.get_simfin_income_statements(ticker, freq, curr_date)

    return data_income_stmt

@env.task
async def get_google_news(
    query: str,  # Query to search with
    curr_date: str,  # Curr date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the latest news from Google News based on a query and date range.
    Args:
        query (str): Query to search with
        curr_date (str): Current date in yyyy-mm-dd format
        look_back_days (int): How many days to look back
    Returns:
        str: A formatted string containing the latest news from Google News
        based on the query and date range.
    """

    google_news_results = interface.get_google_news(query, curr_date, 7)

    return google_news_results

@env.task
async def get_stock_news_openai(
    ticker: str,  # the company's ticker
    curr_date: str,  # Current date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the latest news about a given stock by using OpenAI's news API.
    Args:
        ticker (str): Ticker of a company. e.g. AAPL, TSM
        curr_date (str): Current date in yyyy-mm-dd format
    Returns:
        str: A formatted string containing the latest news about the company on the given date.
    """

    openai_news_results = interface.get_stock_news_openai(ticker, curr_date)

    return openai_news_results

@env.task
async def get_global_news_openai(
    curr_date: str,  # Current date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the latest macroeconomics news on a given date using OpenAI's macroeconomics news API.
    Args:
        curr_date (str): Current date in yyyy-mm-dd format
    Returns:
        str: A formatted string containing the latest macroeconomic news on the given date.
    """

    openai_news_results = interface.get_global_news_openai(curr_date)

    return openai_news_results

@env.task
async def get_fundamentals_openai(
    ticker: str,  # the company's ticker
    curr_date: str,  # Current date in yyyy-mm-dd format
) -> str:
    """
    Retrieve the latest fundamental information about a given stock
    on a given date by using OpenAI's news API.
    Args:
        ticker (str): Ticker of a company. e.g. AAPL, TSM
        curr_date (str): Current date in yyyy-mm-dd format

    Returns:
        str: A formatted string containing the latest fundamental information
        about the company on the given date.
    """

    openai_fundamentals_results = interface.get_fundamentals_openai(ticker, curr_date)

    return openai_fundamentals_results
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/tools/toolkit.py*

When initialized, an analyst enters a structured reasoning loop (via LangChain), where it can call tools, observe outputs, and refine its internal state before generating a final report. These reports are later consumed by downstream agents.

Here's an example of a news analyst that interprets global events and macroeconomic signals. We specify the tools accessible to the analyst, and the LLM selects which ones to use based on context.

```
import asyncio

from agents.utils.utils import AgentState
from flyte_env import env
from langchain_core.messages import ToolMessage, convert_to_openai_messages
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from tools import toolkit

import flyte

MAX_ITERATIONS = 5

# {{docs-fragment agent_helper}}
async def run_chain_with_tools(
    type: str, state: AgentState, llm: str, system_message: str, tool_names: list[str]
) -> AgentState:
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."
                " If you are unable to fully answer, that's OK; another assistant with different tools"
                " will help where you left off. Execute what you can to make progress."
                " If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
                " prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
                " You have access to the following tools: {tool_names}.\n{system_message}"
                " For your reference, the current date is {current_date}. The company we want to look at is {ticker}.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(tool_names=", ".join(tool_names))
    prompt = prompt.partial(current_date=state.trade_date)
    prompt = prompt.partial(ticker=state.company_of_interest)

    chain = prompt | ChatOpenAI(model=llm).bind_tools(
        [getattr(toolkit, tool_name).func for tool_name in tool_names]
    )

    iteration = 0
    while iteration < MAX_ITERATIONS:
        result = await chain.ainvoke(state.messages)
        state.messages.append(convert_to_openai_messages(result))

        if not result.tool_calls:
            # Final response — no tools required
            setattr(state, f"{type}_report", result.content or "")
            break

        # Run all tool calls in parallel
        async def run_single_tool(tool_call):
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            tool = getattr(toolkit, tool_name, None)

            if not tool:
                return None

            content = await tool(**tool_args)
            return ToolMessage(
                tool_call_id=tool_call["id"], name=tool_name, content=content
            )

        with flyte.group(f"tool_calls_iteration_{iteration}"):
            tool_messages = await asyncio.gather(
                *[run_single_tool(tc) for tc in result.tool_calls]
            )

        # Add valid tool results to state
        tool_messages = [msg for msg in tool_messages if msg]
        state.messages.extend(convert_to_openai_messages(tool_messages))

        iteration += 1
    else:
        # Reached iteration cap — optionally raise or log
        print(f"Max iterations ({MAX_ITERATIONS}) reached for {type}")

    return state

# {{/docs-fragment agent_helper}}

@env.task
async def create_fundamentals_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [toolkit.get_fundamentals_openai]
    else:
        tools = [
            toolkit.get_finnhub_company_insider_sentiment,
            toolkit.get_finnhub_company_insider_transactions,
            toolkit.get_simfin_balance_sheet,
            toolkit.get_simfin_cashflow,
            toolkit.get_simfin_income_stmt,
        ]

    system_message = (
        "You are a researcher tasked with analyzing fundamental information over the past week about a company. "
        "Please write a comprehensive report of the company's fundamental information such as financial documents, "
        "company profile, basic company financials, company financial history, insider sentiment, and insider "
        "transactions to gain a full view of the company's "
        "fundamental information to inform traders. Make sure to include as much detail as possible. "
        "Do not simply state the trends are mixed, "
        "provide detailed and finegrained analysis and insights that may help traders make decisions. "
        "Make sure to append a Markdown table at the end of the report to organize key points in the report, "
        "organized and easy to read."
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools(
        "fundamentals", state, llm, system_message, tool_names
    )

@env.task
async def create_market_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [
            toolkit.get_YFin_data_online,
            toolkit.get_stockstats_indicators_report_online,
        ]
    else:
        tools = [
            toolkit.get_YFin_data,
            toolkit.get_stockstats_indicators_report,
        ]

    system_message = (
        """You are a trading assistant tasked with analyzing financial markets.
Your role is to select the **most relevant indicators** for a given market condition
or trading strategy from the following list.
The goal is to choose up to **8 indicators** that provide complementary insights without redundancy.
Categories and each category's indicators are:

Moving Averages:
- close_50_sma: 50 SMA: A medium-term trend indicator.
Usage: Identify trend direction and serve as dynamic support/resistance.
Tips: It lags price; combine with faster indicators for timely signals.
- close_200_sma: 200 SMA: A long-term trend benchmark.
Usage: Confirm overall market trend and identify golden/death cross setups.
Tips: It reacts slowly; best for strategic trend confirmation rather than frequent trading entries.
- close_10_ema: 10 EMA: A responsive short-term average.
Usage: Capture quick shifts in momentum and potential entry points.
Tips: Prone to noise in choppy markets; use alongside longer averages for filtering false signals.

MACD Related:
- macd: MACD: Computes momentum via differences of EMAs.
Usage: Look for crossovers and divergence as signals of trend changes.
Tips: Confirm with other indicators in low-volatility or sideways markets.
- macds: MACD Signal: An EMA smoothing of the MACD line.
Usage: Use crossovers with the MACD line to trigger trades.
Tips: Should be part of a broader strategy to avoid false positives.
- macdh: MACD Histogram: Shows the gap between the MACD line and its signal.
Usage: Visualize momentum strength and spot divergence early.
Tips: Can be volatile; complement with additional filters in fast-moving markets.

Momentum Indicators:
- rsi: RSI: Measures momentum to flag overbought/oversold conditions.
Usage: Apply 70/30 thresholds and watch for divergence to signal reversals.
Tips: In strong trends, RSI may remain extreme; always cross-check with trend analysis.

Volatility Indicators:
- boll: Bollinger Middle: A 20 SMA serving as the basis for Bollinger Bands.
Usage: Acts as a dynamic benchmark for price movement.
Tips: Combine with the upper and lower bands to effectively spot breakouts or reversals.
- boll_ub: Bollinger Upper Band: Typically 2 standard deviations above the middle line.
Usage: Signals potential overbought conditions and breakout zones.
Tips: Confirm signals with other tools; prices may ride the band in strong trends.
- boll_lb: Bollinger Lower Band: Typically 2 standard deviations below the middle line.
Usage: Indicates potential oversold conditions.
Tips: Use additional analysis to avoid false reversal signals.
- atr: ATR: Averages true range to measure volatility.
Usage: Set stop-loss levels and adjust position sizes based on current market volatility.
Tips: It's a reactive measure, so use it as part of a broader risk management strategy.

Volume-Based Indicators:
- vwma: VWMA: A moving average weighted by volume.
Usage: Confirm trends by integrating price action with volume data.
Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.

- Select indicators that provide diverse and complementary information.
Avoid redundancy (e.g., do not select both rsi and stochrsi).
Also briefly explain why they are suitable for the given market context.
When you tool call, please use the exact name of the indicators provided above as they are defined parameters,
otherwise your call will fail.
Please make sure to call get_YFin_data first to retrieve the CSV that is needed to generate indicators.
Write a very detailed and nuanced report of the trends you observe.
Do not simply state the trends are mixed, provide detailed and finegrained analysis
and insights that may help traders make decisions."""
        """ Make sure to append a Markdown table at the end of the report to
        organize key points in the report, organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]
    return await run_chain_with_tools("market", state, llm, system_message, tool_names)

# {{docs-fragment news_analyst}}
@env.task
async def create_news_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [
            toolkit.get_global_news_openai,
            toolkit.get_google_news,
        ]
    else:
        tools = [
            toolkit.get_finnhub_news,
            toolkit.get_reddit_news,
            toolkit.get_google_news,
        ]

    system_message = (
        "You are a news researcher tasked with analyzing recent news and trends over the past week. "
        "Please write a comprehensive report of the current state of the world that is relevant for "
        "trading and macroeconomics. "
        "Look at news from EODHD, and finnhub to be comprehensive. Do not simply state the trends are mixed, "
        "provide detailed and finegrained analysis and insights that may help traders make decisions."
        """ Make sure to append a Markdown table at the end of the report to organize key points in the report,
        organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools("news", state, llm, system_message, tool_names)

# {{/docs-fragment news_analyst}}

@env.task
async def create_social_media_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [toolkit.get_stock_news_openai]
    else:
        tools = [toolkit.get_reddit_stock_info]

    system_message = (
        "You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, "
        "recent company news, and public sentiment for a specific company over the past week. "
        "You will be given a company's name your objective is to write a comprehensive long report "
        "detailing your analysis, insights, and implications for traders and investors on this company's current state "
        "after looking at social media and what people are saying about that company, "
        "analyzing sentiment data of what people feel each day about the company, and looking at recent company news. "
        "Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends "
        "are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
        """ Make sure to append a Makrdown table at the end of the report to organize key points in the report,
        organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools(
        "sentiment", state, llm, system_message, tool_names
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/analysts.py*

Each analyst agent uses a helper function to bind tools, iterate through reasoning steps (up to a configurable maximum), and produce an answer. Setting a max iteration count is crucial to prevent runaway loops. As agents reason, their message history is preserved in their internal state and passed along to the next agent in the chain.

```
import asyncio

from agents.utils.utils import AgentState
from flyte_env import env
from langchain_core.messages import ToolMessage, convert_to_openai_messages
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from tools import toolkit

import flyte

MAX_ITERATIONS = 5

# {{docs-fragment agent_helper}}
async def run_chain_with_tools(
    type: str, state: AgentState, llm: str, system_message: str, tool_names: list[str]
) -> AgentState:
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."
                " If you are unable to fully answer, that's OK; another assistant with different tools"
                " will help where you left off. Execute what you can to make progress."
                " If you or any other assistant has the FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** or deliverable,"
                " prefix your response with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL** so the team knows to stop."
                " You have access to the following tools: {tool_names}.\n{system_message}"
                " For your reference, the current date is {current_date}. The company we want to look at is {ticker}.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(tool_names=", ".join(tool_names))
    prompt = prompt.partial(current_date=state.trade_date)
    prompt = prompt.partial(ticker=state.company_of_interest)

    chain = prompt | ChatOpenAI(model=llm).bind_tools(
        [getattr(toolkit, tool_name).func for tool_name in tool_names]
    )

    iteration = 0
    while iteration < MAX_ITERATIONS:
        result = await chain.ainvoke(state.messages)
        state.messages.append(convert_to_openai_messages(result))

        if not result.tool_calls:
            # Final response — no tools required
            setattr(state, f"{type}_report", result.content or "")
            break

        # Run all tool calls in parallel
        async def run_single_tool(tool_call):
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            tool = getattr(toolkit, tool_name, None)

            if not tool:
                return None

            content = await tool(**tool_args)
            return ToolMessage(
                tool_call_id=tool_call["id"], name=tool_name, content=content
            )

        with flyte.group(f"tool_calls_iteration_{iteration}"):
            tool_messages = await asyncio.gather(
                *[run_single_tool(tc) for tc in result.tool_calls]
            )

        # Add valid tool results to state
        tool_messages = [msg for msg in tool_messages if msg]
        state.messages.extend(convert_to_openai_messages(tool_messages))

        iteration += 1
    else:
        # Reached iteration cap — optionally raise or log
        print(f"Max iterations ({MAX_ITERATIONS}) reached for {type}")

    return state

# {{/docs-fragment agent_helper}}

@env.task
async def create_fundamentals_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [toolkit.get_fundamentals_openai]
    else:
        tools = [
            toolkit.get_finnhub_company_insider_sentiment,
            toolkit.get_finnhub_company_insider_transactions,
            toolkit.get_simfin_balance_sheet,
            toolkit.get_simfin_cashflow,
            toolkit.get_simfin_income_stmt,
        ]

    system_message = (
        "You are a researcher tasked with analyzing fundamental information over the past week about a company. "
        "Please write a comprehensive report of the company's fundamental information such as financial documents, "
        "company profile, basic company financials, company financial history, insider sentiment, and insider "
        "transactions to gain a full view of the company's "
        "fundamental information to inform traders. Make sure to include as much detail as possible. "
        "Do not simply state the trends are mixed, "
        "provide detailed and finegrained analysis and insights that may help traders make decisions. "
        "Make sure to append a Markdown table at the end of the report to organize key points in the report, "
        "organized and easy to read."
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools(
        "fundamentals", state, llm, system_message, tool_names
    )

@env.task
async def create_market_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [
            toolkit.get_YFin_data_online,
            toolkit.get_stockstats_indicators_report_online,
        ]
    else:
        tools = [
            toolkit.get_YFin_data,
            toolkit.get_stockstats_indicators_report,
        ]

    system_message = (
        """You are a trading assistant tasked with analyzing financial markets.
Your role is to select the **most relevant indicators** for a given market condition
or trading strategy from the following list.
The goal is to choose up to **8 indicators** that provide complementary insights without redundancy.
Categories and each category's indicators are:

Moving Averages:
- close_50_sma: 50 SMA: A medium-term trend indicator.
Usage: Identify trend direction and serve as dynamic support/resistance.
Tips: It lags price; combine with faster indicators for timely signals.
- close_200_sma: 200 SMA: A long-term trend benchmark.
Usage: Confirm overall market trend and identify golden/death cross setups.
Tips: It reacts slowly; best for strategic trend confirmation rather than frequent trading entries.
- close_10_ema: 10 EMA: A responsive short-term average.
Usage: Capture quick shifts in momentum and potential entry points.
Tips: Prone to noise in choppy markets; use alongside longer averages for filtering false signals.

MACD Related:
- macd: MACD: Computes momentum via differences of EMAs.
Usage: Look for crossovers and divergence as signals of trend changes.
Tips: Confirm with other indicators in low-volatility or sideways markets.
- macds: MACD Signal: An EMA smoothing of the MACD line.
Usage: Use crossovers with the MACD line to trigger trades.
Tips: Should be part of a broader strategy to avoid false positives.
- macdh: MACD Histogram: Shows the gap between the MACD line and its signal.
Usage: Visualize momentum strength and spot divergence early.
Tips: Can be volatile; complement with additional filters in fast-moving markets.

Momentum Indicators:
- rsi: RSI: Measures momentum to flag overbought/oversold conditions.
Usage: Apply 70/30 thresholds and watch for divergence to signal reversals.
Tips: In strong trends, RSI may remain extreme; always cross-check with trend analysis.

Volatility Indicators:
- boll: Bollinger Middle: A 20 SMA serving as the basis for Bollinger Bands.
Usage: Acts as a dynamic benchmark for price movement.
Tips: Combine with the upper and lower bands to effectively spot breakouts or reversals.
- boll_ub: Bollinger Upper Band: Typically 2 standard deviations above the middle line.
Usage: Signals potential overbought conditions and breakout zones.
Tips: Confirm signals with other tools; prices may ride the band in strong trends.
- boll_lb: Bollinger Lower Band: Typically 2 standard deviations below the middle line.
Usage: Indicates potential oversold conditions.
Tips: Use additional analysis to avoid false reversal signals.
- atr: ATR: Averages true range to measure volatility.
Usage: Set stop-loss levels and adjust position sizes based on current market volatility.
Tips: It's a reactive measure, so use it as part of a broader risk management strategy.

Volume-Based Indicators:
- vwma: VWMA: A moving average weighted by volume.
Usage: Confirm trends by integrating price action with volume data.
Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.

- Select indicators that provide diverse and complementary information.
Avoid redundancy (e.g., do not select both rsi and stochrsi).
Also briefly explain why they are suitable for the given market context.
When you tool call, please use the exact name of the indicators provided above as they are defined parameters,
otherwise your call will fail.
Please make sure to call get_YFin_data first to retrieve the CSV that is needed to generate indicators.
Write a very detailed and nuanced report of the trends you observe.
Do not simply state the trends are mixed, provide detailed and finegrained analysis
and insights that may help traders make decisions."""
        """ Make sure to append a Markdown table at the end of the report to
        organize key points in the report, organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]
    return await run_chain_with_tools("market", state, llm, system_message, tool_names)

# {{docs-fragment news_analyst}}
@env.task
async def create_news_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [
            toolkit.get_global_news_openai,
            toolkit.get_google_news,
        ]
    else:
        tools = [
            toolkit.get_finnhub_news,
            toolkit.get_reddit_news,
            toolkit.get_google_news,
        ]

    system_message = (
        "You are a news researcher tasked with analyzing recent news and trends over the past week. "
        "Please write a comprehensive report of the current state of the world that is relevant for "
        "trading and macroeconomics. "
        "Look at news from EODHD, and finnhub to be comprehensive. Do not simply state the trends are mixed, "
        "provide detailed and finegrained analysis and insights that may help traders make decisions."
        """ Make sure to append a Markdown table at the end of the report to organize key points in the report,
        organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools("news", state, llm, system_message, tool_names)

# {{/docs-fragment news_analyst}}

@env.task
async def create_social_media_analyst(
    llm: str, state: AgentState, online_tools: bool
) -> AgentState:
    if online_tools:
        tools = [toolkit.get_stock_news_openai]
    else:
        tools = [toolkit.get_reddit_stock_info]

    system_message = (
        "You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, "
        "recent company news, and public sentiment for a specific company over the past week. "
        "You will be given a company's name your objective is to write a comprehensive long report "
        "detailing your analysis, insights, and implications for traders and investors on this company's current state "
        "after looking at social media and what people are saying about that company, "
        "analyzing sentiment data of what people feel each day about the company, and looking at recent company news. "
        "Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends "
        "are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
        """ Make sure to append a Makrdown table at the end of the report to organize key points in the report,
        organized and easy to read."""
    )

    tool_names = [tool.func.__name__ for tool in tools]

    return await run_chain_with_tools(
        "sentiment", state, llm, system_message, tool_names
    )
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/analysts.py*

Once all analyst reports are complete, their outputs are collected and passed to the next stage of the workflow.

### Research agents

The research phase consists of two agents: a bullish researcher and a bearish one. They evaluate the company from opposing viewpoints, drawing on the analysts' reports. Unlike analysts, they don't use tools. Their role is to interpret, critique, and develop positions based on the evidence.

```
from agents.utils.utils import AgentState, InvestmentDebateState, memory_init
from flyte_env import env
from langchain_openai import ChatOpenAI

# {{docs-fragment bear_researcher}}
@env.task
async def create_bear_researcher(llm: str, state: AgentState) -> AgentState:
    investment_debate_state = state.investment_debate_state
    history = investment_debate_state.history
    bear_history = investment_debate_state.bear_history

    current_response = investment_debate_state.current_response
    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    memory = await memory_init(name="bear-researcher")

    curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
    past_memories = memory.get_memories(curr_situation, n_matches=2)

    past_memory_str = ""
    for rec in past_memories:
        past_memory_str += rec["recommendation"] + "\n\n"

    prompt = f"""You are a Bear Analyst making the case against investing in the stock.
Your goal is to present a well-reasoned argument emphasizing risks, challenges, and negative indicators.
Leverage the provided research and data to highlight potential downsides and counter bullish arguments effectively.

Key points to focus on:

- Risks and Challenges: Highlight factors like market saturation, financial instability,
or macroeconomic threats that could hinder the stock's performance.
- Competitive Weaknesses: Emphasize vulnerabilities such as weaker market positioning, declining innovation,
or threats from competitors.
- Negative Indicators: Use evidence from financial data, market trends, or recent adverse news to support your position.
- Bull Counterpoints: Critically analyze the bull argument with specific data and sound reasoning,
exposing weaknesses or over-optimistic assumptions.
- Engagement: Present your argument in a conversational style, directly engaging with the bull analyst's points
and debating effectively rather than simply listing facts.

Resources available:

Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bull argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bear argument, refute the bull's claims, and engage in a dynamic debate
that demonstrates the risks and weaknesses of investing in the stock.
You must also address reflections and learn from lessons and mistakes you made in the past.
"""

    response = ChatOpenAI(model=llm).invoke(prompt)

    argument = f"Bear Analyst: {response.content}"

    new_investment_debate_state = InvestmentDebateState(
        history=history + "\n" + argument,
        bear_history=bear_history + "\n" + argument,
        bull_history=investment_debate_state.bull_history,
        current_response=argument,
        count=investment_debate_state.count + 1,
    )

    state.investment_debate_state = new_investment_debate_state
    return state

# {{/docs-fragment bear_researcher}}

@env.task
async def create_bull_researcher(llm: str, state: AgentState) -> AgentState:
    investment_debate_state = state.investment_debate_state
    history = investment_debate_state.history
    bull_history = investment_debate_state.bull_history

    current_response = investment_debate_state.current_response
    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    memory = await memory_init(name="bull-researcher")

    curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
    past_memories = memory.get_memories(curr_situation, n_matches=2)

    past_memory_str = ""
    for rec in past_memories:
        past_memory_str += rec["recommendation"] + "\n\n"

    prompt = f"""You are a Bull Analyst advocating for investing in the stock.
Your task is to build a strong, evidence-based case emphasizing growth potential, competitive advantages,
and positive market indicators.
Leverage the provided research and data to address concerns and counter bearish arguments effectively.

Key points to focus on:
- Growth Potential: Highlight the company's market opportunities, revenue projections, and scalability.
- Competitive Advantages: Emphasize factors like unique products, strong branding, or dominant market positioning.
- Positive Indicators: Use financial health, industry trends, and recent positive news as evidence.
- Bear Counterpoints: Critically analyze the bear argument with specific data and sound reasoning, addressing
concerns thoroughly and showing why the bull perspective holds stronger merit.
- Engagement: Present your argument in a conversational style, engaging directly with the bear analyst's points
and debating effectively rather than just listing data.

Resources available:
Market research report: {market_research_report}
Social media sentiment report: {sentiment_report}
Latest world affairs news: {news_report}
Company fundamentals report: {fundamentals_report}
Conversation history of the debate: {history}
Last bear argument: {current_response}
Reflections from similar situations and lessons learned: {past_memory_str}
Use this information to deliver a compelling bull argument, refute the bear's concerns, and engage in a dynamic debate
that demonstrates the strengths of the bull position.
You must also address reflections and learn from lessons and mistakes you made in the past.
"""

    response = ChatOpenAI(model=llm).invoke(prompt)

    argument = f"Bull Analyst: {response.content}"

    new_investment_debate_state = InvestmentDebateState(
        history=history + "\n" + argument,
        bull_history=bull_history + "\n" + argument,
        bear_history=investment_debate_state.bear_history,
        current_response=argument,
        count=investment_debate_state.count + 1,
    )

    state.investment_debate_state = new_investment_debate_state
    return state
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/researchers.py*

To aid reasoning, the agents can also retrieve relevant "memories" from a vector database, giving them richer historical context. The number of debate rounds is configurable, and after a few iterations of back-and-forth between the bull and bear, a research manager agent reviews their arguments and makes a final investment decision.

```
from agents.utils.utils import (
    AgentState,
    InvestmentDebateState,
    RiskDebateState,
    memory_init,
)
from flyte_env import env
from langchain_openai import ChatOpenAI

# {{docs-fragment research_manager}}
@env.task
async def create_research_manager(llm: str, state: AgentState) -> AgentState:
    history = state.investment_debate_state.history
    investment_debate_state = state.investment_debate_state
    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    memory = await memory_init(name="research-manager")

    curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
    past_memories = memory.get_memories(curr_situation, n_matches=2)

    past_memory_str = ""
    for rec in past_memories:
        past_memory_str += rec["recommendation"] + "\n\n"

    prompt = f"""As the portfolio manager and debate facilitator, your role is to critically evaluate
this round of debate and make a definitive decision:
align with the bear analyst, the bull analyst,
or choose Hold only if it is strongly justified based on the arguments presented.

Summarize the key points from both sides concisely, focusing on the most compelling evidence or reasoning.
Your recommendation—Buy, Sell, or Hold—must be clear and actionable.
Avoid defaulting to Hold simply because both sides have valid points;
commit to a stance grounded in the debate's strongest arguments.

Additionally, develop a detailed investment plan for the trader. This should include:

Your Recommendation: A decisive stance supported by the most convincing arguments.
Rationale: An explanation of why these arguments lead to your conclusion.
Strategic Actions: Concrete steps for implementing the recommendation.
Take into account your past mistakes on similar situations.
Use these insights to refine your decision-making and ensure you are learning and improving.
Present your analysis conversationally, as if speaking naturally, without special formatting.

Here are your past reflections on mistakes:
\"{past_memory_str}\"

Here is the debate:
Debate History:
{history}"""
    response = ChatOpenAI(model=llm).invoke(prompt)

    new_investment_debate_state = InvestmentDebateState(
        judge_decision=response.content,
        history=investment_debate_state.history,
        bear_history=investment_debate_state.bear_history,
        bull_history=investment_debate_state.bull_history,
        current_response=response.content,
        count=investment_debate_state.count,
    )

    state.investment_debate_state = new_investment_debate_state
    state.investment_plan = response.content

    return state

# {{/docs-fragment research_manager}}

@env.task
async def create_risk_manager(llm: str, state: AgentState) -> AgentState:
    history = state.risk_debate_state.history
    risk_debate_state = state.risk_debate_state
    trader_plan = state.investment_plan
    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    memory = await memory_init(name="risk-manager")

    curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
    past_memories = memory.get_memories(curr_situation, n_matches=2)

    past_memory_str = ""
    for rec in past_memories:
        past_memory_str += rec["recommendation"] + "\n\n"

    prompt = f"""As the Risk Management Judge and Debate Facilitator,
your goal is to evaluate the debate between three risk analysts—Risky,
Neutral, and Safe/Conservative—and determine the best course of action for the trader.
Your decision must result in a clear recommendation: Buy, Sell, or Hold.
Choose Hold only if strongly justified by specific arguments, not as a fallback when all sides seem valid.
Strive for clarity and decisiveness.

Guidelines for Decision-Making:
1. **Summarize Key Arguments**: Extract the strongest points from each analyst, focusing on relevance to the context.
2. **Provide Rationale**: Support your recommendation with direct quotes and counterarguments from the debate.
3. **Refine the Trader's Plan**: Start with the trader's original plan, **{trader_plan}**,
and adjust it based on the analysts' insights.
4. **Learn from Past Mistakes**: Use lessons from **{past_memory_str}** to address prior misjudgments
and improve the decision you are making now to make sure you don't make a wrong BUY/SELL/HOLD call that loses money.

Deliverables:
- A clear and actionable recommendation: Buy, Sell, or Hold.
- Detailed reasoning anchored in the debate and past reflections.

---

**Analysts Debate History:**
{history}

---

Focus on actionable insights and continuous improvement.
Build on past lessons, critically evaluate all perspectives, and ensure each decision advances better outcomes."""

    response = ChatOpenAI(model=llm).invoke(prompt)

    new_risk_debate_state = RiskDebateState(
        judge_decision=response.content,
        history=risk_debate_state.history,
        risky_history=risk_debate_state.risky_history,
        safe_history=risk_debate_state.safe_history,
        neutral_history=risk_debate_state.neutral_history,
        latest_speaker="Judge",
        current_risky_response=risk_debate_state.current_risky_response,
        current_safe_response=risk_debate_state.current_safe_response,
        current_neutral_response=risk_debate_state.current_neutral_response,
        count=risk_debate_state.count,
    )

    state.risk_debate_state = new_risk_debate_state
    state.final_trade_decision = response.content

    return state
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/managers.py*

### Trading agent

The trader agent consolidates the insights from analysts and researchers to generate a final recommendation. It synthesizes competing signals and produces a conclusion such as _Buy for long-term growth despite short-term volatility_.

```
from agents.utils.utils import AgentState, memory_init
from flyte_env import env
from langchain_core.messages import convert_to_openai_messages
from langchain_openai import ChatOpenAI

# {{docs-fragment trader}}
@env.task
async def create_trader(llm: str, state: AgentState) -> AgentState:
    company_name = state.company_of_interest
    investment_plan = state.investment_plan
    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    memory = await memory_init(name="trader")

    curr_situation = f"{market_research_report}\n\n{sentiment_report}\n\n{news_report}\n\n{fundamentals_report}"
    past_memories = memory.get_memories(curr_situation, n_matches=2)

    past_memory_str = ""
    for rec in past_memories:
        past_memory_str += rec["recommendation"] + "\n\n"

    context = {
        "role": "user",
        "content": f"Based on a comprehensive analysis by a team of analysts, "
        f"here is an investment plan tailored for {company_name}. "
        "This plan incorporates insights from current technical market trends, "
        "macroeconomic indicators, and social media sentiment. "
        "Use this plan as a foundation for evaluating your next trading decision.\n\n"
        f"Proposed Investment Plan: {investment_plan}\n\n"
        "Leverage these insights to make an informed and strategic decision.",
    }

    messages = [
        {
            "role": "system",
            "content": f"""You are a trading agent analyzing market data to make investment decisions.
Based on your analysis, provide a specific recommendation to buy, sell, or hold.
End with a firm decision and always conclude your response with 'FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**'
to confirm your recommendation.
Do not forget to utilize lessons from past decisions to learn from your mistakes.
Here is some reflections from similar situatiosn you traded in and the lessons learned: {past_memory_str}""",
        },
        context,
    ]

    result = ChatOpenAI(model=llm).invoke(messages)

    state.messages.append(convert_to_openai_messages(result))
    state.trader_investment_plan = result.content
    state.sender = "Trader"

    return state

# {{/docs-fragment trader}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/trader.py*

### Risk agents

Risk agents comprise agents with different risk tolerances: a risky debater, a neutral one, and a conservative one. They assess the portfolio through lenses like market volatility, liquidity, and systemic risk. Similar to the bull-bear debate, these agents engage in internal discussion, after which a risk manager makes the final call.

```
from agents.utils.utils import AgentState, RiskDebateState
from flyte_env import env
from langchain_openai import ChatOpenAI

# {{docs-fragment risk_debator}}
@env.task
async def create_risky_debator(llm: str, state: AgentState) -> AgentState:
    risk_debate_state = state.risk_debate_state
    history = risk_debate_state.history
    risky_history = risk_debate_state.risky_history

    current_safe_response = risk_debate_state.current_safe_response
    current_neutral_response = risk_debate_state.current_neutral_response

    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    trader_decision = state.trader_investment_plan

    prompt = f"""As the Risky Risk Analyst, your role is to actively champion high-reward, high-risk opportunities,
emphasizing bold strategies and competitive advantages.
When evaluating the trader's decision or plan, focus intently on the potential upside, growth potential,
and innovative benefits—even when these come with elevated risk.
Use the provided market data and sentiment analysis to strengthen your arguments and challenge the opposing views.
Specifically, respond directly to each point made by the conservative and neutral analysts,
countering with data-driven rebuttals and persuasive reasoning.
Highlight where their caution might miss critical opportunities or where their assumptions may be overly conservative.
Here is the trader's decision:

{trader_decision}

Your task is to create a compelling case for the trader's decision by questioning and critiquing the conservative
and neutral stances to demonstrate why your high-reward perspective offers the best path forward.
Incorporate insights from the following sources into your arguments:

Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history}
Here are the last arguments from the conservative analyst: {current_safe_response}
Here are the last arguments from the neutral analyst: {current_neutral_response}.
If there are no responses from the other viewpoints, do not halluncinate and just present your point.

Engage actively by addressing any specific concerns raised, refuting the weaknesses in their logic,
and asserting the benefits of risk-taking to outpace market norms.
Maintain a focus on debating and persuading, not just presenting data.
Challenge each counterpoint to underscore why a high-risk approach is optimal.
Output conversationally as if you are speaking without any special formatting."""

    response = ChatOpenAI(model=llm).invoke(prompt)

    argument = f"Risky Analyst: {response.content}"

    new_risk_debate_state = RiskDebateState(
        history=history + "\n" + argument,
        risky_history=risky_history + "\n" + argument,
        safe_history=risk_debate_state.safe_history,
        neutral_history=risk_debate_state.neutral_history,
        latest_speaker="Risky",
        current_risky_response=argument,
        current_safe_response=current_safe_response,
        current_neutral_response=current_neutral_response,
        count=risk_debate_state.count + 1,
    )

    state.risk_debate_state = new_risk_debate_state
    return state

# {{/docs-fragment risk_debator}}

@env.task
async def create_safe_debator(llm: str, state: AgentState) -> AgentState:
    risk_debate_state = state.risk_debate_state
    history = risk_debate_state.history
    safe_history = risk_debate_state.safe_history

    current_risky_response = risk_debate_state.current_risky_response
    current_neutral_response = risk_debate_state.current_neutral_response

    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    trader_decision = state.trader_investment_plan

    prompt = f"""As the Safe/Conservative Risk Analyst, your primary objective is to protect assets,
minimize volatility, and ensure steady, reliable growth. You prioritize stability, security, and risk mitigation,
carefully assessing potential losses, economic downturns, and market volatility.
When evaluating the trader's decision or plan, critically examine high-risk elements,
pointing out where the decision may expose the firm to undue risk and where more cautious
alternatives could secure long-term gains.
Here is the trader's decision:

{trader_decision}

Your task is to actively counter the arguments of the Risky and Neutral Analysts,
highlighting where their views may overlook potential threats or fail to prioritize sustainability.
Respond directly to their points, drawing from the following data sources
to build a convincing case for a low-risk approach adjustment to the trader's decision:

Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history}
Here is the last response from the risky analyst: {current_risky_response}
Here is the last response from the neutral analyst: {current_neutral_response}.
If there are no responses from the other viewpoints, do not halluncinate and just present your point.

Engage by questioning their optimism and emphasizing the potential downsides they may have overlooked.
Address each of their counterpoints to showcase why a conservative stance is ultimately the
safest path for the firm's assets.
Focus on debating and critiquing their arguments to demonstrate the strength of a low-risk strategy
over their approaches.
Output conversationally as if you are speaking without any special formatting."""

    response = ChatOpenAI(model=llm).invoke(prompt)

    argument = f"Safe Analyst: {response.content}"

    new_risk_debate_state = RiskDebateState(
        history=history + "\n" + argument,
        risky_history=risk_debate_state.risky_history,
        safe_history=safe_history + "\n" + argument,
        neutral_history=risk_debate_state.neutral_history,
        latest_speaker="Safe",
        current_risky_response=current_risky_response,
        current_safe_response=argument,
        current_neutral_response=current_neutral_response,
        count=risk_debate_state.count + 1,
    )

    state.risk_debate_state = new_risk_debate_state
    return state

@env.task
async def create_neutral_debator(llm: str, state: AgentState) -> AgentState:
    risk_debate_state = state.risk_debate_state
    history = risk_debate_state.history
    neutral_history = risk_debate_state.neutral_history

    current_risky_response = risk_debate_state.current_risky_response
    current_safe_response = risk_debate_state.current_safe_response

    market_research_report = state.market_report
    sentiment_report = state.sentiment_report
    news_report = state.news_report
    fundamentals_report = state.fundamentals_report

    trader_decision = state.trader_investment_plan

    prompt = f"""As the Neutral Risk Analyst, your role is to provide a balanced perspective,
weighing both the potential benefits and risks of the trader's decision or plan.
You prioritize a well-rounded approach, evaluating the upsides
and downsides while factoring in broader market trends,
potential economic shifts, and diversification strategies.Here is the trader's decision:

{trader_decision}

Your task is to challenge both the Risky and Safe Analysts,
pointing out where each perspective may be overly optimistic or overly cautious.
Use insights from the following data sources to support a moderate, sustainable strategy
to adjust the trader's decision:

Market Research Report: {market_research_report}
Social Media Sentiment Report: {sentiment_report}
Latest World Affairs Report: {news_report}
Company Fundamentals Report: {fundamentals_report}
Here is the current conversation history: {history}
Here is the last response from the risky analyst: {current_risky_response}
Here is the last response from the safe analyst: {current_safe_response}.
If there are no responses from the other viewpoints, do not halluncinate and just present your point.

Engage actively by analyzing both sides critically, addressing weaknesses in the risky
and conservative arguments to advocate for a more balanced approach.
Challenge each of their points to illustrate why a moderate risk strategy might offer the best of both worlds,
providing growth potential while safeguarding against extreme volatility.
Focus on debating rather than simply presenting data, aiming to show that a balanced view can lead to
the most reliable outcomes. Output conversationally as if you are speaking without any special formatting."""

    response = ChatOpenAI(model=llm).invoke(prompt)

    argument = f"Neutral Analyst: {response.content}"

    new_risk_debate_state = RiskDebateState(
        history=history + "\n" + argument,
        risky_history=risk_debate_state.risky_history,
        safe_history=risk_debate_state.safe_history,
        neutral_history=neutral_history + "\n" + argument,
        latest_speaker="Neutral",
        current_risky_response=current_risky_response,
        current_safe_response=current_safe_response,
        current_neutral_response=argument,
        count=risk_debate_state.count + 1,
    )

    state.risk_debate_state = new_risk_debate_state
    return state
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/agents/risk_debators.py*

The outcome of the risk manager — whether to proceed with the trade or not — is considered the final decision of the trading simulation.

You can visualize this full pipeline in the Flyte/Union UI, where every step is logged.
You’ll see input/output metadata for each tool and agent task.
Thanks to Flyte's caching, repeated steps are skipped unless inputs change, saving time and compute resources.

### Retaining agent memory with S3 vectors

To help agents learn from past decisions, we persist their memory in a vector store. In this example, we use an [S3 vector](https://aws.amazon.com/s3/features/vectors/) bucket for their simplicity and tight integration with Flyte and Union, but any vector database can be used.

Note: To use the S3 vector store, make sure your IAM role has the following permissions configured:

```
s3vectors:CreateVectorBucket
s3vectors:CreateIndex
s3vectors:PutVectors
s3vectors:GetIndex
s3vectors:GetVectors
s3vectors:QueryVectors
s3vectors:GetVectorBucket
```

After each trade decision, you can run a `reflect_on_decisions` task. This evaluates whether the final outcome aligned with the agent's recommendation and stores that reflection in the vector store. These stored insights can later be retrieved to provide historical context and improve future decision-making.

```
# /// script
# requires-python = "==3.13"
# dependencies = [
#     "flyte>=2.0.0b52",
#     "akshare==1.16.98",
#     "backtrader==1.9.78.123",
#     "boto3==1.39.9",
#     "chainlit==2.5.5",
#     "eodhd==1.0.32",
#     "feedparser==6.0.11",
#     "finnhub-python==2.4.23",
#     "langchain-experimental==0.3.4",
#     "langchain-openai==0.3.23",
#     "pandas==2.3.0",
#     "parsel==1.10.0",
#     "praw==7.8.1",
#     "pytz==2025.2",
#     "questionary==2.1.0",
#     "redis==6.2.0",
#     "requests==2.32.4",
#     "stockstats==0.6.5",
#     "tqdm==4.67.1",
#     "tushare==1.4.21",
#     "typing-extensions==4.14.0",
#     "yfinance==0.2.63",
# ]
# main = "main"
# params = ""
# ///
import asyncio
from copy import deepcopy

import agents
import agents.analysts
from agents.managers import create_research_manager, create_risk_manager
from agents.researchers import create_bear_researcher, create_bull_researcher
from agents.risk_debators import (
    create_neutral_debator,
    create_risky_debator,
    create_safe_debator,
)
from agents.trader import create_trader
from agents.utils.utils import AgentState
from flyte_env import DEEP_THINKING_LLM, QUICK_THINKING_LLM, env, flyte
from langchain_openai import ChatOpenAI
from reflection import (
    reflect_bear_researcher,
    reflect_bull_researcher,
    reflect_research_manager,
    reflect_risk_manager,
    reflect_trader,
)

@env.task
async def process_signal(full_signal: str, QUICK_THINKING_LLM: str) -> str:
    """Process a full trading signal to extract the core decision."""

    messages = [
        {
            "role": "system",
            "content": """You are an efficient assistant designed to analyze paragraphs or
financial reports provided by a group of analysts.
Your task is to extract the investment decision: SELL, BUY, or HOLD.
Provide only the extracted decision (SELL, BUY, or HOLD) as your output,
without adding any additional text or information.""",
        },
        {"role": "human", "content": full_signal},
    ]

    return ChatOpenAI(model=QUICK_THINKING_LLM).invoke(messages).content

async def run_analyst(analyst_name, state, online_tools):
    # Create a copy of the state for isolation
    run_fn = getattr(agents.analysts, f"create_{analyst_name}_analyst")

    # Run the analyst's chain
    result_state = await run_fn(QUICK_THINKING_LLM, state, online_tools)

    # Determine the report key
    report_key = (
        "sentiment_report"
        if analyst_name == "social_media"
        else f"{analyst_name}_report"
    )
    report_value = getattr(result_state, report_key)

    return result_state.messages[1:], report_key, report_value

# {{docs-fragment main}}
@env.task
async def main(
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> tuple[str, AgentState]:
    if not selected_analysts:
        raise ValueError(
            "No analysts selected. Please select at least one analyst from market, fundamentals, news, or social_media."
        )

    state = AgentState(
        messages=[{"role": "human", "content": company_name}],
        company_of_interest=company_name,
        trade_date=str(trade_date),
    )

    # Run all analysts concurrently
    results = await asyncio.gather(
        *[
            run_analyst(analyst, deepcopy(state), online_tools)
            for analyst in selected_analysts
        ]
    )

    # Flatten and append all resulting messages into the shared state
    for messages, report_attr, report in results:
        state.messages.extend(messages)
        setattr(state, report_attr, report)

    # Bull/Bear debate loop
    state = await create_bull_researcher(QUICK_THINKING_LLM, state)  # Start with bull
    while state.investment_debate_state.count < 2 * max_debate_rounds:
        current = state.investment_debate_state.current_response
        if current.startswith("Bull"):
            state = await create_bear_researcher(QUICK_THINKING_LLM, state)
        else:
            state = await create_bull_researcher(QUICK_THINKING_LLM, state)

    state = await create_research_manager(DEEP_THINKING_LLM, state)
    state = await create_trader(QUICK_THINKING_LLM, state)

    # Risk debate loop
    state = await create_risky_debator(QUICK_THINKING_LLM, state)  # Start with risky
    while state.risk_debate_state.count < 3 * max_risk_discuss_rounds:
        speaker = state.risk_debate_state.latest_speaker
        if speaker == "Risky":
            state = await create_safe_debator(QUICK_THINKING_LLM, state)
        elif speaker == "Safe":
            state = await create_neutral_debator(QUICK_THINKING_LLM, state)
        else:
            state = await create_risky_debator(QUICK_THINKING_LLM, state)

    state = await create_risk_manager(DEEP_THINKING_LLM, state)
    decision = await process_signal(state.final_trade_decision, QUICK_THINKING_LLM)

    return decision, state

# {{/docs-fragment main}}

# {{docs-fragment reflect_on_decisions}}
@env.task
async def reflect_and_store(state: AgentState, returns: str) -> str:
    await asyncio.gather(
        reflect_bear_researcher(state, returns),
        reflect_bull_researcher(state, returns),
        reflect_trader(state, returns),
        reflect_risk_manager(state, returns),
        reflect_research_manager(state, returns),
    )

    return "Reflection completed."

# Run the reflection task after the main function
@env.task(cache="disable")
async def reflect_on_decisions(
    returns: str,
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> str:
    _, state = await main(
        selected_analysts,
        max_debate_rounds,
        max_risk_discuss_rounds,
        online_tools,
        company_name,
        trade_date,
    )

    return await reflect_and_store(state, returns)

# {{/docs-fragment reflect_on_decisions}}

# {{docs-fragment execute_main}}
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.url)
    run.wait()

    # run = flyte.run(reflect_on_decisions, "+3.2% gain over 5 days")
    # print(run.url)

# {{/docs-fragment execute_main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/main.py*

### Running the simulation

First, set up your OpenAI secret (from [openai.com](https://platform.openai.com/api-keys)) and Finnhub API key (from [finnhub.io](https://finnhub.io/)):

```
flyte create secret openai_api_key <YOUR_OPENAI_API_KEY>
flyte create secret finnhub_api_key <YOUR_FINNHUB_API_KEY>
```

Then [clone the repo](https://github.com/unionai/unionai-examples), navigate to the `tutorials-v2/trading_agents` directory, and run the following commands:

```
flyte create config --endpoint <FLYTE_OR_UNION_ENDPOINT> --project <PROJECT_NAME> --domain <DOMAIN_NAME> --builder remote
uv run main.py
```

If you'd like to run the `reflect_on_decisions` task instead, comment out the `main` function call and uncomment the `reflect_on_decisions` call in the `__main__` block:

```
# /// script
# requires-python = "==3.13"
# dependencies = [
#     "flyte>=2.0.0b52",
#     "akshare==1.16.98",
#     "backtrader==1.9.78.123",
#     "boto3==1.39.9",
#     "chainlit==2.5.5",
#     "eodhd==1.0.32",
#     "feedparser==6.0.11",
#     "finnhub-python==2.4.23",
#     "langchain-experimental==0.3.4",
#     "langchain-openai==0.3.23",
#     "pandas==2.3.0",
#     "parsel==1.10.0",
#     "praw==7.8.1",
#     "pytz==2025.2",
#     "questionary==2.1.0",
#     "redis==6.2.0",
#     "requests==2.32.4",
#     "stockstats==0.6.5",
#     "tqdm==4.67.1",
#     "tushare==1.4.21",
#     "typing-extensions==4.14.0",
#     "yfinance==0.2.63",
# ]
# main = "main"
# params = ""
# ///
import asyncio
from copy import deepcopy

import agents
import agents.analysts
from agents.managers import create_research_manager, create_risk_manager
from agents.researchers import create_bear_researcher, create_bull_researcher
from agents.risk_debators import (
    create_neutral_debator,
    create_risky_debator,
    create_safe_debator,
)
from agents.trader import create_trader
from agents.utils.utils import AgentState
from flyte_env import DEEP_THINKING_LLM, QUICK_THINKING_LLM, env, flyte
from langchain_openai import ChatOpenAI
from reflection import (
    reflect_bear_researcher,
    reflect_bull_researcher,
    reflect_research_manager,
    reflect_risk_manager,
    reflect_trader,
)

@env.task
async def process_signal(full_signal: str, QUICK_THINKING_LLM: str) -> str:
    """Process a full trading signal to extract the core decision."""

    messages = [
        {
            "role": "system",
            "content": """You are an efficient assistant designed to analyze paragraphs or
financial reports provided by a group of analysts.
Your task is to extract the investment decision: SELL, BUY, or HOLD.
Provide only the extracted decision (SELL, BUY, or HOLD) as your output,
without adding any additional text or information.""",
        },
        {"role": "human", "content": full_signal},
    ]

    return ChatOpenAI(model=QUICK_THINKING_LLM).invoke(messages).content

async def run_analyst(analyst_name, state, online_tools):
    # Create a copy of the state for isolation
    run_fn = getattr(agents.analysts, f"create_{analyst_name}_analyst")

    # Run the analyst's chain
    result_state = await run_fn(QUICK_THINKING_LLM, state, online_tools)

    # Determine the report key
    report_key = (
        "sentiment_report"
        if analyst_name == "social_media"
        else f"{analyst_name}_report"
    )
    report_value = getattr(result_state, report_key)

    return result_state.messages[1:], report_key, report_value

# {{docs-fragment main}}
@env.task
async def main(
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> tuple[str, AgentState]:
    if not selected_analysts:
        raise ValueError(
            "No analysts selected. Please select at least one analyst from market, fundamentals, news, or social_media."
        )

    state = AgentState(
        messages=[{"role": "human", "content": company_name}],
        company_of_interest=company_name,
        trade_date=str(trade_date),
    )

    # Run all analysts concurrently
    results = await asyncio.gather(
        *[
            run_analyst(analyst, deepcopy(state), online_tools)
            for analyst in selected_analysts
        ]
    )

    # Flatten and append all resulting messages into the shared state
    for messages, report_attr, report in results:
        state.messages.extend(messages)
        setattr(state, report_attr, report)

    # Bull/Bear debate loop
    state = await create_bull_researcher(QUICK_THINKING_LLM, state)  # Start with bull
    while state.investment_debate_state.count < 2 * max_debate_rounds:
        current = state.investment_debate_state.current_response
        if current.startswith("Bull"):
            state = await create_bear_researcher(QUICK_THINKING_LLM, state)
        else:
            state = await create_bull_researcher(QUICK_THINKING_LLM, state)

    state = await create_research_manager(DEEP_THINKING_LLM, state)
    state = await create_trader(QUICK_THINKING_LLM, state)

    # Risk debate loop
    state = await create_risky_debator(QUICK_THINKING_LLM, state)  # Start with risky
    while state.risk_debate_state.count < 3 * max_risk_discuss_rounds:
        speaker = state.risk_debate_state.latest_speaker
        if speaker == "Risky":
            state = await create_safe_debator(QUICK_THINKING_LLM, state)
        elif speaker == "Safe":
            state = await create_neutral_debator(QUICK_THINKING_LLM, state)
        else:
            state = await create_risky_debator(QUICK_THINKING_LLM, state)

    state = await create_risk_manager(DEEP_THINKING_LLM, state)
    decision = await process_signal(state.final_trade_decision, QUICK_THINKING_LLM)

    return decision, state

# {{/docs-fragment main}}

# {{docs-fragment reflect_on_decisions}}
@env.task
async def reflect_and_store(state: AgentState, returns: str) -> str:
    await asyncio.gather(
        reflect_bear_researcher(state, returns),
        reflect_bull_researcher(state, returns),
        reflect_trader(state, returns),
        reflect_risk_manager(state, returns),
        reflect_research_manager(state, returns),
    )

    return "Reflection completed."

# Run the reflection task after the main function
@env.task(cache="disable")
async def reflect_on_decisions(
    returns: str,
    selected_analysts: list[str] = [
        "market",
        "fundamentals",
        "news",
        "social_media",
    ],
    max_debate_rounds: int = 1,
    max_risk_discuss_rounds: int = 1,
    online_tools: bool = True,
    company_name: str = "NVDA",
    trade_date: str = "2024-05-12",
) -> str:
    _, state = await main(
        selected_analysts,
        max_debate_rounds,
        max_risk_discuss_rounds,
        online_tools,
        company_name,
        trade_date,
    )

    return await reflect_and_store(state, returns)

# {{/docs-fragment reflect_on_decisions}}

# {{docs-fragment execute_main}}
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.url)
    run.wait()

    # run = flyte.run(reflect_on_decisions, "+3.2% gain over 5 days")
    # print(run.url)

# {{/docs-fragment execute_main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/trading_agents/main.py*

Then run:

```
uv run main.py
```

## Why Flyte? _(A quick note before you go)_

You might now be wondering: can't I just build all this with Python and LangChain?
Absolutely. But as your project grows, you'll likely run into these challenges:

1.  **Observability**: Agent workflows can feel opaque. You send a prompt, get a response, but what happened in between?

    - Were the right tools used?
    - Were correct arguments passed?
    - How did the LLM reason through intermediate steps?
    - Why did it fail?

    Flyte gives you a window into each of these stages.

2.  **Multi-agent coordination**: Real-world applications often require multiple agents with distinct roles and responsibilities. In such cases, you'll need:

    - Isolated state per agent,
    - Shared context where needed,
    - And coordination — sequential or parallel.

    Managing this manually gets fragile, fast. Flyte handles it for you.

3.  **Scalability**: Agents and tools might need to run in isolated or containerized environments. Whether you're scaling out to more agents or more powerful hardware, Flyte lets you scale without taxing your local machine or racking up unnecessary cloud bills.
4.  **Durability & recovery**: LLM-based workflows are often long-running and expensive. If something fails halfway:

    - Do you lose all progress?
    - Replay everything from scratch?

    With Flyte, you get built-in caching, checkpointing, and recovery, so you can resume where you left off.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/tutorials/trading-agents/_index.md
**HTML**: https://www.union.ai/docs/v2/union/tutorials/trading-agents/
