flyte
Flyte SDK for authoring compound AI applications, services and workflows.
Directory
Classes
| Class | Description |
|---|---|
| Cache | Cache configuration for a task. |
| Cron | This class defines a Cron automation that can be associated with a Trigger in Flyte. |
| Device | Represents a device type, its quantity and partition if applicable. |
| Environment | |
| FixedRate | This class defines a FixedRate automation that can be associated with a Trigger in Flyte. |
| Image | This is a representation of container images, which can be used to create layered images programmatically. |
| PodTemplate | Custom PodTemplate specification for a Task. |
| Resources | Resources such as CPU, Memory, and GPU that can be allocated to a task. |
| RetryStrategy | Retry strategy for the task or task environment. |
| ReusePolicy | ReusePolicy can be used to configure a task to reuse the environment. |
| Secret | Secrets are used to inject sensitive information into tasks or image build context. |
| TaskEnvironment | Environment class to define a new environment for a set of tasks. |
| Timeout | Timeout class to define a timeout for a task. |
| Trigger | This class defines the specification of a Trigger that can be associated with any Flyte V2 task. |
Protocols
| Protocol | Description |
|---|---|
| CachePolicy | Base class for protocol classes. |
Methods
| Method | Description |
|---|---|
| AMD_GPU() | Create an AMD GPU device instance. |
| GPU() | Create a GPU device instance. |
| HABANA_GAUDI() | Create a Habana Gaudi device instance. |
| Neuron() | Create a Neuron device instance. |
| TPU() | Create a TPU device instance. |
| build() | Build an image. |
| build_images() | Build the images for the given environments. |
| ctx() | Returns flyte.models.TaskContext if within a task context, else None. |
| current_domain() | Returns the current domain from the runtime environment (on the cluster) or from the initialized configuration. |
| custom_context() | Synchronous context manager to set input context for tasks spawned within this block. |
| deploy() | Deploy the given environment or list of environments. |
| get_custom_context() | Get the current input context. |
| group() | Create a new group with the given name. |
| init() | Initialize the Flyte system with the given configuration. |
| init_from_config() | Initialize the Flyte system using a configuration file or Config object. |
| init_in_cluster() | |
| map() | Map a function over the provided arguments with concurrent execution. |
| run() | Run a task with the given parameters. |
| serve() | Serve a Flyte app using an AppEnvironment. |
| trace() | A decorator that traces function execution with timing information. |
| version() | Returns the version of the Flyte SDK. |
| with_runcontext() | Launch a new run with the given parameters as the context. |
| with_servecontext() | Create a serve context with custom configuration. |
Variables
| Property | Type | Description |
|---|---|---|
| TimeoutType | UnionType | |
| TriggerTime | `_trigger_time` | |
| `__version__` | str | |
| logger | Logger | |
Methods
AMD_GPU()
```python
def AMD_GPU(
    device: typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X'],
) -> flyte._resources.Device
```

Create an AMD GPU device instance.

| Parameter | Type | Description |
|---|---|---|
| device | typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X'] | Device type (e.g., "MI100", "MI210", "MI250", "MI250X", "MI300A", "MI300X", "MI325X", "MI350X", "MI355X"). |

Returns: Device instance.
GPU()
```python
def GPU(
    device: typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'H200', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000'],
    quantity: typing.Literal[1, 2, 3, 4, 5, 6, 7, 8],
    partition: typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], typing.Literal['1g.18gb', '1g.35gb', '2g.35gb', '3g.71gb', '4g.71gb', '7g.141gb'], NoneType],
) -> flyte._resources.Device
```

Create a GPU device instance.

| Parameter | Type | Description |
|---|---|---|
| device | typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'H200', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000'] | The type of GPU (e.g., "T4", "A100"). |
| quantity | typing.Literal[1, 2, 3, 4, 5, 6, 7, 8] | The number of GPUs of this type. |
| partition | typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], typing.Literal['1g.18gb', '1g.35gb', '2g.35gb', '3g.71gb', '4g.71gb', '7g.141gb'], NoneType] | The partition of the GPU (e.g., "1g.5gb", "2g.10gb"), or None. |

Returns: Device instance.
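The GPU `partition` strings follow NVIDIA's Multi-Instance GPU (MIG) profile naming, `<N>g.<M>gb`, where `N` is the number of compute slices and `M` the memory in GB. As a small illustration that is not part of the flyte API, the hypothetical helpers below parse a profile string and pick the smallest profile from one of the `Literal` groups above that meets a memory budget.

```python
import re
from typing import List, NamedTuple


class MigProfile(NamedTuple):
    compute_slices: int  # the "<N>g" part of the profile
    memory_gb: int  # the "<M>gb" part of the profile


_MIG_PATTERN = re.compile(r"^(\d+)g\.(\d+)gb$")


def parse_mig_profile(partition: str) -> MigProfile:
    """Parse a MIG profile string such as '3g.20gb' (hypothetical helper)."""
    match = _MIG_PATTERN.match(partition)
    if match is None:
        raise ValueError(f"not a MIG profile string: {partition!r}")
    return MigProfile(int(match.group(1)), int(match.group(2)))


def smallest_fitting(profiles: List[str], min_memory_gb: int) -> str:
    """Pick the profile with the least memory that still provides min_memory_gb."""
    fitting = [p for p in profiles if parse_mig_profile(p).memory_gb >= min_memory_gb]
    if not fitting:
        raise ValueError(f"no profile provides {min_memory_gb} GB")
    # min() keeps the first of equally sized profiles, in list order.
    return min(fitting, key=lambda p: parse_mig_profile(p).memory_gb)
```

For example, `smallest_fitting(['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], 15)` selects `'3g.20gb'`, which could then be passed as the `partition` argument.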
HABANA_GAUDI()
```python
def HABANA_GAUDI(
    device: typing.Literal['Gaudi1'],
) -> flyte._resources.Device
```

Create a Habana Gaudi device instance.

| Parameter | Type | Description |
|---|---|---|
| device | typing.Literal['Gaudi1'] | Device type (e.g., "Gaudi1"). |

Returns: Device instance.
Neuron()
```python
def Neuron(
    device: typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u'],
) -> flyte._resources.Device
```

Create a Neuron device instance.

| Parameter | Type | Description |
|---|---|---|
| device | typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u'] | Device type (e.g., "Inf1", "Inf2", "Trn1", "Trn1n", "Trn2", "Trn2u"). |
TPU()
```python
def TPU(
    device: typing.Literal['V5P', 'V6E'],
    partition: typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType],
)
```

Create a TPU device instance.

| Parameter | Type | Description |
|---|---|---|
| device | typing.Literal['V5P', 'V6E'] | Device type (e.g., "V5P", "V6E"). |
| partition | typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType] | Partition of the TPU (e.g., "1x1", "2x2", ...). |

Returns: Device instance.
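TPU `partition` values are topology strings, and the number of chips in a slice is the product of the dimensions (for example, `2x2x1` is 4 chips and `4x8` is 32). In the signature above, the three-dimensional group appears to correspond to `V5P` and the two-dimensional group to `V6E`; that pairing is inferred from the order of the `Literal` groups and is not stated by the source. The hypothetical helper below, which is not part of the flyte API, computes the chip count for a topology.

```python
from math import prod


def topology_chip_count(partition: str) -> int:
    """Number of TPU chips in a topology string such as '4x4x8' (hypothetical helper)."""
    # int() raises ValueError for empty or non-numeric dimensions.
    dims = [int(d) for d in partition.lower().split("x")]
    if any(d <= 0 for d in dims):
        raise ValueError(f"invalid topology: {partition!r}")
    return prod(dims)
```

This makes it easy to compare partitions across the two groups, for instance `2x2x2` (8 chips) against `2x4` (8 chips).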
build()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await build.aio()`.

```python
def build(
    image: Image,
) -> str
```

Build an image. The existing async context will be used.

Example:

```python
import asyncio

import flyte

image = flyte.Image("example_image")

if __name__ == "__main__":
    asyncio.run(flyte.build.aio(image))
```

| Parameter | Type | Description |
|---|---|---|
| image | Image | The image to build. |

Returns: The image URI.
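Several functions in this module (`build()`, `build_images()`, `deploy()`, `init()`, `run()`, and others) block when called normally and expose an `.aio()` variant for async callers. The sketch below models that calling convention with a hypothetical `syncify` decorator and a hypothetical `build_image` stand-in; it is a simplified illustration, not flyte's implementation, and it assumes the blocking call is made from code that is not already running an event loop.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

R = TypeVar("R")


def syncify(coro_fn: Callable[..., Coroutine[Any, Any, R]]) -> Callable[..., R]:
    """Hypothetical decorator: blocking call by default, `.aio()` for async callers."""

    @functools.wraps(coro_fn)
    def blocking(*args: Any, **kwargs: Any) -> R:
        # Runs the coroutine to completion on a fresh event loop, blocking the caller.
        return asyncio.run(coro_fn(*args, **kwargs))

    # The async variant is the original coroutine function, to be awaited.
    blocking.aio = coro_fn  # type: ignore[attr-defined]
    return blocking


@syncify
async def build_image(image_name: str) -> str:
    """Hypothetical stand-in for an async image build that returns an image URI."""
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return f"registry.example.com/{image_name}:latest"
```

A script would call `build_image("example_image")` directly, while code inside a coroutine would use `await build_image.aio("example_image")`, mirroring `flyte.build(image)` versus `await flyte.build.aio(image)`.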
build_images()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await build_images.aio()`.

```python
def build_images(
    envs: Environment,
) -> ImageCache
```

Build the images for the given environments.

| Parameter | Type | Description |
|---|---|---|
| envs | Environment | Environment to build images for. |

Returns: ImageCache containing the built images.
ctx()
```python
def ctx()
```

Returns flyte.models.TaskContext if within a task context, else None.

Note: Only use this in task code, not at module level.
current_domain()
```python
def current_domain()
```

Returns the current domain from the runtime environment (on the cluster) or from the initialized configuration. This is safe to use during deploy, run, and within task code.

NOTE: This will not work if you deploy a task to one domain and then run it in another domain.

Raises InitializationError if the configuration is not initialized or the domain is not set.

Returns: The current domain.
custom_context()
```python
def custom_context(
    **context: str,
)
```

Synchronous context manager to set input context for tasks spawned within this block.

Example:

```python
import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    ctx = flyte.get_custom_context()
    print(ctx)

@env.task
def main():
    # context can be passed via a context manager
    with flyte.custom_context(project="my-project"):
        t1()  # will have {'project': 'my-project'} as context
```

| Parameter | Type | Description |
|---|---|---|
| context | str | Key-value pairs to set as input context. |
deploy()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await deploy.aio()`.

```python
def deploy(
    envs: Environment,
    dryrun: bool,
    version: str | None,
    interactive_mode: bool | None,
    copy_style: CopyFiles,
) -> List[Deployment]
```

Deploy the given environment or list of environments.

| Parameter | Type | Description |
|---|---|---|
| envs | Environment | Environment or list of environments to deploy. |
| dryrun | bool | Dry-run mode; if True, the deployment is not applied to the control plane. |
| version | str \| None | Version of the deployment; if None, the version is computed from the code bundle. |
| interactive_mode | bool \| None | Optional; can be forced to True or False. If not provided, it is set based on the current environment: for example, Jupyter notebooks are considered interactive mode, while scripts are not. This is used to determine how the code bundle is created. |
| copy_style | CopyFiles | Copy style to use when running the task. |

Returns: Deployment objects containing the deployed environments and tasks.
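When `version` is None, `deploy()` computes the version from the code bundle. The exact scheme is not described on this page; purely as an illustrative assumption, a content-derived version can be produced by hashing file paths and contents in a stable order, so an unchanged bundle always yields the same version and any edit yields a new one. Both helpers below are hypothetical and are not flyte's versioning code.

```python
import hashlib
from pathlib import Path
from typing import Dict


def bundle_version_from_files(files: Dict[str, bytes], length: int = 16) -> str:
    """Hypothetical content hash over {relative path: file bytes}."""
    digest = hashlib.sha256()
    # Sort paths so the result does not depend on discovery order.
    for rel_path in sorted(files):
        digest.update(rel_path.encode())
        digest.update(b"\0")  # separator between path and content
        digest.update(files[rel_path])
        digest.update(b"\0")
    return digest.hexdigest()[:length]


def bundle_version(root: Path, pattern: str = "**/*.py", length: int = 16) -> str:
    """Collect matching files under root and hash them (hypothetical helper)."""
    files = {
        p.relative_to(root).as_posix(): p.read_bytes()
        for p in root.glob(pattern)
        if p.is_file()
    }
    return bundle_version_from_files(files, length)
```

The design choice here is determinism: redeploying identical code produces the same version string, which is the property a bundle-derived version needs.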
get_custom_context()
```python
def get_custom_context()
```

Get the current input context. This can be used within a task to retrieve context metadata that was passed to the action. Context automatically propagates to sub-actions.

Example:

```python
import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    # context can be retrieved with `get_custom_context`
    ctx = flyte.get_custom_context()
    print(ctx)  # {'project': '...', 'entity': '...'}
```

Returns: Dictionary of context key-value pairs.
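The propagation behaviour of `custom_context()` and `get_custom_context()` (values set in a `with` block are visible to work started inside it, and are restored on exit) can be modelled in a single process with `contextvars`. This is an illustrative sketch, not flyte's implementation: the functions below only mirror the public names, and the assumption that inner blocks override outer keys is this sketch's own. In flyte the context also crosses task boundaries, which one process cannot show.

```python
import contextlib
import contextvars
from typing import Dict, Iterator, List

# Holds the active key-value context; a new dict is stored on every change,
# so the shared default is never mutated.
_custom_ctx = contextvars.ContextVar("custom_ctx", default={})


@contextlib.contextmanager
def custom_context(**context: str) -> Iterator[None]:
    """Merge new keys over the current context for the duration of the block."""
    token = _custom_ctx.set({**_custom_ctx.get(), **context})
    try:
        yield
    finally:
        _custom_ctx.reset(token)  # restore the outer context on exit


def get_custom_context() -> Dict[str, str]:
    """Return a copy so callers cannot mutate the active context."""
    return dict(_custom_ctx.get())


def demo_nesting() -> List[Dict[str, str]]:
    """Snapshot the context before, inside, and after nested blocks."""
    snapshots = [get_custom_context()]
    with custom_context(project="my-project"):
        snapshots.append(get_custom_context())
        with custom_context(project="override", entity="e"):
            snapshots.append(get_custom_context())
        snapshots.append(get_custom_context())
    snapshots.append(get_custom_context())
    return snapshots
```

`demo_nesting()` shows the context growing inside each block and returning to its previous value on exit.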
group()
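```python
def group(
    name: str,
)
```

Create a new group with the given name. The method is intended to be used as a context manager.

Example:

```python
import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
async def t1(x: int, y: int) -> int:
    return x + y

@env.task
async def my_task(x: int, y: int) -> None:
    with flyte.group("my_group"):
        await t1(x, y)  # tasks in this block will be grouped under "my_group"
```

| Parameter | Type | Description |
|---|---|---|
| name | str | The name of the group. |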
init()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await init.aio()`.

```python
def init(
    org: str | None,
    project: str | None,
    domain: str | None,
    root_dir: Path | None,
    log_level: int | None,
    log_format: LogFormat | None,
    endpoint: str | None,
    headless: bool,
    insecure: bool,
    insecure_skip_verify: bool,
    ca_cert_file_path: str | None,
    auth_type: AuthType,
    command: List[str] | None,
    proxy_command: List[str] | None,
    api_key: str | None,
    client_id: str | None,
    client_credentials_secret: str | None,
    auth_client_config: ClientConfig | None,
    rpc_retries: int,
    http_proxy_url: str | None,
    storage: Storage | None,
    batch_size: int,
    image_builder: ImageBuildEngine.ImageBuilderType,
    images: typing.Dict[str, str] | None,
    source_config_path: Optional[Path],
    sync_local_sys_paths: bool,
    load_plugin_type_transformers: bool,
)
```

Initialize the Flyte system with the given configuration. This method should be called before any other Flyte remote API methods are called. The implementation is thread-safe.

| Parameter | Type | Description |
|---|---|---|
| org | str \| None | Optional organization override for the client. Should be set by auth instead. |
| project | str \| None | Optional project name (not used in this implementation). |
| domain | str \| None | Optional domain name (not used in this implementation). |
| root_dir | Path \| None | Optional root directory from which to determine how to load files and find paths to files. This is useful for determining the root directory of the current project, for locating files such as config, and for determining all the code that needs to be copied to the remote location. Defaults to the editable install directory if the cwd is in a Python editable install, else the cwd. |
| log_level | int \| None | Optional logging level for the logger; the default is set using the default initialization policies. |
| log_format | LogFormat \| None | Optional logging format for the logger; the default is "console". |
| endpoint | str \| None | Optional API endpoint URL. |
| headless | bool | Optional. Whether to run in headless mode. |
| insecure | bool | Insecure flag for the client. |
| insecure_skip_verify | bool | Whether to skip SSL certificate verification. |
| ca_cert_file_path | str \| None | Optional path to a root certificate to be loaded and used to verify admin. |
| auth_type | AuthType | The authentication type to use (Pkce, ClientSecret, ExternalCommand, DeviceFlow). |
| command | List[str] \| None | Command executed as an external process to return a token. |
| proxy_command | List[str] \| None | Command executed as an external process to return a token for proxy authorization. |
| api_key | str \| None | Optional API key for authentication. |
| client_id | str \| None | The public identifier for the app that handles authorization for a Flyte deployment. More details: https://www.oauth.com/oauth2-servers/client-registration/client-id-secret/. |
| client_credentials_secret | str \| None | Used for service auth, which is automatically called during pyflyte. This allows the Flyte engine to read the password directly from the environment variable. Note that this is less secure; only use it if mounting the secret as a file is impossible. |
| auth_client_config | ClientConfig \| None | Optional client configuration for authentication. |
| rpc_retries | int | Optional. Number of times to retry the platform calls. |
| http_proxy_url | str \| None | Optional HTTP proxy to be used for OAuth requests. |
| storage | Storage \| None | Optional blob store (S3, GCS, Azure) configuration, if needed for access (e.g., when using MinIO). |
| batch_size | int | Optional batch size for operations that use listings (default 1000). A limit larger than batch_size is split into multiple requests. |
| image_builder | ImageBuildEngine.ImageBuilderType | Optional image builder configuration; if not provided, the default image builder is used. |
| images | typing.Dict[str, str] \| None | Optional dict of images that can be used by referencing the image name. |
| source_config_path | Optional[Path] | Optional path to the source configuration file (only used for documentation). |
| sync_local_sys_paths | bool | Whether to include and synchronize local sys.path entries under the root directory into the remote container (default: True). |
| load_plugin_type_transformers | bool | If enabled (default: True), load the type transformer plugins registered under the "flyte.plugins.types" entry point group. |

Returns: None.
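The `batch_size` description means that a listing request for more items than `batch_size` is served as several smaller requests. A minimal worked sketch of that arithmetic, using a hypothetical `page_sizes` helper rather than any flyte function:

```python
from typing import List


def page_sizes(limit: int, batch_size: int = 1000) -> List[int]:
    """Split a listing limit into per-request page sizes of at most batch_size."""
    if limit < 0 or batch_size <= 0:
        raise ValueError("limit must be >= 0 and batch_size must be > 0")
    full_pages, remainder = divmod(limit, batch_size)
    return [batch_size] * full_pages + ([remainder] if remainder else [])
```

With the default batch size of 1000, a limit of 2500 becomes three requests of 1000, 1000, and 500 items.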
init_from_config()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await init_from_config.aio()`.

```python
def init_from_config(
    path_or_config: str | Path | Config | None,
    root_dir: Path | None,
    log_level: int | None,
    log_format: LogFormat,
    storage: Storage | None,
    images: tuple[str, ...] | None,
    sync_local_sys_paths: bool,
)
```

Initialize the Flyte system using a configuration file or Config object. This method should be called before any other Flyte remote API methods are called. The implementation is thread-safe.

| Parameter | Type | Description |
|---|---|---|
| path_or_config | str \| Path \| Config \| None | Path to the configuration file or Config object. |
| root_dir | Path \| None | Optional root directory from which to determine how to load files and find paths to files such as config. For example, if one uses copy-style "all", it is essential to determine the root directory of the current project. If not provided, it defaults to the editable install directory or, if that is not available, the current working directory. |
| log_level | int \| None | Optional logging level for the framework logger; the default is set using the default initialization policies. |
| log_format | LogFormat | Optional logging format for the logger; the default is "console". |
| storage | Storage \| None | Optional blob store (S3, GCS, Azure) configuration, if needed for access (e.g., when using MinIO). |
| images | tuple[str, ...] \| None | Tuple of image strings in the format "imagename=imageuri" or just "imageuri". |
| sync_local_sys_paths | bool | Whether to include and synchronize local sys.path entries under the root directory into the remote container (default: True). |

Returns: None.
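The `images` parameter accepts strings of the form `imagename=imageuri` or a bare `imageuri`. The hypothetical parser below shows one way to turn such a tuple into a name-to-URI mapping; the key used for a bare URI (`"default"` here) is an assumption made for illustration, not documented flyte behaviour.

```python
from typing import Dict, Iterable


def parse_image_specs(specs: Iterable[str], default_name: str = "default") -> Dict[str, str]:
    """Map 'name=uri' entries to {name: uri}; a bare 'uri' goes under default_name."""
    images: Dict[str, str] = {}
    for spec in specs:
        # partition() splits on the first '=' only.
        name, sep, uri = spec.partition("=")
        if not sep:
            # No '=' present: the whole string is the image URI.
            name, uri = default_name, spec
        if not name or not uri:
            raise ValueError(f"malformed image spec: {spec!r}")
        images[name] = uri
    return images
```

For instance, `("gpu=ghcr.io/org/gpu:1.0", "ghcr.io/org/base:2.0")` maps `"gpu"` to the first URI and the bare second URI to `"default"`.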
init_in_cluster()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await init_in_cluster.aio()`.

```python
def init_in_cluster(
    org: str | None,
    project: str | None,
    domain: str | None,
    api_key: str | None,
    endpoint: str | None,
    insecure: bool,
) -> dict[str, typing.Any]
```

| Parameter | Type | Description |
|---|---|---|
| org | str \| None | |
| project | str \| None | |
| domain | str \| None | |
| api_key | str \| None | |
| endpoint | str \| None | |
| insecure | bool | |
map()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await flyte.map.aio()`.

```python
def map(
    func: typing.Union[flyte._task.AsyncFunctionTaskTemplate[P, R, F], functools.partial[R]],
    *args,
    group_name: str | None,
    concurrency: int,
    return_exceptions: bool,
) -> typing.Iterator[typing.Union[R, Exception]]
```

Map a function over the provided arguments with concurrent execution.

| Parameter | Type | Description |
|---|---|---|
| func | typing.Union[flyte._task.AsyncFunctionTaskTemplate[P, R, F], functools.partial[R]] | The async function to map. |
| args | *args | Positional arguments to pass to the function (iterables that will be zipped). |
| group_name | str \| None | The name of the group for the mapped tasks. |
| concurrency | int | The maximum number of concurrent tasks to run. If 0, all tasks run concurrently. |
| return_exceptions | bool | If True, yield exceptions instead of raising them. |

Returns: An iterator yielding results in input order.
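The semantics of `concurrency` and `return_exceptions` can be illustrated with plain `asyncio`: arguments from several iterables are zipped into calls, a semaphore caps how many calls are in flight (0 meaning no cap), and results come back in input order. This is a conceptual sketch with a hypothetical `bounded_map` helper; unlike `flyte.map`, it returns a list once all calls finish instead of yielding results, and it runs local coroutines rather than tasks.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List, Tuple


async def bounded_map(
    func: Callable[..., Awaitable[Any]],
    *iterables: Iterable[Any],
    concurrency: int = 0,
    return_exceptions: bool = False,
) -> List[Any]:
    """Call func on zipped arguments with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency) if concurrency > 0 else None

    async def call(args: Tuple[Any, ...]) -> Any:
        if semaphore is None:
            return await func(*args)
        async with semaphore:
            return await func(*args)

    # gather() preserves input order; with return_exceptions=True a failure is
    # returned in place of its result instead of being raised.
    return await asyncio.gather(
        *(call(args) for args in zip(*iterables)), return_exceptions=return_exceptions
    )


async def demo_map() -> Tuple[List[Any], int]:
    """Run four calls with concurrency=2; one call fails on purpose."""
    in_flight = peak = 0

    async def add(x: int, y: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        if x == 3:
            raise ValueError("boom")
        return x + y

    results = await bounded_map(
        add, [1, 2, 3, 4], [10, 20, 30, 40], concurrency=2, return_exceptions=True
    )
    return results, peak
```

In `demo_map()`, the third result is the `ValueError` instance rather than a raised error, and no more than two calls ever run at the same time.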
run()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await run.aio()`.

```python
def run(
    task: TaskTemplate[P, R, F],
    *args,
    **kwargs,
) -> Run
```

Run a task with the given parameters.

| Parameter | Type | Description |
|---|---|---|
| task | TaskTemplate[P, R, F] | The task to run. |
| args | *args | Positional arguments to pass to the task. |
| kwargs | **kwargs | Keyword arguments to pass to the task. |

Returns: Run | Result of the task.
serve()
Default invocation is sync and will block. To call it asynchronously, use the function `.aio()` on the method name itself, e.g., `result = await serve.aio()`.

```python
def serve(
    app_env: 'AppEnvironment',
) -> 'App'
```

Serve a Flyte app using an AppEnvironment.

This is the simple, direct way to serve an app. For more control over deployment settings (env vars, cluster pool, etc.), use with_servecontext().

Example:

```python
import flyte
from flyte.app.extras import FastAPIAppEnvironment

env = FastAPIAppEnvironment(name="my-app", ...)

# Simple serve
app = flyte.serve(env)
print(f"App URL: {app.url}")
```

| Parameter | Type | Description |
|---|---|---|
| app_env | 'AppEnvironment' | The app environment to serve. |
trace()
```python
def trace(
    func: typing.Callable[..., T],
) -> typing.Callable[..., T]
```

A decorator that traces function execution with timing information. Works with regular functions, async functions, and async generators/iterators.

| Parameter | Type | Description |
|---|---|---|
| func | typing.Callable[..., T] | The function to trace. |
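Because `trace()` handles regular functions, async functions, and async generators, a decorator of this kind has to dispatch on the kind of callable it wraps. The sketch below shows that dispatch with `inspect`, using a hypothetical `timed` decorator that records `(name, seconds)` pairs in a module-level `TRACE_LOG` list; the list is a stand-in chosen for illustration and is not where flyte stores trace data.

```python
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, List, Tuple

TRACE_LOG: List[Tuple[str, float]] = []  # (function name, elapsed seconds)


def _record(name: str, start: float) -> None:
    TRACE_LOG.append((name, time.perf_counter() - start))


def timed(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical tracing decorator for sync, async, and async-generator functions."""
    if inspect.isasyncgenfunction(func):

        @functools.wraps(func)
        async def agen_wrapper(*args: Any, **kwargs: Any):
            start = time.perf_counter()
            try:
                async for item in func(*args, **kwargs):
                    yield item
            finally:
                _record(func.__name__, start)  # timed until exhausted or closed

        return agen_wrapper

    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                _record(func.__name__, start)

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            _record(func.__name__, start)

    return sync_wrapper


@timed
def add(a: int, b: int) -> int:
    return a + b


@timed
async def double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


@timed
async def count_up(n: int):
    for i in range(n):
        yield i


async def demo_trace() -> List[Any]:
    """Exercise all three wrapper kinds in order."""
    return [add(1, 2), await double(5), [i async for i in count_up(3)]]
```

The async-generator branch must itself be an async generator, so the timing covers the whole iteration rather than just the creation of the generator object.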
version()
```python
def version()
```

Returns the version of the Flyte SDK.
with_runcontext()
```python
def with_runcontext(
    mode: Mode | None,
    name: Optional[str],
    service_account: Optional[str],
    version: Optional[str],
    copy_style: CopyFiles,
    dry_run: bool,
    copy_bundle_to: pathlib.Path | None,
    interactive_mode: bool | None,
    raw_data_path: str | None,
    run_base_dir: str | None,
    overwrite_cache: bool,
    project: str | None,
    domain: str | None,
    env_vars: Dict[str, str] | None,
    labels: Dict[str, str] | None,
    annotations: Dict[str, str] | None,
    interruptible: bool | None,
    log_level: int | None,
    log_format: LogFormat,
    disable_run_cache: bool,
    queue: Optional[str],
    custom_context: Dict[str, str] | None,
    cache_lookup_scope: CacheLookupScope,
) -> _Runner
```

Launch a new run with the given parameters as the context.

Example:

```python
import flyte

env = flyte.TaskEnvironment("example")

@env.task
async def example_task(x: int, y: str) -> str:
    return f"{x} {y}"

if __name__ == "__main__":
    flyte.with_runcontext(name="example_run_id").run(example_task, 1, y="hello")
```

| Parameter | Type | Description |
|---|---|---|
| mode | Mode \| None | Optional. The mode to use for the run; if not provided, it is computed from flyte.init. |
| name | Optional[str] | Optional. The name to use for the run. |
| service_account | Optional[str] | Optional. The service account to use for the run context. |
| version | Optional[str] | Optional. The version to use for the run; if not provided, it is computed from the code bundle. |
| copy_style | CopyFiles | Optional. The copy style to use for the run context. |
| dry_run | bool | Optional. If True, the run is not executed, but the bundle is created. |
| copy_bundle_to | pathlib.Path \| None | When dry_run is True, the bundle is copied to this location if specified. |
| interactive_mode | bool \| None | Optional; can be forced to True or False. If not provided, it is set based on the current environment: for example, Jupyter notebooks are considered interactive mode, while scripts are not. This is used to determine how the code bundle is created. |
| raw_data_path | str \| None | Path used to store the raw data for the run, both locally and remotely; use it to place raw data in specific locations. |
| run_base_dir | str \| None | Optional. The base directory to use for the run. This is used to store the run metadata that is passed between tasks. |
| overwrite_cache | bool | Optional. If True, the cache is overwritten for the run. |
| project | str \| None | Optional. The project to use for the run. |
| domain | str \| None | Optional. The domain to use for the run. |
| env_vars | Dict[str, str] \| None | Optional. Environment variables to set for the run. |
| labels | Dict[str, str] \| None | Optional. Labels to set for the run. |
| annotations | Dict[str, str] \| None | Optional. Annotations to set for the run. |
| interruptible | bool \| None | Optional. If True, the run can be scheduled on interruptible instances; if False, all tasks in the run are scheduled only on non-interruptible instances. If not specified, the original setting on each task is retained. |
| log_level | int \| None | Optional. Log level to set for the run. If not provided, it is set to the default log level set using flyte.init(). |
| log_format | LogFormat | Optional. Log format to set for the run. If not provided, it is set to the default log format. |
| disable_run_cache | bool | Optional. If True, the run cache is disabled. This is useful for testing purposes. |
| queue | Optional[str] | Optional. The queue to use for the run. This is used to specify the cluster to use for the run. |
| custom_context | Dict[str, str] \| None | Optional global input context to pass to the task. This is available via get_custom_context() within the task and automatically propagates to sub-tasks. It acts as base/default values that can be overridden by context managers in the code. |
| cache_lookup_scope | CacheLookupScope | Optional. The scope to use for cache lookups. If not specified, it is set to the default scope (global unless overridden at the system level). |

Returns: runner.
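Several `with_runcontext()` parameters are optional overrides: None keeps whatever the task or `flyte.init()` already specifies, while an explicit value replaces it. For `interruptible` this matters because `False` is an explicit override, distinct from None. A minimal sketch of that resolution rule, with a hypothetical `resolve` helper and hypothetical task names:

```python
from typing import Dict, Optional, TypeVar

T = TypeVar("T")


def resolve(run_override: Optional[T], fallback: T) -> T:
    """Return the run-level override unless it is None, else keep the fallback."""
    # Compare with `is None` so falsy overrides such as False or 0 still win.
    return fallback if run_override is None else run_override


def effective_interruptible(
    run_override: Optional[bool], task_settings: Dict[str, bool]
) -> Dict[str, bool]:
    """Apply a run-level interruptible override to each task's own setting."""
    return {name: resolve(run_override, setting) for name, setting in task_settings.items()}
```

With `task_settings = {"train": True, "evaluate": False}`, an override of None leaves both settings unchanged, while an override of False makes every task non-interruptible.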
with_servecontext()
```python
def with_servecontext(
    version: Optional[str],
    copy_style: CopyFiles,
    dry_run: bool,
    project: str | None,
    domain: str | None,
    env_vars: dict[str, str] | None,
    input_values: dict[str, dict[str, str | flyte.io.File | flyte.io.Dir]] | None,
    cluster_pool: str | None,
    log_level: int | None,
    log_format: LogFormat,
) -> _Serve
```

Create a serve context with custom configuration.

This function allows you to customize how an app is served, including overriding environment variables, cluster pool, logging, and other deployment settings.

Example:

```python
import logging

import flyte
from flyte.app.extras import FastAPIAppEnvironment

env = FastAPIAppEnvironment(name="my-app", ...)

# Serve with custom env vars, logging, and cluster pool
app = flyte.with_servecontext(
    env_vars={"DATABASE_URL": "postgresql://..."},
    log_level=logging.DEBUG,
    log_format="json",
    cluster_pool="gpu-pool",
    project="my-project",
    domain="development",
).serve(env)
print(f"App URL: {app.url}")
```

| Parameter | Type | Description |
|---|---|---|
| version | Optional[str] | Optional version override for the app deployment. |
| copy_style | CopyFiles | |
| dry_run | bool | |
| project | str \| None | Optional project override. |
| domain | str \| None | Optional domain override. |
| env_vars | dict[str, str] \| None | Optional environment variables to inject/override in the app container. |
| input_values | dict[str, dict[str, str \| flyte.io.File \| flyte.io.Dir]] \| None | Optional input values to inject/override in the app container. Must be a dictionary that maps app environment names to a dictionary of input names to values. |
| cluster_pool | str \| None | Optional cluster pool to deploy the app to. |
| log_level | int \| None | Optional log level (e.g., logging.DEBUG, logging.INFO). If not provided, uses the init config or the default. |
log_format |
LogFormat |