2.0.0b35

flyte

Flyte SDK for authoring compound AI applications, services and workflows.

Directory

Classes

Class Description
Cache Cache configuration for a task.
Cron This class defines a Cron automation that can be associated with a Trigger in Flyte.
Device Represents a device type, its quantity and partition if applicable.
Environment
FixedRate This class defines a FixedRate automation that can be associated with a Trigger in Flyte.
Image This is a representation of Container Images, which can be used to create layered images programmatically.
PodTemplate Custom PodTemplate specification for a Task.
Resources Resources such as CPU, Memory, and GPU that can be allocated to a task.
RetryStrategy Retry strategy for the task or task environment.
ReusePolicy ReusePolicy can be used to configure a task to reuse the environment.
Secret Secrets are used to inject sensitive information into tasks or image build context.
TaskEnvironment Environment class to define a new environment for a set of tasks.
Timeout Timeout class to define a timeout for a task.
Trigger This class defines the specification of a Trigger that can be associated with any Flyte V2 task.

Protocols

Protocol Description
CachePolicy Base class for protocol classes.

Methods

Method Description
AMD_GPU() Create an AMD GPU device instance.
GPU() Create a GPU device instance.
HABANA_GAUDI() Create a Habana Gaudi device instance.
Neuron() Create a Neuron device instance.
TPU() Create a TPU device instance.
build() Build an image.
build_images() Build the images for the given environments.
ctx() Returns flyte.models.TaskContext if within a task context, else None.
current_domain() Returns the current domain from Runtime environment (on the cluster) or from the initialized configuration.
custom_context() Synchronous context manager to set input context for tasks spawned within this block.
deploy() Deploy the given environment or list of environments.
get_custom_context() Get the current input context.
group() Create a new group with the given name.
init() Initialize the Flyte system with the given configuration.
init_from_config() Initialize the Flyte system using a configuration file or Config object.
init_in_cluster()
map() Map a function over the provided arguments with concurrent execution.
run() Run a task with the given parameters.
serve() Serve a Flyte app using an AppEnvironment.
trace() A decorator that traces function execution with timing information.
version() Returns the version of the Flyte SDK.
with_runcontext() Launch a new run with the given parameters as the context.
with_servecontext() Create a serve context with custom configuration.

Variables

Property Type Description
TimeoutType UnionType
TriggerTime _trigger_time
__version__ str
logger Logger

Methods

AMD_GPU()

def AMD_GPU(
    device: typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X'],
) -> flyte._resources.Device

Create an AMD GPU device instance.

Parameter Type Description
device typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X'] Device type (e.g., “MI100”, “MI210”, “MI250”, “MI250X”, “MI300A”, “MI300X”, “MI325X”, “MI350X”, “MI355X”).

Returns a Device instance.

GPU()

def GPU(
    device: typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'H200', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000'],
    quantity: typing.Literal[1, 2, 3, 4, 5, 6, 7, 8],
    partition: typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], typing.Literal['1g.18gb', '1g.35gb', '2g.35gb', '3g.71gb', '4g.71gb', '7g.141gb'], NoneType],
) -> flyte._resources.Device

Create a GPU device instance.

Parameter Type Description
device typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'H200', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000'] The type of GPU (e.g., “T4”, “A100”).
quantity typing.Literal[1, 2, 3, 4, 5, 6, 7, 8] The number of GPUs of this type.
partition typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], typing.Literal['1g.18gb', '1g.35gb', '2g.35gb', '3g.71gb', '4g.71gb', '7g.141gb'], NoneType] The partition of the GPU (e.g., “1g.5gb”, “2g.10gb”).

Returns a Device instance.

HABANA_GAUDI()

def HABANA_GAUDI(
    device: typing.Literal['Gaudi1'],
) -> flyte._resources.Device

Create a Habana Gaudi device instance.

Parameter Type Description
device typing.Literal['Gaudi1'] Device type (e.g., “Gaudi1”).

Returns a Device instance.

Neuron()

def Neuron(
    device: typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u'],
) -> flyte._resources.Device

Create a Neuron device instance.

Parameter Type Description
device typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u'] Device type (e.g., “Inf1”, “Inf2”, “Trn1”, “Trn1n”, “Trn2”, “Trn2u”).

TPU()

def TPU(
    device: typing.Literal['V5P', 'V6E'],
    partition: typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType],
)

Create a TPU device instance.

Parameter Type Description
device typing.Literal['V5P', 'V6E'] Device type (e.g., “V5P”, “V6E”).
partition typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType] Partition of the TPU (e.g., “1x1”, “2x2”, …).

Returns a Device instance.

build()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await build.aio().

def build(
    image: Image,
) -> str

Build an image. The existing async context will be used.

Example:

import asyncio
import flyte

image = flyte.Image("example_image")
if __name__ == "__main__":
    asyncio.run(flyte.build.aio(image))

Parameter Type Description
image Image The image(s) to build.

Returns the image URI.
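The sync/async calling convention described above (call the method directly to block, or call `.aio()` to get an awaitable) can be illustrated with a small standard-library sketch. This is not Flyte's implementation: the `SyncOrAsync` wrapper, the `_build` coroutine, and the `registry.example` URI are hypothetical names made up for illustration only.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SyncOrAsync:
    """Hypothetical wrapper: call directly to block, or use .aio() to await."""

    def __init__(self, coro_fn: Callable[..., Awaitable[Any]]) -> None:
        self._coro_fn = coro_fn

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Synchronous path: run the coroutine to completion and block the caller.
        return asyncio.run(self._coro_fn(*args, **kwargs))

    def aio(self, *args: Any, **kwargs: Any) -> Awaitable[Any]:
        # Asynchronous path: hand back the awaitable for the caller to await.
        return self._coro_fn(*args, **kwargs)


async def _build(image_name: str) -> str:
    # Stand-in for real work; the returned URI is invented for this sketch.
    await asyncio.sleep(0)
    return f"registry.example/{image_name}:latest"


build = SyncOrAsync(_build)

# Blocking call from ordinary synchronous code.
sync_uri = build("example_image")


async def main() -> str:
    # Awaitable call from inside a coroutine.
    return await build.aio("example_image")


async_uri = asyncio.run(main())
```

In this sketch the blocking path uses `asyncio.run`, so it cannot be called from inside an already running event loop; in that situation the `.aio()` form is the one to use.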

build_images()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await build_images.aio().

def build_images(
    envs: Environment,
) -> ImageCache

Build the images for the given environments.

Parameter Type Description
envs Environment Environment to build images for.

Returns an ImageCache containing the built images.

ctx()

def ctx()

Returns flyte.models.TaskContext if within a task context, else None. Note: Only use this in task code, not at module level.

current_domain()

def current_domain()

Returns the current domain from Runtime environment (on the cluster) or from the initialized configuration. This is safe to be used during deploy, run and within task code.

NOTE: This will not work if you deploy a task to a domain and then run it in another domain.

Raises InitializationError if the configuration is not initialized or the domain is not set. Returns the current domain.

custom_context()

def custom_context(
    context: str,
)

Synchronous context manager to set input context for tasks spawned within this block.

Example:

import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    ctx = flyte.get_custom_context()
    print(ctx)

@env.task
def main():
    # context can be passed via a context manager
    with flyte.custom_context(project="my-project"):
        t1()  # will have {'project': 'my-project'} as context

Parameter Type Description
context str Key-value pairs to set as input context
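The scoping behaviour described above (key-value pairs set for the duration of a `with` block and visible to code called inside it) can be sketched with the standard-library `contextvars` module. This is only an in-process illustration and not how Flyte propagates context to remote actions; the merging of nested blocks is an assumption made for the sketch, and the functions below are local stand-ins that merely share names with the Flyte API.

```python
import contextvars
from contextlib import contextmanager
from typing import Dict, Iterator

# Holds the active key-value pairs; the default is an empty context.
_ctx = contextvars.ContextVar("custom_context_sketch", default={})


@contextmanager
def custom_context(**context: str) -> Iterator[None]:
    # Merge new pairs over whatever is already active (assumed behaviour).
    merged = {**_ctx.get(), **context}
    token = _ctx.set(merged)
    try:
        yield
    finally:
        # Restore the previous context when the block exits.
        _ctx.reset(token)


def get_custom_context() -> Dict[str, str]:
    # Return a copy so callers cannot mutate the active context.
    return dict(_ctx.get())


def t1() -> Dict[str, str]:
    return get_custom_context()


with custom_context(project="my-project"):
    outer = t1()
    with custom_context(entity="my-entity"):
        inner = t1()

after = t1()
```

Inside the outer block `t1()` sees only `project`; inside the nested block it sees both keys; after both blocks exit the context is empty again.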

deploy()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await deploy.aio().

def deploy(
    envs: Environment,
    dryrun: bool,
    version: str | None,
    interactive_mode: bool | None,
    copy_style: CopyFiles,
) -> List[Deployment]

Deploy the given environment or list of environments.

Parameter Type Description
envs Environment Environment or list of environments to deploy.
dryrun bool Dry-run mode. If True, the deployment is not applied to the control plane.
version str | None Version of the deployment. If None, the version is computed from the code bundle.
interactive_mode bool | None Optional, can be forced to True or False. If not provided, it will be set based on the current environment. For example, Jupyter notebooks are considered interactive mode, while scripts are not. This is used to determine how the code bundle is created.
copy_style CopyFiles Copy style to use when running the task.

Returns the Deployment object(s) containing the deployed environments and tasks.

get_custom_context()

def get_custom_context()

Get the current input context. This can be used within a task to retrieve context metadata that was passed to the action.

Context will automatically propagate to sub-actions.

Example:

import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    # context can be retrieved with `get_custom_context`
    ctx = flyte.get_custom_context()
    print(ctx)  # {'project': '...', 'entity': '...'}

Returns a dictionary of context key-value pairs.

group()

def group(
    name: str,
)

Create a new group with the given name. The method is intended to be used as a context manager.

Example:

import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
async def my_task():
    ...
    with flyte.group("my_group"):
        t1(x, y)  # t1 is another task; tasks in this block will be grouped under "my_group"
    ...

Parameter Type Description
name str The name of the group

init()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await init.aio().

def init(
    org: str | None,
    project: str | None,
    domain: str | None,
    root_dir: Path | None,
    log_level: int | None,
    log_format: LogFormat | None,
    endpoint: str | None,
    headless: bool,
    insecure: bool,
    insecure_skip_verify: bool,
    ca_cert_file_path: str | None,
    auth_type: AuthType,
    command: List[str] | None,
    proxy_command: List[str] | None,
    api_key: str | None,
    client_id: str | None,
    client_credentials_secret: str | None,
    auth_client_config: ClientConfig | None,
    rpc_retries: int,
    http_proxy_url: str | None,
    storage: Storage | None,
    batch_size: int,
    image_builder: ImageBuildEngine.ImageBuilderType,
    images: typing.Dict[str, str] | None,
    source_config_path: Optional[Path],
    sync_local_sys_paths: bool,
    load_plugin_type_transformers: bool,
)

Initialize the Flyte system with the given configuration. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.

Parameter Type Description
org str | None Optional organization override for the client. Should be set by auth instead.
project str | None Optional project name (not used in this implementation)
domain str | None Optional domain name (not used in this implementation)
root_dir Path | None Optional root directory used to resolve file paths and to locate files such as the config. It is also used to determine all the code that needs to be copied to the remote location. Defaults to the editable install directory if the cwd is in a Python editable install, else the cwd.
log_level int | None Optional logging level for the logger, default is set using the default initialization policies
log_format LogFormat | None Optional logging format for the logger, default is “console”
endpoint str | None Optional API endpoint URL
headless bool Optional. Whether to run in headless mode.
insecure bool Insecure flag for the client.
insecure_skip_verify bool Whether to skip SSL certificate verification.
ca_cert_file_path str | None Optional path to the root certificate to be loaded and used to verify admin.
auth_type AuthType The authentication type to use (Pkce, ClientSecret, ExternalCommand, DeviceFlow)
command List[str] | None This command is executed to return a token using an external process
proxy_command List[str] | None This command is executed to return a token for proxy authorization using an external process
api_key str | None Optional API key for authentication
client_id str | None This is the public identifier for the app which handles authorization for a Flyte deployment. More details here: https://www.oauth.com/oauth2-servers/client-registration/client-id-secret/.
client_credentials_secret str | None Used for service auth, which is automatically called during pyflyte. This will allow the Flyte engine to read the password directly from the environment variable. Note that this is less secure! Please only use this if mounting the secret as a file is impossible
auth_client_config ClientConfig | None Optional client configuration for authentication
rpc_retries int Optional number of times to retry the platform calls.
http_proxy_url str | None Optional HTTP proxy to be used for OAuth requests.
storage Storage | None Optional blob store (S3, GCS, Azure) configuration, if needed for access (e.g., when using MinIO).
batch_size int Optional batch size for operations that use listings. Defaults to 1000; a limit larger than batch_size is split into multiple requests.
image_builder ImageBuildEngine.ImageBuilderType Optional image builder configuration, if not provided, the default image builder will be used.
images typing.Dict[str, str] | None Optional dict of images that can be used by referencing the image name.
source_config_path Optional[Path] Optional path to the source configuration file (This is only used for documentation)
sync_local_sys_paths bool Whether to include and synchronize local sys.path entries under the root directory into the remote container (default: True).
load_plugin_type_transformers bool If enabled (default True), load the type transformer plugins registered under the “flyte.plugins.types” entry point group.

Returns None.
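The batch_size description above says that a limit larger than batch_size is split into multiple requests. The arithmetic can be sketched as follows; `split_limit` is a hypothetical helper written for illustration and is not part of the Flyte SDK.

```python
from typing import List


def split_limit(limit: int, batch_size: int = 1000) -> List[int]:
    """Return the per-request sizes needed to fetch `limit` items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    sizes: List[int] = []
    remaining = limit
    while remaining > 0:
        # Each request fetches at most batch_size items.
        size = min(batch_size, remaining)
        sizes.append(size)
        remaining -= size
    return sizes


# A limit of 2500 with the documented default batch size of 1000.
large = split_limit(2500)
# A limit below the batch size fits in a single request.
small = split_limit(400)
```

Under these assumptions a limit of 2500 needs three requests, while a limit of 400 needs one.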

init_from_config()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await init_from_config.aio().

def init_from_config(
    path_or_config: str | Path | Config | None,
    root_dir: Path | None,
    log_level: int | None,
    log_format: LogFormat,
    storage: Storage | None,
    images: tuple[str, ...] | None,
    sync_local_sys_paths: bool,
)

Initialize the Flyte system using a configuration file or Config object. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.

Parameter Type Description
path_or_config str | Path | Config | None Path to the configuration file or Config object
root_dir Path | None Optional root directory from which to determine how to load files, and find paths to files like config etc. For example if one uses the copy-style==“all”, it is essential to determine the root directory for the current project. If not provided, it defaults to the editable install directory or if not available, the current working directory.
log_level int | None Optional logging level for the framework logger, default is set using the default initialization policies
log_format LogFormat Optional logging format for the logger, default is “console”
storage Storage | None Optional blob store (S3, GCS, Azure) configuration, if needed for access (e.g., when using MinIO).
images tuple[str, ...] | None Image strings in the format “imagename=imageuri” or just “imageuri”.
sync_local_sys_paths bool Whether to include and synchronize local sys.path entries under the root directory into the remote container (default: True).

Returns None.
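The two accepted forms of an images entry, “imagename=imageuri” and a bare “imageuri”, can be told apart by splitting on the first `=`. The sketch below is a hypothetical helper, not Flyte's parser; in particular, the `"default"` key used for bare URIs is an assumption chosen for illustration, and the image URIs are made up.

```python
from typing import Dict, Tuple


def parse_images(images: Tuple[str, ...]) -> Dict[str, str]:
    """Map image names to URIs from 'name=uri' or bare 'uri' strings."""
    parsed: Dict[str, str] = {}
    for entry in images:
        # Split on the first '=' only; anything after it belongs to the URI.
        name, sep, uri = entry.partition("=")
        if sep:
            parsed[name] = uri
        else:
            # Bare URI: stored under an assumed placeholder name.
            parsed["default"] = entry
    return parsed


result = parse_images(
    ("base=ghcr.io/acme/base:1.0", "ghcr.io/acme/app:2.0")
)
```

The named entry is stored under `base`, and the bare URI falls back to the placeholder key.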

init_in_cluster()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await init_in_cluster.aio().

def init_in_cluster(
    org: str | None,
    project: str | None,
    domain: str | None,
    api_key: str | None,
    endpoint: str | None,
    insecure: bool,
) -> dict[str, typing.Any]

Parameter Type Description
org str | None
project str | None
domain str | None
api_key str | None
endpoint str | None
insecure bool

map()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await flyte.map.aio().

def map(
    func: typing.Union[flyte._task.AsyncFunctionTaskTemplate[~P, ~R, ~F], functools.partial[~R]],
    args: *args,
    group_name: str | None,
    concurrency: int,
    return_exceptions: bool,
) -> typing.Iterator[typing.Union[~R, Exception]]

Map a function over the provided arguments with concurrent execution.

Parameter Type Description
func typing.Union[flyte._task.AsyncFunctionTaskTemplate[~P, ~R, ~F], functools.partial[~R]] The async function to map.
args *args Positional arguments to pass to the function (iterables that will be zipped).
group_name str | None The name of the group for the mapped tasks.
concurrency int The maximum number of concurrent tasks to run. If 0, run all tasks concurrently.
return_exceptions bool If True, yield exceptions instead of raising them.

Returns an iterator yielding results in order.
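The semantics in the table above (iterables zipped into argument tuples, a concurrency cap where 0 means no cap, results yielded in input order, and exceptions optionally yielded instead of raised) can be sketched with a thread pool from the standard library. This is not Flyte's implementation and does not run Flyte tasks; `map_sketch` and `divide` are hypothetical names, and the default for `return_exceptions` is chosen here for illustration only.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator


def map_sketch(
    func: Callable[..., Any],
    *args: Iterable[Any],
    concurrency: int = 0,
    return_exceptions: bool = True,
) -> Iterator[Any]:
    # Zip the iterables so each call receives one element from each.
    calls = list(zip(*args))
    if not calls:
        return
    # A concurrency of 0 means "run everything at once" in this sketch.
    max_workers = concurrency if concurrency > 0 else len(calls)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *call) for call in calls]
        # Iterate in submission order so results come back in input order.
        for future in futures:
            try:
                yield future.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                yield exc


def divide(a: float, b: float) -> float:
    return a / b


# The second pair divides by zero, so its slot holds the exception.
results = list(map_sketch(divide, [6, 1, 8], [3, 0, 2], concurrency=2))
```

With `return_exceptions=True` the failing call does not abort the iteration; its position in the output simply holds the exception object.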

run()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await run.aio().

def run(
    task: TaskTemplate[P, R, F],
    args: *args,
    kwargs: **kwargs,
) -> Run

Run a task with the given parameters.

Parameter Type Description
task TaskTemplate[P, R, F] Task to run.
args *args Positional arguments to pass to the task.
kwargs **kwargs Keyword arguments to pass to the task.

Returns a Run, or the result of the task.

serve()

This method can be called either synchronously or asynchronously.

The default invocation is synchronous and blocks. To call it asynchronously, use .aio() on the method itself, e.g., result = await serve.aio().

def serve(
    app_env: 'AppEnvironment',
) -> 'App'

Serve a Flyte app using an AppEnvironment.

This is the simple, direct way to serve an app. For more control over deployment settings (env vars, cluster pool, etc.), use with_servecontext().

Example:

import flyte
from flyte.app.extras import FastAPIAppEnvironment

env = FastAPIAppEnvironment(name="my-app", ...)

# Simple serve
app = flyte.serve(env)
print(f"App URL: {app.url}")

Parameter Type Description
app_env 'AppEnvironment' The app environment to serve

trace()

def trace(
    func: typing.Callable[..., ~T],
) -> typing.Callable[..., ~T]

A decorator that traces function execution with timing information. Works with regular functions, async functions, and async generators/iterators.

Parameter Type Description
func typing.Callable[..., ~T]
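To make the idea of a timing decorator that handles both regular and async functions concrete, here is a minimal standard-library sketch. It is not Flyte's `trace` implementation: `trace_sketch` and the `timings` list are hypothetical, and async generators, which the real decorator is documented to support, are left out for brevity.

```python
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, List, Tuple

# Collected (function name, elapsed seconds) pairs for this sketch.
timings: List[Tuple[str, float]] = []


def trace_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # Record the duration even if the call raised.
                timings.append((func.__name__, time.perf_counter() - start))

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            timings.append((func.__name__, time.perf_counter() - start))

    return sync_wrapper


@trace_sketch
def add(a: int, b: int) -> int:
    return a + b


@trace_sketch
async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


sum_result = add(1, 2)
double_result = asyncio.run(double(4))
```

The decorator picks the wrapper once, at decoration time, so the wrapped function keeps its sync or async nature and its original name.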

version()

def version()

Returns the version of the Flyte SDK.

with_runcontext()

def with_runcontext(
    mode: Mode | None,
    name: Optional[str],
    service_account: Optional[str],
    version: Optional[str],
    copy_style: CopyFiles,
    dry_run: bool,
    copy_bundle_to: pathlib.Path | None,
    interactive_mode: bool | None,
    raw_data_path: str | None,
    run_base_dir: str | None,
    overwrite_cache: bool,
    project: str | None,
    domain: str | None,
    env_vars: Dict[str, str] | None,
    labels: Dict[str, str] | None,
    annotations: Dict[str, str] | None,
    interruptible: bool | None,
    log_level: int | None,
    log_format: LogFormat,
    disable_run_cache: bool,
    queue: Optional[str],
    custom_context: Dict[str, str] | None,
    cache_lookup_scope: CacheLookupScope,
) -> _Runner

Launch a new run with the given parameters as the context.

Example:

import flyte
env = flyte.TaskEnvironment("example")

@env.task
async def example_task(x: int, y: str) -> str:
    return f"{x} {y}"

if __name__ == "__main__":
    flyte.with_runcontext(name="example_run_id").run(example_task, 1, y="hello")

Parameter Type Description
mode Mode | None Optional. The mode to use for the run. If not provided, it is computed from flyte.init.
name Optional[str] Optional. The name to use for the run.
service_account Optional[str] Optional. The service account to use for the run context.
version Optional[str] Optional. The version to use for the run. If not provided, it is computed from the code bundle.
copy_style CopyFiles Optional. The copy style to use for the run context.
dry_run bool Optional. If True, the run is not executed, but the bundle is created.
copy_bundle_to pathlib.Path | None When dry_run is True, the bundle is copied to this location if specified.
interactive_mode bool | None Optional, can be forced to True or False. If not provided, it will be set based on the current environment. For example, Jupyter notebooks are considered interactive mode, while scripts are not. This is used to determine how the code bundle is created.
raw_data_path str | None Path used to store the raw data for the run, both local and remote. Use it to store raw data in a specific location.
run_base_dir str | None Optional. The base directory to use for the run. It stores the run metadata that is passed between tasks.
overwrite_cache bool Optional. If True, the cache is overwritten for the run.
project str | None Optional. The project to use for the run.
domain str | None Optional. The domain to use for the run.
env_vars Dict[str, str] | None Optional. Environment variables to set for the run.
labels Dict[str, str] | None Optional. Labels to set for the run.
annotations Dict[str, str] | None Optional. Annotations to set for the run.
interruptible bool | None Optional. If True, the run can be scheduled on interruptible instances; False means all tasks in the run are scheduled only on non-interruptible instances. If not specified, the original setting on each task is retained.
log_level int | None Optional. Log level to set for the run. If not provided, it is set to the default log level configured via flyte.init().
log_format LogFormat Optional. Log format to set for the run. If not provided, it is set to the default log format.
disable_run_cache bool Optional. If True, the run cache is disabled. This is useful for testing purposes.
queue Optional[str] Optional. The queue to use for the run. This is used to specify the cluster to use for the run.
custom_context Dict[str, str] | None Optional global input context to pass to the task. It is available via get_custom_context() within the task and automatically propagates to sub-tasks. It acts as base/default values that can be overridden by context managers in the code.
cache_lookup_scope CacheLookupScope Optional. The scope to use for cache lookups. If not specified, the default scope is used (global unless overridden at the system level).

Returns the runner.

with_servecontext()

def with_servecontext(
    version: Optional[str],
    copy_style: CopyFiles,
    dry_run: bool,
    project: str | None,
    domain: str | None,
    env_vars: dict[str, str] | None,
    input_values: dict[str, dict[str, str | flyte.io.File | flyte.io.Dir]] | None,
    cluster_pool: str | None,
    log_level: int | None,
    log_format: LogFormat,
) -> _Serve

Create a serve context with custom configuration.

This function allows you to customize how an app is served, including overriding environment variables, cluster pool, logging, and other deployment settings.

Example:

import logging
import flyte
from flyte.app.extras import FastAPIAppEnvironment

env = FastAPIAppEnvironment(name="my-app", ...)

# Serve with custom env vars, logging, and cluster pool
app = flyte.with_servecontext(
    env_vars={"DATABASE_URL": "postgresql://..."},
    log_level=logging.DEBUG,
    log_format="json",
    cluster_pool="gpu-pool",
    project="my-project",
    domain="development",
).serve(env)

print(f"App URL: {app.url}")

Parameter Type Description
version Optional[str] Optional version override for the app deployment
copy_style CopyFiles
dry_run bool
project str | None Optional project override
domain str | None Optional domain override
env_vars dict[str, str] | None Optional environment variables to inject/override in the app container
input_values dict[str, dict[str, str | flyte.io.File | flyte.io.Dir]] | None Optional input values to inject/override in the app container. Must be a dictionary that maps app environment names to a dictionary of input names to values.
cluster_pool str | None Optional cluster pool to deploy the app to
log_level int | None Optional log level (e.g., logging.DEBUG, logging.INFO). If not provided, uses init config or default
log_format LogFormat