Metaflow provides comprehensive support for AWS, offering deep integration with AWS Batch for compute, S3 for storage, Step Functions for orchestration, and Secrets Manager for credential management.
Overview
AWS is Metaflow’s most mature cloud platform, originally developed at Netflix to run thousands of production workflows. The AWS integration includes:
AWS Batch : Elastic compute with support for CPUs, GPUs, and multi-node workloads
Amazon S3 : Scalable object storage for artifacts and data
AWS Step Functions : Production workflow orchestration
AWS Secrets Manager : Secure credential and secret management
Amazon ECR : Container registry for custom Docker images
Setup
Prerequisites
AWS account with appropriate permissions
AWS CLI configured with credentials
Metaflow installed: pip install metaflow
Configuration
Run the configure command to set up AWS:
metaflow configure aws
This interactive wizard prompts you for:
S3 Bucket
Specify an S3 bucket for artifact storage:
METAFLOW_DATASTORE_SYSROOT_S3: s3://my-metaflow-bucket/metaflow
Batch Configuration
Configure the AWS Batch job queue and IAM roles:
METAFLOW_BATCH_JOB_QUEUE: my-batch-queue
METAFLOW_ECS_S3_ACCESS_IAM_ROLE: arn:aws:iam::123456789:role/metaflow-role
Step Functions
Optional: Configure Step Functions for production deployments:
METAFLOW_SFN_IAM_ROLE: arn:aws:iam::123456789:role/metaflow-sfn-role
METAFLOW_SFN_STATE_MACHINE_PREFIX: MyProject
Manual Configuration
Alternatively, set environment variables directly:
# Set S3 as default datastore
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow

# Configure AWS Batch
export METAFLOW_BATCH_JOB_QUEUE=my-batch-queue
export METAFLOW_ECS_S3_ACCESS_IAM_ROLE=arn:aws:iam::123456789:role/metaflow-role

# Optional: Configure container registry
export METAFLOW_BATCH_CONTAINER_REGISTRY=123456789.dkr.ecr.us-east-1.amazonaws.com
export METAFLOW_BATCH_CONTAINER_IMAGE=my-image:latest
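Metaflow picks these settings up from the process environment at startup; a minimal sketch of that lookup pattern (the helper and the "local" fallback are illustrative, not Metaflow internals):

```python
import os

def metaflow_setting(name, default=None):
    """Read a METAFLOW_* setting from the environment, falling back
    to the given default when the variable is unset."""
    return os.environ.get(name, default)

# Stand-ins for the exports above (example values, not real resources)
os.environ["METAFLOW_DEFAULT_DATASTORE"] = "s3"
os.environ["METAFLOW_BATCH_JOB_QUEUE"] = "my-batch-queue"

datastore = metaflow_setting("METAFLOW_DEFAULT_DATASTORE", "local")  # "s3"
queue = metaflow_setting("METAFLOW_BATCH_JOB_QUEUE")                 # "my-batch-queue"
```

An exported variable always wins over the built-in default, which is why the shell exports above are sufficient to switch the datastore.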
AWS Batch Integration
The @batch decorator runs steps on AWS Batch, providing elastic compute with automatic scaling.
Basic Usage
from metaflow import FlowSpec, step, batch

class MyFlow(FlowSpec):

    @batch
    @step
    def start(self):
        # Runs on AWS Batch with default resources
        self.result = expensive_computation()
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")
Resource Configuration
Specify CPU, memory, and GPU requirements:
@batch(cpu=8, memory=32000, gpu=1)
@step
def gpu_training(self):
    # 8 CPUs, 32GB memory, 1 GPU
    model = train_model()
    self.model = model
    self.next(self.end)
Parameters Reference
cpu : Number of CPUs required (1-96)
memory : Memory in MB (128-245760)
image : Custom Docker image (e.g., my-image:latest)
queue : AWS Batch job queue (defaults to METAFLOW_BATCH_JOB_QUEUE)
iam_role : IAM role ARN for the container
execution_role : Execution role ARN for Fargate
Advanced Features
Custom Docker Images
@batch(image="123456789.dkr.ecr.us-east-1.amazonaws.com/my-model:v1.0")
@step
def custom_environment(self):
    # Uses a custom container with specific dependencies
    import special_library
    self.result = special_library.process()
    self.next(self.end)
Inferentia and Trainium Support
@batch(inferentia=1)
@step
def inference_step(self):
    # Uses an AWS Inferentia chip for ML inference
    predictions = model.predict(data)
    self.next(self.end)

@batch(trainium=2)
@step
def training_step(self):
    # Uses AWS Trainium chips for training
    model = train_large_model()
    self.next(self.end)
Shared Memory and Swap
@batch(
    memory=16000,
    shared_memory=8000,  # 8GB shared memory
    max_swap=4000,       # 4GB swap
    swappiness=60        # Swappiness value
)
@step
def memory_intensive(self):
    # Configure memory settings for the container
    large_array = process_big_data()
    self.next(self.end)
Tmpfs Support
@batch(
    use_tmpfs=True,
    tmpfs_size=10000,       # 10GB tmpfs
    tmpfs_path="/tmp/fast",
    tmpfs_tempdir=True      # Set METAFLOW_TEMPDIR to tmpfs_path
)
@step
def fast_io(self):
    # Uses tmpfs for fast I/O operations
    with open("/tmp/fast/data.bin", "wb") as f:
        f.write(generate_data())
    self.next(self.end)
AWS Batch Tags
@batch(
    aws_batch_tags={
        "project": "ml-training",
        "team": "data-science",
        "cost-center": "research"
    }
)
@step
def tagged_step(self):
    # Tags help with cost tracking and organization
    self.result = train_model()
    self.next(self.end)
Multi-Node Parallel Computing
from metaflow import FlowSpec, step, batch, parallel, current

class DistributedFlow(FlowSpec):

    @step
    def start(self):
        # Fan out to 4 gang-scheduled nodes
        self.next(self.distributed_training, num_parallel=4)

    @batch(cpu=8, memory=16000)
    @parallel
    @step
    def distributed_training(self):
        # Runs on 4 nodes with gang scheduling
        rank = current.parallel.node_index
        num_nodes = current.parallel.num_nodes
        # Distributed training code
        model = train_distributed(rank, num_nodes)
        self.next(self.join)

    @step
    def join(self, inputs):
        # Gather results from all nodes
        self.next(self.end)

    @step
    def end(self):
        print("Training complete")
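Inside a gang-scheduled step, each node typically processes its own slice of the input. A framework-agnostic sketch of rank-based sharding (the shard helper is ours, not a Metaflow API; in a real step its arguments would come from current.parallel.node_index and num_nodes):

```python
def shard(items, node_index, num_nodes):
    """Return the contiguous slice of items owned by this node.
    Lower-ranked nodes receive one extra item when len(items)
    is not evenly divisible by num_nodes."""
    base, extra = divmod(len(items), num_nodes)
    start = node_index * base + min(node_index, extra)
    end = start + base + (1 if node_index < extra else 0)
    return items[start:end]

# With 10 items on 4 nodes, shard sizes are 3, 3, 2, 2
items = list(range(10))
shards = [shard(items, rank, 4) for rank in range(4)]
print([len(s) for s in shards])  # → [3, 3, 2, 2]
```

Every item is assigned to exactly one node, so the join step can simply concatenate the per-node results.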
S3 Storage
Metaflow automatically stores all artifacts in S3 when configured.
Configuration
# Basic S3 configuration
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow

# Server-side encryption
export METAFLOW_S3_SERVER_SIDE_ENCRYPTION=AES256
# or for KMS:
export METAFLOW_S3_SERVER_SIDE_ENCRYPTION=aws:kms

# Custom endpoint (e.g., for MinIO)
export METAFLOW_S3_ENDPOINT_URL=https://minio.example.com

# Performance tuning
export METAFLOW_S3_RETRY_COUNT=10
export METAFLOW_S3_WORKER_COUNT=64
Accessing Data
Metaflow provides the S3 datatools library for efficient data access:
from metaflow import FlowSpec, step, S3

class DataFlow(FlowSpec):

    @step
    def start(self):
        # Upload a local file to S3; put_files takes (key, local_path) pairs
        with S3(s3root="s3://my-bucket/data") as s3:
            s3.put_files([("remote_file.csv", "local_file.csv")])
        self.next(self.process)

    @step
    def process(self):
        # Download data from S3
        with S3(s3root="s3://my-bucket/data") as s3:
            obj = s3.get("remote_file.csv")
            with open(obj.path, "r") as f:
                data = f.read()
        self.next(self.end)

    @step
    def end(self):
        print("Processing complete")
AWS Step Functions
Deploy production workflows to AWS Step Functions for reliable orchestration.
Deploying to Step Functions
# Deploy flow to Step Functions
python myflow.py step-functions create

# Trigger an execution
python myflow.py step-functions trigger

# List executions
python myflow.py step-functions list-runs
Scheduling with EventBridge
Use the @schedule decorator for periodic execution:
from metaflow import FlowSpec, step, schedule

@schedule(cron="0 0 * * ? *")  # Daily at midnight (EventBridge cron format)
class ScheduledFlow(FlowSpec):

    @step
    def start(self):
        self.data = fetch_daily_data()
        self.next(self.process)

    @step
    def process(self):
        self.result = process_data(self.data)
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")
Event Triggering
Trigger flows from S3 events:
from metaflow import FlowSpec, step, trigger

@trigger(event="s3")
class EventFlow(FlowSpec):

    @step
    def start(self):
        # Triggered when objects are uploaded to S3
        from metaflow import current
        bucket = current.trigger.s3.bucket
        key = current.trigger.s3.key
        self.process_file(bucket, key)
        self.next(self.end)

    @step
    def end(self):
        print("Processing complete")
AWS Secrets Manager
Securely manage credentials and secrets using AWS Secrets Manager.
Configuration
# Use AWS Secrets Manager as the secrets backend
export METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE=aws-secrets-manager

# Set default region for Secrets Manager
export METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_REGION=us-east-1

# Optional: Default IAM role for secret access
export METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_ROLE=arn:aws:iam::123456789:role/secrets-role
Using Secrets
from metaflow import FlowSpec, step, secrets

class SecureFlow(FlowSpec):

    @secrets(sources=["my-api-key", "database-credentials"])
    @step
    def start(self):
        import os
        # Secrets are automatically injected as environment variables
        api_key = os.environ["MY_API_KEY"]
        db_user = os.environ["DATABASE_CREDENTIALS_USERNAME"]
        db_pass = os.environ["DATABASE_CREDENTIALS_PASSWORD"]
        # Use credentials securely
        self.data = fetch_data(api_key, db_user, db_pass)
        self.next(self.end)

    @step
    def end(self):
        print(f"Fetched {len(self.data)} records")
JSON Secrets
# Secret stored as JSON in AWS Secrets Manager:
# {
#   "username": "admin",
#   "password": "secret123",
#   "host": "db.example.com"
# }

@secrets(sources=["database-config"])
@step
def connect(self):
    import os
    # Each JSON key becomes an environment variable
    username = os.environ["DATABASE_CONFIG_USERNAME"]
    password = os.environ["DATABASE_CONFIG_PASSWORD"]
    host = os.environ["DATABASE_CONFIG_HOST"]
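The naming convention shown above (secret id joined to the JSON key, non-alphanumeric characters mapped to underscores, result upper-cased) can be sketched as a small helper; this mirrors the behavior described here and is not itself a Metaflow function:

```python
import re

def secret_env_name(secret_id, json_key=None):
    """Derive the environment variable name for an injected secret:
    append the JSON key (if any) with an underscore, replace every
    non-alphanumeric character with an underscore, and upper-case."""
    name = secret_id if json_key is None else f"{secret_id}_{json_key}"
    return re.sub(r"[^A-Za-z0-9]", "_", name).upper()

print(secret_env_name("database-config", "username"))  # → DATABASE_CONFIG_USERNAME
print(secret_env_name("my-api-key"))                   # → MY_API_KEY
```

Knowing the mapping ahead of time lets you write os.environ lookups without first running the flow to inspect the injected variables.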
Plain Text Secrets
# For non-JSON secrets, specify a custom env var name via a dict source
@secrets(sources=[{
    "type": "aws-secrets-manager",
    "id": "api-token",
    "options": {"env_var_name": "API_TOKEN", "json": False},
}])
@step
def api_call(self):
    import os
    token = os.environ["API_TOKEN"]
    response = call_api(token)
Binary Secrets
# Binary secrets are injected base64-encoded
@secrets(sources=["certificate-key"])
@step
def use_cert(self):
    import os
    import base64
    cert_bytes = base64.b64decode(os.environ["CERTIFICATE_KEY"])
IAM Permissions
Required IAM permissions for Metaflow:
S3 Access
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-metaflow-bucket/*",
        "arn:aws:s3:::my-metaflow-bucket"
      ]
    }
  ]
}
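If you manage roles in code, the same document can be generated programmatically; a standard-library sketch (the bucket name is a placeholder, and attaching the policy via the AWS API is out of scope here):

```python
import json

def s3_datastore_policy(bucket):
    """Build the minimal S3 policy the Metaflow datastore needs:
    object read/write/delete on the bucket contents, plus
    ListBucket on the bucket itself."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:ListBucket",
                ],
                "Resource": [
                    f"arn:aws:s3:::{bucket}/*",   # objects in the bucket
                    f"arn:aws:s3:::{bucket}",     # the bucket itself
                ],
            }
        ],
    }

policy_json = json.dumps(s3_datastore_policy("my-metaflow-bucket"), indent=2)
```

Note the two Resource ARNs: object-level actions apply to bucket/*, while ListBucket applies to the bucket ARN itself, so both entries are required.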
AWS Batch Access
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "batch:SubmitJob",
        "batch:DescribeJobs",
        "batch:TerminateJob",
        "batch:ListJobs"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecs:DescribeTasks",
        "logs:GetLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
Step Functions Access
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:CreateStateMachine",
        "states:UpdateStateMachine",
        "states:StartExecution",
        "states:DescribeExecution",
        "states:ListExecutions"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789:role/metaflow-sfn-role"
    }
  ]
}
Secrets Manager Access
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "arn:aws:secretsmanager:us-east-1:123456789:secret:*"
    }
  ]
}
Best Practices
Use spot instances for fault-tolerant workloads
Right-size Batch compute resources
Implement S3 lifecycle policies for old artifacts
Use S3 Intelligent-Tiering for data storage
Monitor costs with AWS Cost Explorer
Use IAM roles instead of access keys
Enable S3 bucket encryption (SSE-S3 or SSE-KMS)
Use AWS Secrets Manager for credentials
Enable VPC for Batch compute environments
Implement least-privilege IAM policies
Use Step Functions for production workflows
Configure appropriate retry policies
Monitor Step Functions execution metrics
Set up CloudWatch alarms for failures
Use DynamoDB for foreach coordination
Troubleshooting
Common Issues
Problem : Jobs stuck in RUNNABLE state
Solutions :
Check Batch compute environment capacity
Verify IAM role permissions
Review VPC and subnet configuration
Check service quotas and limits
Problem : Cannot read/write S3 objects
Solutions :
Verify IAM role has S3 permissions
Check S3 bucket policy
Ensure bucket and role are in same account
Verify bucket name is correct
Problem : Custom image not found
Solutions :
Verify ECR repository exists
Check image tag is correct
Ensure IAM role can pull from ECR
Use full image URI with registry
Next Steps
AWS Batch Deep Dive : Learn advanced Batch features and optimization
Step Functions Guide : Deploy production workflows to Step Functions
S3 Datatools : Master efficient data access with S3
Secrets Management : Secure credential management patterns