Overview
Metaflow provides native support for Microsoft Azure, enabling you to store artifacts and data in Azure Blob Storage and execute workflows using Azure infrastructure. This integration allows you to build and deploy ML/AI workflows entirely within the Azure ecosystem.
Azure integration requires Python 3.6 or later and the Azure SDK packages.
Prerequisites
Before using Metaflow with Azure, ensure you have:
- Python 3.6 or later
- Azure subscription and storage account
- Azure SDK for Python packages
- Appropriate Azure credentials configured
Installation
Install Metaflow with Azure support:
pip install metaflow
# Install Azure SDK dependencies
pip install azure-storage-blob azure-identity
Azure Blob Storage
Metaflow uses Azure Blob Storage as a datastore backend for artifacts, data, and metadata.
Configuration
Configure Metaflow to use Azure Blob Storage:
# Set Azure Blob Storage as the datastore
export METAFLOW_DATASTORE_SYSROOT_AZURE=my-container/metaflow
# Set the storage account endpoint
export METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT=https://mystorageaccount.blob.core.windows.net
# Configure card storage (for Metaflow Cards)
export METAFLOW_CARD_AZUREROOT=my-container/cards
The Azure datastore path follows this format:
<container-name>/<blob-prefix>
Important path requirements:
- Must not start or end with a slash
- Must not contain consecutive slashes (//)
- Container name must not be empty
- Blob prefix is optional (can specify just container name)
Valid examples:
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod
Invalid examples:
# Don't use these:
export METAFLOW_DATASTORE_SYSROOT_AZURE=/metaflow-prod/artifacts # starts with /
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts/ # ends with /
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow//artifacts # consecutive slashes
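The path rules above can be expressed as a small validation helper. This is a hypothetical utility for illustration, not part of Metaflow:

```python
def validate_azure_sysroot(path):
    """Check a METAFLOW_DATASTORE_SYSROOT_AZURE value against the path rules."""
    if path.startswith('/') or path.endswith('/'):
        return False  # must not start or end with a slash
    if '//' in path:
        return False  # must not contain consecutive slashes
    container = path.split('/', 1)[0]
    if not container:
        return False  # container name must not be empty
    return True

# The valid and invalid examples above:
assert validate_azure_sysroot('metaflow-prod/artifacts')
assert validate_azure_sysroot('metaflow-prod')                 # blob prefix is optional
assert not validate_azure_sysroot('/metaflow-prod/artifacts')  # starts with /
assert not validate_azure_sysroot('metaflow-prod/artifacts/')  # ends with /
assert not validate_azure_sysroot('metaflow//artifacts')       # consecutive slashes
```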
Authentication
Metaflow supports Azure authentication via Azure Identity's DefaultAzureCredential, which attempts authentication through multiple methods in order:
- Environment variables - service principal credentials
- Managed identity - for Azure VMs and services
- Azure CLI - if you're logged in via az login
- Visual Studio Code - if signed in to Azure
- Azure PowerShell - if authenticated
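Conceptually, the chain tries each method in order and uses the first one that succeeds. The following is a simplified sketch of that pattern, not the actual azure-identity implementation; the provider functions are stand-ins:

```python
class ChainedCredential:
    """Try each credential provider in order; use the first that succeeds."""

    def __init__(self, providers):
        self.providers = providers

    def get_token(self):
        errors = []
        for provider in self.providers:
            try:
                return provider()
            except Exception as exc:
                errors.append(str(exc))  # remember why this provider failed
        raise RuntimeError("All credential providers failed: " + "; ".join(errors))

def env_credential():
    # Stand-in for environment-variable auth; fails when nothing is configured
    raise Exception("AZURE_CLIENT_ID not set")

def cli_credential():
    # Stand-in for Azure CLI auth
    return "token-from-az-login"

chain = ChainedCredential([env_credential, cli_credential])
print(chain.get_token())  # falls through to the CLI credential
```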
Environment Variable Authentication
Set service principal credentials:
export AZURE_CLIENT_ID=<your-client-id>
export AZURE_CLIENT_SECRET=<your-client-secret>
export AZURE_TENANT_ID=<your-tenant-id>
Managed Identity Authentication
When running on Azure (VMs, AKS, etc.), use managed identity:
# System-assigned managed identity (no config needed)
# OR specify user-assigned managed identity
export AZURE_CLIENT_ID=<managed-identity-client-id>
Azure CLI Authentication
For local development:
az login
az account set --subscription <subscription-id>
The Azure CLI authentication method may be slower (500-1000ms per token generation) compared to other methods. For production workloads, prefer managed identity or service principal authentication.
Using Azure Blob Storage in Flows
Basic Usage
Once configured, Metaflow automatically uses Azure Blob Storage:
from metaflow import FlowSpec, step

class AzureFlow(FlowSpec):

    @step
    def start(self):
        # Artifacts are automatically stored in Azure Blob Storage
        self.large_data = list(range(1000000))
        self.next(self.process)

    @step
    def process(self):
        # Data is automatically loaded from Azure Blob Storage
        self.result = sum(self.large_data)
        self.next(self.end)

    @step
    def end(self):
        print(f"Result: {self.result}")

if __name__ == '__main__':
    AzureFlow()
Accessing Azure Blob Storage Directly
You can also access Azure Blob Storage directly within your flows:
from metaflow import FlowSpec, step
from azure.storage.blob import BlobServiceClient
from metaflow.plugins.azure import create_azure_credential
import os

class AzureBlobFlow(FlowSpec):

    @step
    def start(self):
        # Create an authenticated blob service client
        endpoint = os.environ['METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT']
        credential = create_azure_credential()
        blob_service = BlobServiceClient(
            account_url=endpoint,
            credential=credential
        )
        # Upload data (overwrite=True avoids ResourceExistsError on re-runs)
        container = blob_service.get_container_client('my-container')
        blob = container.get_blob_client('my-data.txt')
        blob.upload_blob(b'Hello from Metaflow!', overwrite=True)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureBlobFlow()
Azure Secrets Management
Metaflow integrates with Azure Key Vault for secrets management.
Configuration
# Set Azure Key Vault as the secrets backend
export METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE=azure-key-vault
# Set Key Vault prefix (vault URL)
export METAFLOW_AZURE_KEY_VAULT_PREFIX=https://myvault.vault.azure.net
Using Secrets in Flows
from metaflow import FlowSpec, step, secrets

class AzureSecretsFlow(FlowSpec):

    @secrets(sources=['azure-key-vault'])
    @step
    def start(self):
        from metaflow import current
        # Access secrets from Azure Key Vault
        api_key = current.secrets.get('api-key')
        db_password = current.secrets.get('database-password')
        # Use secrets in your code (connect_to_api is a placeholder for your own client)
        self.connection = connect_to_api(api_key)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureSecretsFlow()
Secret Naming
Secrets are referenced by their Key Vault secret name:
- Secret names can contain alphanumeric characters and hyphens
- Use current.secrets.get('secret-name') to retrieve values
- Secrets are cached during task execution for performance
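Key Vault enforces its naming rules at creation time (names are 1-127 characters of alphanumerics and hyphens); a quick local check can catch invalid names before deployment. A hypothetical helper, assuming those limits:

```python
import re

# Azure Key Vault secret names: 1-127 chars, alphanumerics and hyphens only
_SECRET_NAME_RE = re.compile(r'^[0-9a-zA-Z-]{1,127}$')

def is_valid_secret_name(name):
    """Return True if name satisfies Key Vault's secret-name rules."""
    return bool(_SECRET_NAME_RE.match(name))

assert is_valid_secret_name('api-key')
assert is_valid_secret_name('database-password')
assert not is_valid_secret_name('api_key')  # underscores are not allowed
assert not is_valid_secret_name('')
```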
Deploying to Azure
Azure Kubernetes Service (AKS)
Deploy Metaflow workflows to AKS using the Kubernetes integration:
from metaflow import FlowSpec, step, kubernetes

class AzureAKSFlow(FlowSpec):

    @kubernetes(
        cpu=4,
        memory=8192,
        image='myregistry.azurecr.io/my-image:latest',
        namespace='metaflow',
        service_account='metaflow-sa'
    )
    @step
    def start(self):
        # Executes on AKS
        import platform
        print(f"Running on: {platform.node()}")
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureAKSFlow()
Deploy to Argo Workflows on AKS:
python azure_aks_flow.py argo-workflows create
Azure Container Registry (ACR)
Use container images from Azure Container Registry:
# Configure AKS to pull from ACR
az aks update -n myAKSCluster -g myResourceGroup --attach-acr myACRegistry
Then reference ACR images in your flows:
@kubernetes(image='myregistry.azurecr.io/ml-training:v1.0')
@step
def train(self):
    pass
Multi-Cloud Workflows
Metaflow supports hybrid workflows that span multiple clouds:
from metaflow import FlowSpec, step, kubernetes
import os

class MultiCloudFlow(FlowSpec):

    @step
    def start(self):
        # Decide storage based on configuration
        if 'METAFLOW_DATASTORE_SYSROOT_AZURE' in os.environ:
            self.storage_type = 'Azure'
        elif 'METAFLOW_DATASTORE_SYSROOT_S3' in os.environ:
            self.storage_type = 'S3'
        else:
            self.storage_type = 'GCS'
        self.next(self.process)

    @kubernetes(cpu=4, memory=8192)
    @step
    def process(self):
        print(f"Using {self.storage_type} storage")
        self.data = list(range(1000000))
        self.next(self.end)

    @step
    def end(self):
        print("Workflow complete")

if __name__ == '__main__':
    MultiCloudFlow()
Token Caching
Metaflow caches Azure authentication tokens to minimize token generation overhead:
- Tokens are generated once and reused until expiration
- Default token expiration is several hours (configurable by Azure admin)
- Automatic token refresh when expiration is detected
- Particularly important for reducing Azure CLI auth latency
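The caching behavior can be sketched as a wrapper that reuses a token until shortly before it expires. This is a simplification for illustration, not Metaflow's actual internals; `fetch_token` stands in for the real credential call:

```python
import time

class CachedTokenProvider:
    """Cache a token and refresh it only when close to expiration."""

    def __init__(self, fetch_token, refresh_margin=300):
        self._fetch = fetch_token      # returns (token, expires_at_epoch_seconds)
        self._margin = refresh_margin  # refresh this many seconds before expiry
        self._token = None
        self._expires_at = 0

    def get_token(self):
        if self._token is None or time.time() >= self._expires_at - self._margin:
            self._token, self._expires_at = self._fetch()
        return self._token

calls = []
def fetch_token():
    # Stand-in for an expensive credential call (e.g. Azure CLI, 500-1000ms)
    calls.append(1)
    return "tok", time.time() + 3600  # token valid for an hour

provider = CachedTokenProvider(fetch_token)
provider.get_token()
provider.get_token()
assert len(calls) == 1  # second call served from cache
```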
Parallel Operations
When performing many Azure operations, use parallel execution:
from metaflow import FlowSpec, step
from concurrent.futures import ThreadPoolExecutor

class ParallelAzureFlow(FlowSpec):

    @step
    def start(self):
        self.files = [f'file{i}.dat' for i in range(100)]
        self.next(self.upload)

    @step
    def upload(self):
        # Parallel uploads to Azure Blob Storage
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = executor.map(self.upload_file, self.files)
            self.uploaded_count = sum(results)
        self.next(self.end)

    def upload_file(self, filename):
        # Upload file to Azure
        return 1

    @step
    def end(self):
        print(f"Uploaded {self.uploaded_count} files")

if __name__ == '__main__':
    ParallelAzureFlow()
Error Handling
Metaflow translates Azure SDK errors to Metaflow exceptions:
from metaflow import FlowSpec, step
from metaflow.exception import MetaflowException

class ErrorHandlingFlow(FlowSpec):

    @step
    def start(self):
        try:
            # Azure operations
            self.data = self.load_from_azure()
        except MetaflowException as e:
            print(f"Azure error: {e}")
            # Fallback behavior
            self.data = self.load_from_backup()
        self.next(self.end)

    def load_from_azure(self):
        # Load data from Azure Blob Storage
        pass

    def load_from_backup(self):
        # Fallback data source
        return []

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ErrorHandlingFlow()
Common Azure exceptions:
- AuthenticationError: Invalid or expired credentials
- ResourceNotFoundError: Container or blob doesn’t exist
- ResourceExistsError: Attempting to create existing resource
Monitoring and Logging
Reducing Azure SDK Logging
By default, Metaflow reduces verbose Azure SDK logging:
import logging
# Azure Identity logging is set to ERROR level
# To enable debug logging for troubleshooting:
logging.getLogger('azure.identity').setLevel(logging.DEBUG)
logging.getLogger('azure.storage.blob').setLevel(logging.DEBUG)
Viewing Storage Usage
Monitor Azure Blob Storage usage:
# Using Azure CLI
az storage blob list \
--account-name mystorageaccount \
--container-name metaflow-prod \
--prefix artifacts/ \
--query "[].{Name:name, Size:properties.contentLength}"
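The `--query` above emits a JSON list of name/size pairs, so total usage can be summed with a few lines of Python. Here it runs against a hardcoded sample of that output; in practice you would pipe the CLI's stdout in:

```python
import json

# Sample output from:
#   az storage blob list ... --query "[].{Name:name, Size:properties.contentLength}"
cli_output = '''
[
  {"Name": "artifacts/flow1/data.bin", "Size": 1048576},
  {"Name": "artifacts/flow2/model.pt", "Size": 20971520}
]
'''

blobs = json.loads(cli_output)
total_bytes = sum(b["Size"] for b in blobs)
print(f"{len(blobs)} blobs, {total_bytes / 1024 / 1024:.1f} MiB total")
# prints: 2 blobs, 21.0 MiB total
```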
Best Practices
- Use Managed Identity in Production: When running on Azure infrastructure (VMs, AKS, Azure Functions), use managed identity for authentication instead of service principals.
- Organize by Container: Use separate containers for different environments:
# Development
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-dev/artifacts
# Production
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts
- Enable Soft Delete: Configure soft delete on your storage account to recover accidentally deleted artifacts:
az storage blob service-properties delete-policy update \
--account-name mystorageaccount \
--enable true \
--days-retained 30
- Use Azure Key Vault: Store sensitive credentials in Azure Key Vault rather than environment variables.
- Lifecycle Management: Implement lifecycle policies to automatically tier or delete old artifacts:
az storage account management-policy create \
--account-name mystorageaccount \
--policy @lifecycle-policy.json
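The command references a policy file. A minimal lifecycle-policy.json might look like the following; the rule name, prefix, and retention values are illustrative assumptions, not recommendations:

```json
{
  "rules": [
    {
      "enabled": true,
      "name": "tier-and-expire-metaflow-artifacts",
      "type": "Lifecycle",
      "definition": {
        "filters": {
          "blobTypes": ["blockBlob"],
          "prefixMatch": ["metaflow-prod/artifacts"]
        },
        "actions": {
          "baseBlob": {
            "tierToCool": {"daysAfterModificationGreaterThan": 30},
            "delete": {"daysAfterModificationGreaterThan": 365}
          }
        }
      }
    }
  ]
}
```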
- Network Security: Use Azure Private Link to secure connections between compute and storage:
az network private-endpoint create \
--name metaflow-storage-endpoint \
--resource-group myResourceGroup \
--vnet-name myVNet \
--subnet mySubnet \
--private-connection-resource-id $STORAGE_ID \
--group-id blob \
--connection-name metaflow-storage-connection
Troubleshooting
Authentication Issues
Problem: ClientAuthenticationError when accessing storage
Solution: Verify your credentials are configured correctly:
# Test Azure CLI authentication
az account show
# Test managed identity (on Azure VM/AKS)
curl -H Metadata:true "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://storage.azure.com/"
# Verify service principal
az login --service-principal -u $AZURE_CLIENT_ID -p $AZURE_CLIENT_SECRET --tenant $AZURE_TENANT_ID
Storage Access Issues
Problem: ResourceNotFoundError when accessing blobs
Solution: Verify container exists and permissions are correct:
# Check container exists
az storage container exists \
--account-name mystorageaccount \
--name metaflow-prod
# Grant Storage Blob Data Contributor role
az role assignment create \
--role "Storage Blob Data Contributor" \
--assignee $PRINCIPAL_ID \
--scope /subscriptions/$SUB_ID/resourceGroups/$RG/providers/Microsoft.Storage/storageAccounts/$STORAGE_ACCOUNT
Problem: Slow artifact loading/saving
Solution:
- Ensure you’re in the same Azure region as your storage account
- Use managed identity instead of Azure CLI authentication
- Enable parallel uploads for large files
- Consider using Azure Premium Storage for high-throughput workloads
Example: Complete Azure Production Flow
from metaflow import FlowSpec, step, Parameter
from metaflow import kubernetes, retry, timeout, secrets
import os

class AzureProductionFlow(FlowSpec):
    """
    Production ML workflow running entirely on Azure.
    - Data stored in Azure Blob Storage
    - Secrets managed in Azure Key Vault
    - Compute on Azure Kubernetes Service
    """

    model_version = Parameter('model_version',
                              default='v1.0',
                              help='Model version to deploy')

    @step
    def start(self):
        """Initialize workflow"""
        from datetime import datetime
        self.run_date = datetime.now().strftime('%Y-%m-%d')
        # Verify Azure configuration
        assert 'METAFLOW_DATASTORE_SYSROOT_AZURE' in os.environ
        assert 'METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT' in os.environ
        print(f"Starting Azure workflow: {self.run_date}")
        self.next(self.load_data)

    @kubernetes(cpu=4, memory=16384,
                image='myregistry.azurecr.io/ml-base:latest')
    @retry(times=3, minutes_between_retries=5)
    @timeout(seconds=3600)
    @secrets(sources=['azure-key-vault'])
    @step
    def load_data(self):
        """Load training data from Azure Blob Storage"""
        from azure.storage.blob import BlobServiceClient
        from metaflow.plugins.azure import create_azure_credential
        # Build an authenticated client for the configured storage endpoint
        endpoint = os.environ['METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT']
        credential = create_azure_credential()
        blob_service = BlobServiceClient(
            account_url=endpoint,
            credential=credential
        )
        # Load data
        container = blob_service.get_container_client('training-data')
        blob = container.get_blob_client(f'data/{self.run_date}/features.parquet')
        self.data_size = blob.get_blob_properties().size
        print(f"Loaded {self.data_size} bytes of training data")
        self.next(self.train)

    @kubernetes(cpu=16, memory=65536, gpu=2, gpu_vendor='nvidia',
                image='myregistry.azurecr.io/ml-training:latest')
    @retry(times=2)
    @timeout(seconds=7200)
    @step
    def train(self):
        """Train model on AKS with GPU"""
        import torch
        print(f"Training {self.model_version} on GPU: {torch.cuda.is_available()}")
        # Training code here
        self.model_accuracy = 0.95
        self.model_path = f'models/{self.run_date}/{self.model_version}.pt'
        self.next(self.deploy)

    @kubernetes(cpu=4, memory=8192)
    @secrets(sources=['azure-key-vault'])
    @step
    def deploy(self):
        """Deploy model to Azure ML endpoint"""
        from metaflow import current
        # Get deployment credentials from Key Vault
        api_key = current.secrets.get('ml-endpoint-key')
        # Deploy model
        print(f"Deploying {self.model_path} with accuracy {self.model_accuracy}")
        self.endpoint_url = f"https://myendpoint.azureml.net/v1/{self.model_version}"
        self.next(self.end)

    @step
    def end(self):
        """Workflow complete"""
        print(f"Deployment complete: {self.endpoint_url}")
        print(f"Model version: {self.model_version}")
        print(f"Accuracy: {self.model_accuracy}")

if __name__ == '__main__':
    AzureProductionFlow()
Run locally:
python azure_production_flow.py run
Deploy to Argo Workflows on AKS:
python azure_production_flow.py argo-workflows create
Additional Resources