flyteorg/flyte-sdk

Flyte 2 SDK 🚀

Type-safe, distributed orchestration of agents, ML pipelines, and more — in pure Python with async/await or sync!


Pure Python workflows • 🔄 Async-first parallelism • 🛠️ Zero DSL constraints • 📊 Sub-task observability

What is Flyte 2?

Flyte 2 represents a fundamental shift from constrained domain-specific languages to pure Python workflows. Write data pipelines, ML training jobs, and distributed compute exactly like you write Python—because it is Python.

import flyte

env = flyte.TaskEnvironment("hello_world")

@env.task
async def process_data(data: list[str]) -> list[str]:
    # Use any Python construct: loops, conditionals, try/except
    results = []
    for item in data:
        if len(item) > 5:
            results.append(await transform_item(item))
    return results

@env.task
async def transform_item(item: str) -> str:
    return f"processed: {item.upper()}"

if __name__ == "__main__":
    flyte.init()
    result = flyte.run(process_data, data=["hello", "world", "flyte"])

🌟 Why Flyte 2?

| Feature Highlight | Flyte 1 | Flyte 2 |
| --- | --- | --- |
| No More Workflow DSL | ❌ `@workflow` decorators with Python-subset limitations | Pure Python: loops, conditionals, error handling, dynamic structures |
| Async-First Parallelism | ❌ Custom `map()` functions and workflow-specific parallel constructs | Native asyncio: `await asyncio.gather()` for distributed parallel execution |
| Fine-Grained Observability | ❌ Task-level logging only | Function-level tracing with `@flyte.trace` for sub-task checkpoints |

🚀 Quick Start

Installation

# Install uv package manager
curl -LsSf https://astral.sh/uv/install.sh | sh

# Create virtual environment
uv venv && source .venv/bin/activate

# Install Flyte 2 (beta)
uv pip install --prerelease=allow flyte

Your First Workflow

# hello.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["flyte>=2.0.0b0"]
# ///

import asyncio

import flyte

env = flyte.TaskEnvironment(
    name="hello_world",
    resources=flyte.Resources(memory="250Mi")
)

@env.task
def calculate(x: int) -> int:
    return x * 2 + 5

@env.task
async def main(numbers: list[int]) -> float:
    # Parallel execution across distributed containers
    results = await asyncio.gather(*[
        calculate.aio(num) for num in numbers
    ])
    return sum(results) / len(results)

if __name__ == "__main__":
    flyte.init_from_config("config.yaml")
    run = flyte.run(main, numbers=list(range(10)))
    print(f"Result: {run.result}")
    print(f"View at: {run.url}")
Run it:

# Run locally, execute remotely
uv run --prerelease=allow hello.py
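The fan-out above relies on a plain asyncio guarantee: `asyncio.gather` returns results in input order, even when the individual calls finish out of order. A minimal local sketch of the same math, no Flyte required (`calc_local` is a stand-in that mirrors `calculate`):

```python
import asyncio
import random

async def calc_local(x: int) -> int:
    # Same math as calculate(); random sleep shuffles completion order
    await asyncio.sleep(random.random() / 100)
    return x * 2 + 5

async def demo() -> float:
    # gather preserves input order regardless of which call finishes first
    results = await asyncio.gather(*[calc_local(n) for n in range(10)])
    return sum(results) / len(results)

avg = asyncio.run(demo())
print(avg)  # 14.0
```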

🏗️ Core Concepts

TaskEnvironments: Container Configuration Made Simple

# Group tasks with shared configuration
env = flyte.TaskEnvironment(
    name="ml_pipeline",
    image=flyte.Image.from_debian_base().with_pip_packages(
        "torch", "pandas", "scikit-learn"
    ),
    resources=flyte.Resources(cpu=4, memory="8Gi", gpu=1),
    reusable=flyte.ReusePolicy(replicas=3, idle_ttl=300)
)

@env.task
def train_model(data: flyte.File) -> flyte.File:
    # Runs in configured container with GPU access
    pass

@env.task
def evaluate_model(model: flyte.File, test_data: flyte.File) -> dict:
    # Same container configuration, different instance
    pass

Pure Python Workflows: No More DSL Constraints

@env.task
async def dynamic_pipeline(config: dict) -> list[str]:
    results = []

    # ✅ Use any Python construct
    for dataset in config["datasets"]:
        try:
            # ✅ Native error handling
            if dataset["type"] == "batch":
                result = await process_batch(dataset)
            else:
                result = await process_stream(dataset)
            results.append(result)
        except ValidationError as e:
            # ✅ Custom error recovery
            result = await handle_error(dataset, e)
            results.append(result)

    return results

Async Parallelism: Distributed by Default

@env.task
async def parallel_training(hyperparams: list[dict]) -> dict:
    # Each model trains on separate infrastructure
    models = await asyncio.gather(*[
        train_model.aio(params) for params in hyperparams
    ])

    # Evaluate all models in parallel
    evaluations = await asyncio.gather(*[
        evaluate_model.aio(model) for model in models
    ])

    # Find best model
    best_idx = max(range(len(evaluations)),
                   key=lambda i: evaluations[i]["accuracy"])
    return {"best_model": models[best_idx], "accuracy": evaluations[best_idx]["accuracy"]}
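Because this is ordinary asyncio fan-out, standard concurrency patterns apply unchanged. For example, here is a hedged sketch (plain asyncio, no Flyte required) of capping how many trainings run at once with a semaphore; `fake_train` is a hypothetical stand-in for a call like `train_model.aio(...)`:

```python
import asyncio

async def fake_train(params: dict) -> dict:
    # Stand-in for a remote training task: toy scorer where smaller lr wins
    await asyncio.sleep(0)  # simulate remote work
    return {"lr": params["lr"], "accuracy": 1.0 - params["lr"]}

async def bounded_gather(param_grid: list[dict], limit: int = 2) -> list[dict]:
    # Cap in-flight work so a large grid doesn't launch everything at once
    sem = asyncio.Semaphore(limit)

    async def run_one(params: dict) -> dict:
        async with sem:
            return await fake_train(params)

    return await asyncio.gather(*[run_one(p) for p in param_grid])

grid = [{"lr": 0.1}, {"lr": 0.01}, {"lr": 0.001}]
results = asyncio.run(bounded_gather(grid))
best = max(results, key=lambda r: r["accuracy"])
print(best["lr"])  # 0.001
```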

🎯 Advanced Features

Sub-Task Observability with Tracing

@flyte.trace
async def expensive_computation(data: str) -> str:
    # Function-level checkpointing - recoverable on failure
    result = await call_external_api(data)
    return process_result(result)

@env.task(cache=flyte.Cache(behavior="auto"))
async def main_task(inputs: list[str]) -> list[str]:
    results = []
    for inp in inputs:
        # If task fails here, it resumes from the last successful trace
        result = await expensive_computation(inp)
        results.append(result)
    return results
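To build intuition for the resume semantics, here is an illustrative toy only (this is not how the SDK implements `@flyte.trace`): a decorator that memoizes completed calls, so a re-run replays finished steps from checkpoints instead of recomputing them:

```python
import asyncio
import functools

_checkpoints: dict[str, str] = {}  # survives across "retries" in this toy model

def toy_trace(fn):
    # Illustration only: skip steps that already completed, mimicking
    # how traced sub-steps are not re-executed after a failure
    @functools.wraps(fn)
    async def wrapper(arg: str) -> str:
        key = f"{fn.__name__}:{arg}"
        if key not in _checkpoints:
            _checkpoints[key] = await fn(arg)
        return _checkpoints[key]
    return wrapper

calls = []

@toy_trace
async def expensive(data: str) -> str:
    calls.append(data)  # count real executions
    return data.upper()

async def main() -> list[str]:
    return [await expensive(x) for x in ["a", "b"]]

first = asyncio.run(main())
second = asyncio.run(main())  # "retry": replayed from checkpoints
print(calls)  # each input computed only once: ['a', 'b']
```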

Remote Task Execution

import flyte.remote

# Reference tasks deployed elsewhere
torch_task = flyte.remote.Task.get("torch_env.train_model", auto_version="latest")
spark_task = flyte.remote.Task.get("spark_env.process_data", auto_version="latest")

@env.task
async def orchestrator(raw_data: flyte.File) -> flyte.File:
    # Execute Spark job on big data cluster
    processed = await spark_task(raw_data)

    # Execute PyTorch training on GPU cluster
    model = await torch_task(processed)

    return model

📊 Native Jupyter Integration

Run and monitor workflows directly from notebooks:

# In Jupyter cell
import flyte

flyte.init_from_config()
run = flyte.run(my_workflow, data=large_dataset)

# Stream logs in real-time
run.logs.stream()

# Get outputs when complete
results = run.wait()

🔧 Configuration & Deployment

Configuration File

# config.yaml
endpoint: https://my-flyte-instance.com
project: ml-team
domain: production
image:
  builder: local
  registry: ghcr.io/my-org
auth:
  type: oauth2

Deploy and Run

# Deploy tasks to remote cluster
flyte deploy my_workflow.py

# Run deployed workflow
flyte run my_workflow --input-file params.json

# Monitor execution
flyte logs <execution-id>

Migration from Flyte 1

| Flyte 1 | Flyte 2 |
| --- | --- |
| `@workflow` + `@task` | `@env.task` only |
| `flytekit.map()` | `await asyncio.gather()` |
| `@dynamic` workflows | Regular `@env.task` with loops |
| `flytekit.conditional()` | Python `if`/`else` |
| `LaunchPlan` schedules | `@env.task(on_schedule=...)` |
| Workflow failure handlers | Python `try`/`except` |
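The last two rows are the biggest mental shift: branching and failure recovery become ordinary Python. A minimal sketch of both, under the assumption that `risky_step` and `fallback` stand in for real tasks (they are hypothetical names, plain asyncio, no Flyte needed):

```python
import asyncio

async def risky_step(x: int) -> int:
    # Stand-in for a task that can fail
    if x < 0:
        raise ValueError("negative input")
    return x * 2

async def fallback(x: int) -> int:
    # Stand-in for a Flyte 1 workflow failure handler, now just a function
    return 0

async def pipeline(xs: list[int]) -> list[int]:
    out = []
    for x in xs:
        try:
            if x % 2 == 0:
                # Flyte 1 needed conditional(); Flyte 2 is plain if/else
                out.append(await risky_step(x))
            else:
                out.append(await risky_step(-x))
        except ValueError:
            # Flyte 1 needed a failure handler; now plain try/except
            out.append(await fallback(x))
    return out

result = asyncio.run(pipeline([2, 3, 4]))
print(result)  # [4, 0, 8]
```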

🤝 Contributing

We welcome contributions! Whether it's:

  • 🐛 Bug fixes
  • New features
  • 📚 Documentation improvements
  • 🧪 Testing enhancements

Setup & Iteration Cycle

To get started, create a fresh virtual environment and install this package in editable mode, using any supported Python version (3.10 through 3.13).

uv venv --python 3.13
uv pip install -e .

Besides picking up local code changes, installing the package in editable mode also changes the definition of the default Image() object to use a locally built wheel. You will need to build that wheel yourself with the make dist target:

make dist
python maint_tools/build_default_image.py

You'll need a local Docker daemon running for this. The build script does nothing more than invoke the local image builder, which creates a buildx builder named flytex if one is not present. Note that only members of the Flyte Maintainers group have access to push to the default registry. If you don't have access, specify the registry and image name when calling the build script:

python maint_tools/build_default_image.py --registry ghcr.io/my-org --name my-flyte-image

📄 License

Flyte 2 is licensed under the Apache 2.0 License.
