Triggers
Triggers allow you to automate and parameterize an execution by scheduling its start time and providing overrides for its task inputs.
Currently, only schedule triggers are supported. This type of trigger runs a task based on a Cron expression or a fixed-rate schedule.
Support is coming for other trigger types, such as:
- Webhook triggers: Hit an API endpoint to run your task.
- Artifact triggers: Run a task when a specific artifact is produced.
Triggers are set in the task decorator
A trigger is created by setting the triggers parameter in the task decorator to a flyte.Trigger object or a list of such objects (triggers are not settable at the TaskEnvironment definition or task.override levels).
Here is a simple example:
import flyte
from datetime import datetime, timezone
env = flyte.TaskEnvironment(name="trigger_env")
@env.task(triggers=flyte.Trigger.hourly()) # Every hour
def hourly_task(trigger_time: datetime, x: int = 1) -> str:
return f"Hourly example executed at {trigger_time.isoformat()} with x={x}"Here we use a predefined schedule trigger to run the hourly_task every hour.
Other predefined triggers can be used similarly (see
Predefined schedule triggers below).
If you want full control over the trigger behavior, you can define a trigger using the flyte.Trigger class directly.
flyte.Trigger
The Trigger class allows you to define custom triggers with full control over scheduling and execution behavior. It has the following signature:
flyte.Trigger(
name,
automation,
description="",
auto_activate=True,
inputs=None,
env_vars=None,
interruptible=None,
overwrite_cache=False,
queue=None,
labels=None,
annotations=None
)Core Parameters
name: str (required)
The unique identifier for the trigger within your project/domain.
automation: Union[Cron, FixedRate] (required)
Defines when the trigger fires. Use flyte.Cron("expression") for Cron-based scheduling or flyte.FixedRate(interval_minutes, start_time=start_time) for fixed intervals.
Configuration Parameters
description: str = ""
Human-readable description of the trigger’s purpose.
auto_activate: bool = True
Whether the trigger should be automatically activated when deployed. Set to False to deploy inactive triggers that require manual activation.
inputs: Dict[str, Any] | None = None
Default parameter values for the task when triggered. Use flyte.TriggerTime as a value to inject the trigger execution timestamp into that parameter.
Runtime Override Parameters
env_vars: Dict[str, str] | None = None
Environment variables to set for triggered executions, overriding the task’s default environment variables.
interruptible: bool | None = None
Whether triggered executions can be interrupted (useful for cost optimization with spot/preemptible instances). Overrides the task’s interruptible setting.
overwrite_cache: bool = False
Whether to bypass/overwrite task cache for triggered executions, ensuring fresh computation.
queue: str | None = None
Specific execution queue for triggered runs, overriding the task’s default queue.
Metadata Parameters
labels: Mapping[str, str] | None = None
Key-value labels for organizing and filtering triggers (e.g., team, component, priority).
annotations: Mapping[str, str] | None = None
Additional metadata, often used by infrastructure tools for compliance, monitoring, or cost tracking.
Here’s a comprehensive example showing all parameters:
comprehensive_trigger = flyte.Trigger(
name="monthly_financial_report",
automation=flyte.Cron("0 6 1 * *", timezone="America/New_York"),
description="Monthly financial report generation for executive team",
auto_activate=True,
inputs={
"report_date": flyte.TriggerTime,
"report_type": "executive_summary",
"include_forecasts": True
},
env_vars={
"REPORT_OUTPUT_FORMAT": "PDF",
"EMAIL_NOTIFICATIONS": "true"
},
interruptible=False, # Critical report, use dedicated resources
overwrite_cache=True, # Always fresh data
queue="financial-reports",
labels={
"team": "finance",
"criticality": "high",
"automation": "scheduled"
},
annotations={
"compliance.company.com/sox-required": "true",
"backup.company.com/retain-days": "2555" # 7 years
}
)The automation parameter with flyte.FixedRate
You can define a fixed-rate schedule trigger by setting the automation parameter of the flyte.Trigger to an instance of flyte.FixedRate.
The flyte.FixedRate has the following signature:
flyte.FixedRate(
interval_minutes,
start_time=None
)Parameters
interval_minutes: int (required)
The interval between trigger executions in minutes.
start_time: datetime | None
When to start the fixed rate schedule. If not specified, starts when the trigger is deployed and activated.
Examples
# Every 90 minutes, starting when deployed
every_90_min = flyte.Trigger(
"data_processing",
flyte.FixedRate(interval_minutes=90)
)
# Every 6 hours (360 minutes), starting at a specific time
specific_start = flyte.Trigger(
"batch_job",
flyte.FixedRate(
interval_minutes=360, # 6 hours
start_time=datetime(2025, 12, 1, 9, 0, 0) # Start Dec 1st at 9 AM
)
)The automation parameter with flyte.Cron
You can define a Cron-based schedule trigger by setting the automation parameter to an instance of flyte.Cron.
The flyte.Cron has the following signature:
flyte.Cron(
cron_expression,
timezone=None
)Parameters
cron_expression: str (required)
The cron expression defining when the trigger should fire. Uses standard Unix cron format with five fields: minute, hour, day of month, month, and day of week.
timezone: str | None
The timezone for the cron expression. If not specified, it defaults to UTC. Uses standard timezone names like “America/New_York” or “Europe/London”.
Examples
# Every day at 6 AM UTC
daily_trigger = flyte.Trigger(
"daily_report",
flyte.Cron("0 6 * * *")
)
# Every weekday at 9:30 AM Eastern Time
weekday_trigger = flyte.Trigger(
"business_hours_task",
flyte.Cron("30 9 * * 1-5", timezone="America/New_York")
)Cron Expressions
Here are some common cron expressions you can use:
| Expression | Description |
|---|---|
0 0 * * * |
Every day at midnight |
0 9 * * 1-5 |
Every weekday at 9 AM |
30 14 * * 6 |
Every Saturday at 2:30 PM |
0 0 1 * * |
First day of every month at midnight |
0 0 25 * * |
25th day of every month at midnight |
0 0 * * 0 |
Every Sunday at midnight |
*/10 * * * * |
Every 10 minutes |
0 */2 * * * |
Every 2 hours |
For a full guide on Cron syntax, refer to Crontab Guru.
The inputs parameter
The inputs parameter allows you to provide default values for your task’s parameters when the trigger fires.
This is essential for parameterizing your automated executions and passing trigger-specific data to your tasks.
Basic Usage
trigger_with_inputs = flyte.Trigger(
"data_processing",
flyte.Cron("0 6 * * *"), # Daily at 6 AM
inputs={
"batch_size": 1000,
"environment": "production",
"debug_mode": False
}
)
@env.task(triggers=trigger_with_inputs)
def process_data(batch_size: int, environment: str, debug_mode: bool = True) -> str:
return f"Processing {batch_size} items in {environment} mode"Using flyte.TriggerTime
The special flyte.TriggerTime value is used in the inputs to indicate the task parameter into which Flyte will inject the trigger execution timestamp:
timestamp_trigger = flyte.Trigger(
"daily_report",
flyte.Cron("0 0 * * *"), # Daily at midnight
inputs={
"report_date": flyte.TriggerTime, # Receives trigger execution time
"report_type": "daily_summary"
}
)
@env.task(triggers=timestamp_trigger)
def generate_report(report_date: datetime, report_type: str) -> str:
return f"Generated {report_type} for {report_date.strftime('%Y-%m-%d')}"Required vs optional parameters
If your task has parameters without default values, you must provide values for them in the trigger inputs, otherwise the trigger will fail to execute.
# ❌ This will fail - missing required parameter 'data_source'
bad_trigger = flyte.Trigger(
"bad_trigger",
flyte.Cron("0 0 * * *")
# Missing inputs for required parameter 'data_source'
)
@env.task(triggers=bad_trigger)
def bad_trigger_taska(data_source: str, batch_size: int = 100) -> str:
return f"Processing from {data_source} with batch size {batch_size}"
# ✅ This works - all required parameters provided
good_trigger = flyte.Trigger(
"good_trigger",
flyte.Cron("0 0 * * *"),
inputs={
"data_source": "prod_database", # Required parameter
"batch_size": 500 # Override default
}
)
@env.task(triggers=good_trigger)
def good_trigger_task(data_source: str, batch_size: int = 100) -> str:
return f"Processing from {data_source} with batch size {batch_size}"Complex input types
You can pass various data types through trigger inputs:
complex_trigger = flyte.Trigger(
"ml_training",
flyte.Cron("0 2 * * 1"), # Weekly on Monday at 2 AM
inputs={
"model_config": {
"learning_rate": 0.01,
"batch_size": 32,
"epochs": 100
},
"feature_columns": ["age", "income", "location"],
"validation_split": 0.2,
"training_date": flyte.TriggerTime
}
)
@env.task(triggers=complex_trigger)
def train_model(
model_config: dict,
feature_columns: list[str],
validation_split: float,
training_date: datetime
) -> str:
return f"Training model with {len(feature_columns)} features on {training_date}"Predefined schedule triggers
For common scheduling needs, Flyte provides predefined trigger methods that create Cron-based schedules without requiring you to specify cron expressions manually. These are convenient shortcuts for frequently used scheduling patterns.
Available Predefined Triggers
minutely_trigger = flyte.Trigger.minutely() # Every minute
hourly_trigger = flyte.Trigger.hourly() # Every hour
daily_trigger = flyte.Trigger.daily() # Every day at midnight
weekly_trigger = flyte.Trigger.weekly() # Every week (Sundays at midnight)
monthly_trigger = flyte.Trigger.monthly() # Every month (1st day at midnight)For reference, here’s what each predefined trigger is equivalent to:
# These are functionally identical:
flyte.Trigger.minutely() == flyte.Trigger("minutely", flyte.Cron("* * * * *"))
flyte.Trigger.hourly() == flyte.Trigger("hourly", flyte.Cron("0 * * * *"))
flyte.Trigger.daily() == flyte.Trigger("daily", flyte.Cron("0 0 * * *"))
flyte.Trigger.weekly() == flyte.Trigger("weekly", flyte.Cron("0 0 * * 0"))
flyte.Trigger.monthly() == flyte.Trigger("monthly", flyte.Cron("0 0 1 * *"))Predefined Trigger Parameters
All predefined trigger methods (minutely(), hourly(), daily(), weekly(), monthly()) accept the same set of parameters:
flyte.Trigger.daily(
trigger_time_input_key="trigger_time",
name="daily",
description="A trigger that runs daily at midnight",
auto_activate=True,
inputs=None,
env_vars=None,
interruptible=None,
overwrite_cache=False,
queue=None,
labels=None,
annotations=None
)Core Parameters
trigger_time_input_key: str = "trigger_time"
The name of the task parameter that will receive the execution timestamp.
If no trigger_time_input_key is provided, the default is trigger_time.
In this case, if the task does not have a parameter named trigger_time, the task will still be executed, but, obviously, the timestamp will not be passed.
However, if you do specify a trigger_time_input_key, but your task does not actually have the specified parameter, an error will be raised at trigger deployment time.
name: str
The unique identifier for the trigger. Defaults to the method name ("daily", "hourly", etc.).
description: str
Human-readable description of the trigger’s purpose. Each method has a sensible default.
Configuration Parameters
auto_activate: bool = True
Whether the trigger should be automatically activated when deployed. Set to False to deploy inactive triggers that require manual activation.
inputs: Dict[str, Any] | None = None
Additional parameter values for your task when triggered. The trigger_time_input_key parameter is automatically included with flyte.TriggerTime as its value.
Runtime Override Parameters
env_vars: Dict[str, str] | None = None
Environment variables to set for triggered executions, overriding the task’s default environment variables.
interruptible: bool | None = None
Whether triggered executions can be interrupted (useful for cost optimization with spot/preemptible instances). Overrides the task’s interruptible setting.
overwrite_cache: bool = False
Whether to bypass/overwrite task cache for triggered executions, ensuring fresh computation.
queue: str | None = None
Specific execution queue for triggered runs, overriding the task’s default queue.
Metadata Parameters
labels: Mapping[str, str] | None = None
Key-value labels for organizing and filtering triggers (e.g., team, component, priority).
annotations: Mapping[str, str] | None = None
Additional metadata, often used by infrastructure tools for compliance, monitoring, or cost tracking.
Trigger time in predefined triggers
By default, predefined triggers will pass the execution time to the parameter trigger_time of type datetime,if that parameter exists on the task.
If no such parameter exists, the task will still be executed without error.
Optionally, you can customize the parameter name that receives the trigger execution timestamp by setting the trigger_time_input_key parameter (in this case the absence of this custom parameter on the task will raise an error at trigger deployment time):
@env.task(triggers=flyte.Trigger.daily(trigger_time_input_key="scheduled_at"))
def task_with_custom_trigger_time_input(scheduled_at: datetime) -> str:
return f"Executed at {scheduled_at}"Multiple triggers per task
You can attach multiple triggers to a single task by providing a list of triggers. This allows you to run the same task on different schedules or with different configurations:
@env.task(triggers=[
flyte.Trigger.hourly(), # Predefined trigger
flyte.Trigger.daily(), # Another predefined trigger
flyte.Trigger("custom", flyte.Cron("0 */6 * * *")) # Custom trigger every 6 hours
])
def multi_trigger_task(trigger_time: datetime = flyte.TriggerTime) -> str:
# Different logic based on execution timing
if trigger_time.hour == 0: # Daily run at midnight
return f"Daily comprehensive processing at {trigger_time}"
else: # Hourly or custom runs
return f"Regular processing at {trigger_time.strftime('%H:%M')}"You can mix and match trigger types, combining predefined triggers with those that use flyte.Cron, and flyte.FixedRate automations (see below for explanations of these concepts).
Deploying a task with triggers
We recommend that you define your triggers in code together with your tasks and deploy them together.
The Union UI displays:
-
Owner- who last deployed the trigger. -
Last updated- who last activated or deactivated the trigger and when. Note: If you deploy a trigger withauto_activate=True(default), this will match theOwner. -
Last Run- when was the last run created by this trigger.
For development and debugging purposes, you can adjust and deploy individual triggers from the UI.
To deploy a task with its triggers, you can either use Flyte CLI:
flyte deploy -p <project> -d <domain> <file_with_tasks_and_triggers.py> envOr in Python:
flyte.deploy(env)Upon deploy, all triggers that are associated with a given task T will be automatically switched to apply to the latest version of that task. Triggers on task T which are defined elsewhere (i.e. in the UI) will be deleted unless they have been referenced in the task definition of T
Activating and deactivating triggers
By default, triggers are automatically activated upon deployment (auto_activate=True).
Alternatively, you can set auto_activate=False to deploy inactive triggers.
An inactive trigger will not create runs until activated.
env = flyte.TaskEnvironment(name="my_task_env")
custom_cron_trigger = flyte.Trigger(
"custom_cron",
flyte.Cron("0 0 * * *"),
auto_activate=False # Dont create runs yet
)
@env.task(triggers=custom_cron_trigger)
def custom_task() -> str:
return "Hello, world!"This trigger won’t create runs until it is explicitly activated. You can activate a trigger via the Flyte CLI:
flyte update trigger custom_cron my_task_env.custom_task --activate --project <project> --domain <domain>If you want to stop your trigger from creating new runs, you can deactivate it:
flyte update trigger custom_cron my_task_env.custom_task --deactivate --project <project> --domain <domain>You can also view and manage your deployed triggers in the Union UI.
Trigger run timing
The timing of the first run created by a trigger depends on the type of trigger used (Cron-based or Fixed-rate) and whether the trigger is active upon deployment.
Cron-based triggers
For Cron-based triggers, the first run will be created at the next scheduled time according to the cron expression after trigger activation and similarly thereafter.
-
0 0 * * *If deployed at 17:00 today, the trigger will first fire 7 hours later (0:00 of the following day) and then every day at 0:00 thereafter. -
*/15 14 * * 1-5if today is Tuesday at 17:00, the trigger will fire the next day (Wednesday) at 14:00, 14:15, 14:30, and 14:45 and then the same for every subsequent weekday thereafter.
Fixed-rate triggers without start_time
If no start_time is specified, then the first run will be created after the specified interval from the time of activation. No run will be created immediately upon activation, but the activation time will be used as the reference point for future runs.
No start_time, auto_activate: True
Let’s say you define a fixed rate trigger with automatic activation like this:
my_trigger = flyte.Trigger("my_trigger", flyte.FixedRate(60))In this case, the first run will occur 60 minutes after the successful deployment of the trigger. So, if you deployed this trigger at 13:15, the first run will occur at 14:15 and so on thereafter.
No start_time, auto_activate: False
On the other hand, let’s say you define a fixed rate trigger without automatic activation like this:
my_trigger = flyte.Trigger("my_trigger", flyte.FixedRate(60), auto_activate=False)Then you activate it after about 3 hours. In this case the first run will kick off 60 minutes after trigger activation. If you deployed the trigger at 13:15 and activated it at 16:07, the first run will occur at 17:07.
Fixed-rate triggers with start_time
If a start_time is specified, the timing of the first run depends on whether the trigger is active at start_time or not.
Fixed-rate with start_time while active
If a start_time is specified, and the trigger is active at start_time then the first run will occur at start_time and then at the specified interval thereafter.
For example:
my_trigger = flyte.Trigger(
"my_trigger",
# Runs every 60 minutes starting from October 26th, 2025, 10:00am
flyte.FixedRate(60, start_time=datetime(2025, 10, 26, 10, 0, 0)),
)If you deploy this trigger on October 24th, 2025, the trigger will wait until October 26th 10:00am and will create the first run at exactly 10:00am.
Fixed-rate with start_time while inactive
If a start time is specified, but the trigger is activated after start_time, then the first run will be created when the next time point occurs that aligns with the recurring trigger interval using start_time as the initial reference point.
For example:
custom_rate_trigger = flyte.Trigger(
"custom_rate",
# Runs every 60 minutes starting from October 26th, 2025, 10:00am
flyte.FixedRate(60, start_time=datetime(2025, 10, 26, 10, 0, 0)),
auto_activate=False
)If activated later than the start_time, say on October 28th 12:35pm for example, the first run will be created at October 28th at 1:00pm.
Deleting triggers
If you decide that you don’t need a trigger anymore, you can remove the trigger from the task definition and deploy the task again.
Alternatively, you can use Flyte CLI:
flyte delete trigger custom_cron my_task_env.custom_task --project <project> --domain <domain>Schedule time zones
Setting time zone for a Cron schedule
Cron expressions are by default in UTC, but it’s possible to specify custom time zones like so:
sf_trigger = flyte.Trigger(
"sf_tz",
flyte.Cron(
"0 9 * * *", timezone="America/Los_Angeles"
), # Every day at 9 AM PT
inputs={"start_time": flyte.TriggerTime, "x": 1},
)
nyc_trigger = flyte.Trigger(
"nyc_tz",
flyte.Cron(
"1 12 * * *", timezone="America/New_York"
), # Every day at 12:01 PM ET
inputs={"start_time": flyte.TriggerTime, "x": 1},
)The above two schedules will fire 1 minute apart, at 9 AM PT and 12:01 PM ET respectively.
flyte.TriggerTime is always in UTC
The flyte.TriggerTime value is always in UTC. For timezone-aware logic, convert as needed:
@env.task(triggers=flyte.Trigger.minutely(trigger_time_input_key="utc_trigger_time", name="timezone_trigger"))
def timezone_task(utc_trigger_time: datetime) -> str:
local_time = utc_trigger_time.replace(tzinfo=timezone.utc).astimezone()
return f"Task fired at {utc_trigger_time} UTC ({local_time} local)"Daylight Savings Time behavior
When Daylight Savings Time (DST) begins and ends, it can impact when the scheduled execution begins.
On the day DST begins, time jumps from 2:00AM to 3:00AM, which means the time of 2:30AM won’t exist. In this case, the trigger will not fire until the next 2:30AM, which is the next day.
On the day DST ends, the hour from 1:00AM to 2:00AM repeats, which means the time of 1:30AM will exist twice. If the schedule above was instead set for 1:30AM, it would only run once, on the first occurrence of 1:30AM.