
Overview

Metaflow provides native support for Microsoft Azure, enabling you to store artifacts and data in Azure Blob Storage and execute workflows using Azure infrastructure. This integration allows you to build and deploy ML/AI workflows entirely within the Azure ecosystem.
Azure integration requires Python 3.6 or later and the Azure SDK packages.

Prerequisites

Before using Metaflow with Azure, ensure you have:
  • Python 3.6 or later
  • Azure subscription and storage account
  • Azure SDK for Python packages
  • Appropriate Azure credentials configured
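The Python requirement can be confirmed up front with a trivial check (this is a generic sanity check, not part of Metaflow):

```python
import sys

# Fail fast if the interpreter is older than the documented minimum
assert sys.version_info >= (3, 6), "Metaflow's Azure support requires Python 3.6+"
print("Python", sys.version.split()[0], "OK")
```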

Installation

Install Metaflow with Azure support:
pip install metaflow

# Install Azure SDK dependencies
pip install azure-storage-blob azure-identity

Azure Blob Storage

Metaflow uses Azure Blob Storage as a datastore backend for artifacts, data, and metadata.

Configuration

Configure Metaflow to use Azure Blob Storage:
# Set Azure Blob Storage as the datastore
export METAFLOW_DATASTORE_SYSROOT_AZURE=my-container/metaflow

# Set the storage account endpoint
export METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT=https://mystorageaccount.blob.core.windows.net

# Configure card storage (for Metaflow Cards)
export METAFLOW_CARD_AZUREROOT=my-container/cards

Container and Blob Path Format

The Azure datastore path follows this format:
<container-name>/<blob-prefix>
Important path requirements:
  • Must not start or end with a slash
  • Must not contain consecutive slashes (//)
  • Container name must not be empty
  • Blob prefix is optional (can specify just container name)
Valid examples:
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod
Invalid examples:
# Don't use these:
export METAFLOW_DATASTORE_SYSROOT_AZURE=/metaflow-prod/artifacts  # starts with /
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts/  # ends with /
export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow//artifacts       # consecutive slashes
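The path rules above can be checked programmatically. The helper below is a sketch of those constraints for pre-flight validation in your own scripts; it is not Metaflow's own validation code:

```python
def validate_azure_sysroot(path):
    """Check a METAFLOW_DATASTORE_SYSROOT_AZURE value against the rules above."""
    if path.startswith('/') or path.endswith('/'):
        return False          # must not start or end with a slash
    if '//' in path:
        return False          # must not contain consecutive slashes
    container, _, _prefix = path.partition('/')
    return bool(container)    # container name must not be empty; prefix is optional

assert validate_azure_sysroot('metaflow-prod/artifacts')
assert validate_azure_sysroot('metaflow-prod')
assert not validate_azure_sysroot('/metaflow-prod/artifacts')  # starts with /
assert not validate_azure_sysroot('metaflow-prod/artifacts/')  # ends with /
assert not validate_azure_sysroot('metaflow//artifacts')       # consecutive slashes
```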

Authentication

Metaflow supports Azure authentication via Azure Identity’s DefaultAzureCredential, which attempts authentication through multiple methods in order:
  1. Environment variables - Service principal credentials
  2. Managed Identity - For Azure VMs, AKS, and other Azure services
  3. Visual Studio Code - If signed in to Azure
  4. Azure CLI - If you’re logged in via az login
  5. Azure PowerShell - If authenticated
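Conceptually, the chain works like the sketch below: each provider either returns a token or signals that it is unavailable, and the first success wins. All names here are illustrative stand-ins; the real chain lives inside azure.identity:

```python
# Minimal sketch of the chained-credential pattern behind DefaultAzureCredential
class CredentialUnavailableError(Exception):
    pass

def env_credential():
    # Stand-in: would read AZURE_CLIENT_ID / AZURE_CLIENT_SECRET / AZURE_TENANT_ID
    raise CredentialUnavailableError("no service principal configured")

def managed_identity_credential():
    # Stand-in: would query the Azure instance metadata endpoint
    raise CredentialUnavailableError("not running on Azure")

def cli_credential():
    # Stand-in: would shell out to `az account get-access-token`
    return "token-from-azure-cli"

def get_token(providers):
    # Try each provider in order; return the first token obtained
    for provider in providers:
        try:
            return provider()
        except CredentialUnavailableError:
            continue
    raise RuntimeError("no credential provider succeeded")

token = get_token([env_credential, managed_identity_credential, cli_credential])
print(token)
```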

Environment Variable Authentication

Set service principal credentials:
export AZURE_CLIENT_ID=<your-client-id>
export AZURE_CLIENT_SECRET=<your-client-secret>
export AZURE_TENANT_ID=<your-tenant-id>

Managed Identity Authentication

When running on Azure (VMs, AKS, etc.), use managed identity:
# System-assigned managed identity (no config needed)
# OR specify user-assigned managed identity
export AZURE_CLIENT_ID=<managed-identity-client-id>

Azure CLI Authentication

For local development:
az login
az account set --subscription <subscription-id>
The Azure CLI authentication method may be slower (500-1000ms per token generation) compared to other methods. For production workloads, prefer managed identity or service principal authentication.

Using Azure Blob Storage in Flows

Basic Usage

Once configured, Metaflow automatically uses Azure Blob Storage:
from metaflow import FlowSpec, step

class AzureFlow(FlowSpec):
    
    @step
    def start(self):
        # Artifacts are automatically stored in Azure Blob Storage
        self.large_data = list(range(1000000))
        self.next(self.process)
    
    @step
    def process(self):
        # Data is automatically loaded from Azure Blob Storage
        self.result = sum(self.large_data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.result}")

if __name__ == '__main__':
    AzureFlow()

Accessing Azure Blob Storage Directly

You can also access Azure Blob Storage directly within your flows:
from metaflow import FlowSpec, step
from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential
import os

class AzureBlobFlow(FlowSpec):
    
    @step
    def start(self):
        # Create an authenticated blob service client
        endpoint = os.environ['METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT']
        credential = DefaultAzureCredential()
        
        blob_service = BlobServiceClient(
            account_url=endpoint,
            credential=credential
        )
        
        # Upload data; overwrite=True replaces an existing blob
        container = blob_service.get_container_client('my-container')
        blob = container.get_blob_client('my-data.txt')
        blob.upload_blob(b'Hello from Metaflow!', overwrite=True)
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureBlobFlow()

Azure Secrets Management

Metaflow integrates with Azure Key Vault for secrets management.

Configuration

# Set Azure Key Vault as the secrets backend
export METAFLOW_DEFAULT_SECRETS_BACKEND_TYPE=azure-key-vault

# Set Key Vault prefix (vault URL)
export METAFLOW_AZURE_KEY_VAULT_PREFIX=https://myvault.vault.azure.net

Using Secrets in Flows

from metaflow import FlowSpec, step, secrets
import os

class AzureSecretsFlow(FlowSpec):
    
    @secrets(sources=['api-key', 'database-password'])
    @step
    def start(self):
        # Secrets fetched from Azure Key Vault are injected into
        # the task's environment by the @secrets decorator
        api_key = os.environ['api-key']
        db_password = os.environ['database-password']
        
        # Use secrets in your code
        self.connection = connect_to_api(api_key)
        self.next(self.end)
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureSecretsFlow()

Secret Naming

Secrets are referenced by their Key Vault secret name:
  • Secret names can contain alphanumeric characters and hyphens
  • Pass secret names to @secrets(sources=[...]); values are injected as environment variables
  • Secrets are cached during task execution for performance
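The per-task caching can be sketched as below. This is an illustration of the pattern only; fetch_from_key_vault is a stand-in, not a Metaflow internal:

```python
# Each secret is fetched from the backend at most once per task
fetch_log = []

def fetch_from_key_vault(name):
    fetch_log.append(name)            # stand-in for a real Key Vault call
    return f"value-of-{name}"

_cache = {}

def get_secret(name):
    # Serve repeat lookups from the in-memory cache
    if name not in _cache:
        _cache[name] = fetch_from_key_vault(name)
    return _cache[name]

assert get_secret('api-key') == 'value-of-api-key'
assert get_secret('api-key') == 'value-of-api-key'  # served from cache
assert fetch_log == ['api-key']                     # backend hit only once
```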

Deploying to Azure

Azure Kubernetes Service (AKS)

Deploy Metaflow workflows to AKS using the Kubernetes integration:
from metaflow import FlowSpec, step, kubernetes

class AzureAKSFlow(FlowSpec):
    
    @kubernetes(
        cpu=4,
        memory=8192,
        image='myregistry.azurecr.io/my-image:latest',
        namespace='metaflow',
        service_account='metaflow-sa'
    )
    @step
    def start(self):
        # Executes on AKS
        import platform
        print(f"Running on: {platform.node()}")
        self.next(self.end)
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    AzureAKSFlow()
Deploy to Argo Workflows on AKS:
python azure_aks_flow.py argo-workflows create

Azure Container Registry (ACR)

Use container images from Azure Container Registry:
# Configure AKS to pull from ACR
az aks update -n myAKSCluster -g myResourceGroup --attach-acr myACRegistry
Then reference ACR images in your flows:
@kubernetes(image='myregistry.azurecr.io/ml-training:v1.0')
@step
def train(self):
    pass

Multi-Cloud Workflows

Metaflow supports hybrid workflows that span multiple clouds:
from metaflow import FlowSpec, step, kubernetes
import os

class MultiCloudFlow(FlowSpec):
    
    @step
    def start(self):
        # Decide storage based on configuration
        if 'METAFLOW_DATASTORE_SYSROOT_AZURE' in os.environ:
            self.storage_type = 'Azure'
        elif 'METAFLOW_DATASTORE_SYSROOT_S3' in os.environ:
            self.storage_type = 'S3'
        else:
            self.storage_type = 'GCS'
        
        self.next(self.process)
    
    @kubernetes(cpu=4, memory=8192)
    @step
    def process(self):
        print(f"Using {self.storage_type} storage")
        self.data = list(range(1000000))
        self.next(self.end)
    
    @step
    def end(self):
        print("Workflow complete")

if __name__ == '__main__':
    MultiCloudFlow()

Performance Optimization

Token Caching

Metaflow caches Azure authentication tokens to minimize token generation overhead:
  • Tokens are generated once and reused until expiration
  • Default token expiration is several hours (configurable by Azure admin)
  • Automatic token refresh when expiration is detected
  • Particularly important for reducing Azure CLI auth latency
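The caching behavior described above follows a common pattern, sketched here for illustration (this is not Metaflow's internal code): reuse a token until shortly before it expires, then refresh.

```python
import time

class CachedToken:
    """Reuse a token until shortly before expiry, then refresh."""

    def __init__(self, fetch, refresh_margin=300):
        self._fetch = fetch            # callable returning (token, expires_at)
        self._margin = refresh_margin  # refresh this many seconds early
        self._token = None
        self._expires_at = 0.0

    def get(self):
        if self._token is None or time.time() >= self._expires_at - self._margin:
            self._token, self._expires_at = self._fetch()
        return self._token

calls = []

def fetch_token():
    # Stand-in for an expensive credential call (e.g. Azure CLI)
    calls.append(1)
    return f"token-{len(calls)}", time.time() + 3600  # valid for one hour

cache = CachedToken(fetch_token)
assert cache.get() == cache.get() == "token-1"  # second call served from cache
assert len(calls) == 1                          # token generated only once
```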

Parallel Operations

When performing many Azure operations, use parallel execution:
from metaflow import FlowSpec, step
from concurrent.futures import ThreadPoolExecutor

class ParallelAzureFlow(FlowSpec):
    
    @step
    def start(self):
        self.files = [f'file{i}.dat' for i in range(100)]
        self.next(self.upload)
    
    @step
    def upload(self):
        # Parallel uploads to Azure Blob Storage
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = executor.map(self.upload_file, self.files)
        
        self.uploaded_count = sum(results)
        self.next(self.end)
    
    def upload_file(self, filename):
        # Placeholder: upload `filename` to Azure Blob Storage here
        # and return 1 on success so the results can be summed
        return 1
    
    @step
    def end(self):
        print(f"Uploaded {self.uploaded_count} files")

if __name__ == '__main__':
    ParallelAzureFlow()

Error Handling

Metaflow translates Azure SDK errors to Metaflow exceptions:
from metaflow import FlowSpec, step
from metaflow.exception import MetaflowException

class ErrorHandlingFlow(FlowSpec):
    
    @step
    def start(self):
        try:
            # Azure operations
            self.data = self.load_from_azure()
        except MetaflowException as e:
            print(f"Azure error: {e}")
            # Fallback behavior
            self.data = self.load_from_backup()
        
        self.next(self.end)
    
    def load_from_azure(self):
        # Load data from Azure Blob Storage
        pass
    
    def load_from_backup(self):
        # Fallback data source
        return []
    
    @step
    def end(self):
        pass

if __name__ == '__main__':
    ErrorHandlingFlow()
Common Azure exceptions:
  • AuthenticationError: Invalid or expired credentials
  • ResourceNotFoundError: Container or blob doesn’t exist
  • ResourceExistsError: Attempting to create existing resource
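The translation pattern looks roughly like this sketch. The exception classes below are local stand-ins for illustration; Metaflow's real mapping wraps the azure.core.exceptions types listed above:

```python
# Stand-in exception types (illustrative only)
class ResourceNotFoundError(Exception):
    pass

class MetaflowException(Exception):
    pass

def translate_azure_errors(fn):
    # Wrap an operation so SDK-level errors surface as Metaflow exceptions
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except ResourceNotFoundError as e:
            raise MetaflowException(f"Blob not found: {e}") from e
    return wrapper

@translate_azure_errors
def load_blob(name):
    # Simulate a missing blob
    raise ResourceNotFoundError(name)

try:
    load_blob("missing.txt")
except MetaflowException as e:
    print(e)
```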

Monitoring and Logging

Reducing Azure SDK Logging

By default, Metaflow reduces verbose Azure SDK logging:
import logging

# Azure Identity logging is set to ERROR level
# To enable debug logging for troubleshooting:
logging.getLogger('azure.identity').setLevel(logging.DEBUG)
logging.getLogger('azure.storage.blob').setLevel(logging.DEBUG)

Viewing Storage Usage

Monitor Azure Blob Storage usage:
# Using Azure CLI
az storage blob list \
  --account-name mystorageaccount \
  --container-name metaflow-prod \
  --prefix artifacts/ \
  --query "[].{Name:name, Size:properties.contentLength}"

Best Practices

  1. Use Managed Identity in Production: When running on Azure infrastructure (VMs, AKS, Azure Functions), use managed identity for authentication instead of service principals.
  2. Organize by Container: Use separate containers for different environments:
    # Development
    export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-dev/artifacts
    
    # Production
    export METAFLOW_DATASTORE_SYSROOT_AZURE=metaflow-prod/artifacts
    
  3. Enable Soft Delete: Configure soft delete on your storage account to recover accidentally deleted artifacts:
    az storage blob service-properties delete-policy update \
      --account-name mystorageaccount \
      --enable true \
      --days-retained 30
    
  4. Use Azure Key Vault: Store sensitive credentials in Azure Key Vault rather than environment variables.
  5. Lifecycle Management: Implement lifecycle policies to automatically tier or delete old artifacts:
    az storage account management-policy create \
      --account-name mystorageaccount \
      --policy @lifecycle-policy.json
    
  6. Network Security: Use Azure Private Link to secure connections between compute and storage:
    az network private-endpoint create \
      --name metaflow-storage-endpoint \
      --resource-group myResourceGroup \
      --vnet-name myVNet \
      --subnet mySubnet \
      --private-connection-resource-id $STORAGE_ID \
      --group-id blob \
      --connection-name metaflow-storage-connection
    

Troubleshooting

Authentication Issues

Problem: ClientAuthenticationError when accessing storage
Solution: Verify your credentials are configured correctly:
# Test Azure CLI authentication
az account show

# Test managed identity (on Azure VM/AKS)
curl -H Metadata:true "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://storage.azure.com/"

# Verify service principal
az login --service-principal -u $AZURE_CLIENT_ID -p $AZURE_CLIENT_SECRET --tenant $AZURE_TENANT_ID

Storage Access Issues

Problem: ResourceNotFoundError when accessing blobs
Solution: Verify the container exists and permissions are correct:
# Check container exists
az storage container exists \
  --account-name mystorageaccount \
  --name metaflow-prod

# Grant Storage Blob Data Contributor role
az role assignment create \
  --role "Storage Blob Data Contributor" \
  --assignee $PRINCIPAL_ID \
  --scope /subscriptions/$SUB_ID/resourceGroups/$RG/providers/Microsoft.Storage/storageAccounts/$STORAGE_ACCOUNT

Performance Issues

Problem: Slow artifact loading/saving
Solution:
  1. Ensure you’re in the same Azure region as your storage account
  2. Use managed identity instead of Azure CLI authentication
  3. Enable parallel uploads for large files
  4. Consider using Azure Premium Storage for high-throughput workloads

Example: Complete Azure Production Flow

from metaflow import FlowSpec, step, Parameter, schedule
from metaflow import kubernetes, retry, timeout, secrets
import os

class AzureProductionFlow(FlowSpec):
    """
    Production ML workflow running entirely on Azure.
    - Data stored in Azure Blob Storage
    - Secrets managed in Azure Key Vault
    - Compute on Azure Kubernetes Service
    """
    
    model_version = Parameter('model_version',
                             default='v1.0',
                             help='Model version to deploy')
    
    @step
    def start(self):
        """Initialize workflow"""
        from datetime import datetime
        self.run_date = datetime.now().strftime('%Y-%m-%d')
        
        # Verify Azure configuration
        assert 'METAFLOW_DATASTORE_SYSROOT_AZURE' in os.environ
        assert 'METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT' in os.environ
        
        print(f"Starting Azure workflow: {self.run_date}")
        self.next(self.load_data)
    
    @kubernetes(cpu=4, memory=16384, 
                image='myregistry.azurecr.io/ml-base:latest')
    @retry(times=3, minutes_between_retries=5)
    @timeout(seconds=3600)
    @step
    def load_data(self):
        """Load training data from Azure Blob Storage"""
        from azure.storage.blob import BlobServiceClient
        from azure.identity import DefaultAzureCredential
        
        # Authenticate via DefaultAzureCredential (managed identity on AKS)
        endpoint = os.environ['METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT']
        credential = DefaultAzureCredential()
        
        # Load data
        blob_service = BlobServiceClient(
            account_url=endpoint,
            credential=credential
        )
        
        container = blob_service.get_container_client('training-data')
        blob = container.get_blob_client(f'data/{self.run_date}/features.parquet')
        
        self.data_size = blob.get_blob_properties().size
        print(f"Loaded {self.data_size} bytes of training data")
        
        self.next(self.train)
    
    @kubernetes(cpu=16, memory=65536, gpu=2, gpu_vendor='nvidia',
                image='myregistry.azurecr.io/ml-training:latest')
    @retry(times=2)
    @timeout(seconds=7200)
    @step
    def train(self):
        """Train model on AKS with GPU"""
        import torch
        
        print(f"Training {self.model_version} on GPU: {torch.cuda.is_available()}")
        
        # Training code here
        self.model_accuracy = 0.95
        self.model_path = f'models/{self.run_date}/{self.model_version}.pt'
        
        self.next(self.deploy)
    
    @kubernetes(cpu=4, memory=8192)
    @secrets(sources=['ml-endpoint-key'])
    @step
    def deploy(self):
        """Deploy model to Azure ML endpoint"""
        # The deployment key is injected from Azure Key Vault
        # as an environment variable by the @secrets decorator
        api_key = os.environ['ml-endpoint-key']
        
        # Deploy model
        print(f"Deploying {self.model_path} with accuracy {self.model_accuracy}")
        
        self.endpoint_url = f"https://myendpoint.azureml.net/v1/{self.model_version}"
        self.next(self.end)
    
    @step
    def end(self):
        """Workflow complete"""
        print(f"Deployment complete: {self.endpoint_url}")
        print(f"Model version: {self.model_version}")
        print(f"Accuracy: {self.model_accuracy}")

if __name__ == '__main__':
    AzureProductionFlow()
Run locally:
python azure_production_flow.py run
Deploy to Argo Workflows on AKS:
python azure_production_flow.py argo-workflows create
