The Metaflow datastore is the low-level storage system that persists artifacts, metadata, and logs for your flows. It provides content-addressed storage for efficient artifact management and hierarchical storage for metadata.

Overview

The datastore consists of several layers:
  • FlowDataStore: Top-level store for a flow, manages TaskDataStores and content-addressed storage
  • TaskDataStore: Stores artifacts, metadata, and logs for a specific task
  • ContentAddressedStore: Deduplicates and stores artifacts based on content hash
  • DataStoreStorage: Backend implementation (S3, Azure Blob, Local, etc.)
Most users interact with the datastore indirectly through artifacts (self.artifact = value) rather than directly. This page documents the underlying concepts and direct APIs.

Storage Architecture

Content-Addressed Storage

Artifacts in Metaflow are stored using content addressing, which means:
  • Each artifact is identified by a hash of its contents
  • Identical artifacts are stored only once, regardless of how many tasks produce them
  • Storage is efficient and deduplication is automatic
For example, if 100 tasks all produce the same dataset, only one copy is stored.
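This deduplication can be sketched with a toy in-memory store. The `InMemoryCAStore` class below is illustrative only, not Metaflow's implementation; it just shows why identical content collapses to one stored blob when keyed by a content hash:

```python
import hashlib

class InMemoryCAStore:
    """Toy content-addressed store: identical blobs share one entry."""

    def __init__(self):
        self._blobs = {}  # content hash -> bytes

    def save(self, content: bytes) -> str:
        key = hashlib.sha1(content).hexdigest()
        self._blobs[key] = content  # re-saving identical bytes is a no-op
        return key

    def load(self, key: str) -> bytes:
        return self._blobs[key]

store = InMemoryCAStore()
# 100 "tasks" saving the same dataset yield a single stored copy
keys = [store.save(b"the same dataset") for _ in range(100)]
assert len(set(keys)) == 1
assert len(store._blobs) == 1
```

Because the key is derived from the content itself, no coordination between tasks is needed to achieve deduplication.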

Hierarchical Metadata Storage

Metadata is stored hierarchically by pathspec:
flow_name/
  run_id/
    step_name/
      task_id/
        metadata.json
        logs.txt
        DONE.lock
This structure allows efficient querying of task metadata and status.
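A pathspec maps directly onto this hierarchy. The helper below is an illustrative sketch, not a Metaflow API:

```python
def task_path(flow_name: str, run_id: str, step_name: str, task_id: str) -> str:
    """Build the hierarchical storage path for a task's metadata."""
    return "/".join([flow_name, run_id, step_name, task_id])

path = task_path("MyFlow", "12345", "train", "67")
# "MyFlow/12345/train/67"
```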

FlowDataStore

The FlowDataStore is the top-level datastore for a flow.

Initialization

from metaflow.datastore import FlowDataStore

flow_datastore = FlowDataStore(
    flow_name='MyFlow',
    environment=env,
    metadata=metadata_provider,
    storage_impl=storage_class
)

Properties

  • datastore_root (str): Root path for all storage
  • TYPE (str): Storage backend type (e.g., 's3', 'azure', 'local')
  • ca_store (ContentAddressedStore): Content-addressed storage instance

Methods

get_task_datastores

Retrieve TaskDataStore instances for specific tasks:
task_stores = flow_datastore.get_task_datastores(
    run_id='12345',
    steps=['train', 'evaluate'],
    mode='r'
)
Parameters:
  • run_id: Run ID to filter tasks
  • steps: List of step names to include
  • pathspecs: Alternative to run_id/steps - full task pathspecs
  • allow_not_done: Include tasks without DONE marker
  • attempt: Specific attempt number to retrieve
  • mode: 'r' for read, 'w' for write
Returns: List of TaskDataStore instances

TaskDataStore

The TaskDataStore handles storage for a single task’s artifacts, metadata, and logs.

Initialization

Usually obtained through FlowDataStore.get_task_datastores() or FlowDataStore.get_datastore_for_task().

Modes

  • Read mode ('r'): Load existing artifacts and metadata
  • Write mode ('w'): Save new artifacts and metadata

Artifact Operations

save_artifacts

Save task artifacts (called automatically by Metaflow runtime):
task_store.save_artifacts(
    artifacts_to_save,
    force_pickle=False,
    len_hint=0
)

load_artifacts

Load task artifacts:
artifacts = task_store.load_artifacts(names=['model', 'accuracy'])
for name, value in artifacts:
    print(f"{name}: {value}")

Metadata Operations

save_metadata

Save task metadata:
task_store.save_metadata({
    'duration': 120.5,
    'status': 'success'
})

load_metadata

Load task metadata:
metadata = task_store.load_metadata(['duration', 'status'])

Log Operations

save_log

Save task logs:
task_store.save_log('stdout', 'Log message\n', attempt=0)

load_log

Load task logs:
log_content = task_store.load_log('stdout', attempt=0)
print(log_content)

Lifecycle

done

Mark the task as complete:
task_store.done()
This creates a DONE marker that signals the task completed successfully. Tasks without this marker are considered incomplete.
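The marker mechanism can be sketched against a local directory. The helper names below are hypothetical; the real marker filename and location are backend implementation details:

```python
import tempfile
from pathlib import Path

def mark_done(task_dir: Path) -> None:
    """Write an empty marker file signalling successful completion."""
    (task_dir / "DONE.lock").touch()

def is_done(task_dir: Path) -> bool:
    """A task without the marker is considered incomplete."""
    return (task_dir / "DONE.lock").exists()

task_dir = Path(tempfile.mkdtemp())
assert not is_done(task_dir)  # incomplete until explicitly marked
mark_done(task_dir)
assert is_done(task_dir)
```

An empty marker file makes completion an atomic, cheap-to-check property: readers only test for the file's existence rather than parsing task state.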

is_done

Check if task is marked as done:
if task_store.is_done:
    print("Task completed")

Storage Backends

Metaflow supports multiple storage backends:

S3 Storage

Default for AWS deployments:
from metaflow.plugins.datastores.s3_storage import S3DataStoreStorage

storage = S3DataStoreStorage(ds_root='s3://my-bucket/metaflow')

Azure Blob Storage

For Azure deployments:
from metaflow.plugins.datastores.azure_storage import AzureDataStoreStorage

storage = AzureDataStoreStorage(ds_root='azure://container/metaflow')

Local Storage

For development and testing:
from metaflow.plugins.datastores.local_storage import LocalDataStoreStorage

storage = LocalDataStoreStorage(ds_root='/tmp/metaflow')

Advanced Usage

Content-Addressed Store

Direct access to content-addressed storage:
from metaflow.datastore import ContentAddressedStore

ca_store = flow_datastore.ca_store

# Save blobs with automatic deduplication
keys = ca_store.save_blobs(
    [
        ('data1', b'content1'),
        ('data2', b'content2')
    ],
    len_hint=2
)

# Load blobs by keys
blobs = ca_store.load_blobs(keys)
for key, content in blobs:
    print(f"{key}: {content}")

Custom Metadata

Store custom metadata for tasks:
# In a flow step
task_store = self._datastore
task_store.save_metadata({
    'custom_metric': 0.95,
    'model_version': '2.0',
    'timestamp': time.time()
})
Later, retrieve this metadata:
from metaflow import Flow

run = Flow('MyFlow').latest_run
for task in run['train']:
    task_store = task._get_datastore()
    metadata = task_store.load_metadata(['custom_metric', 'model_version'])
    print(metadata)

Configuration

Datastore behavior is controlled by environment variables and configuration:
  • METAFLOW_DATASTORE_SYSROOT_S3: S3 root for artifact storage
  • METAFLOW_DATASTORE_SYSROOT_AZURE: Azure root for artifact storage
  • METAFLOW_DATASTORE_SYSROOT_LOCAL: Local root for artifact storage
  • METAFLOW_DEFAULT_DATASTORE: Default storage backend type
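How such variables resolve can be sketched with a simple environment lookup. This is illustrative only; Metaflow's actual configuration layer also merges values from its JSON config files:

```python
import os
from typing import Optional

def datastore_root(backend: str, default: Optional[str] = None) -> Optional[str]:
    """Resolve the sysroot for a backend from the environment."""
    var = f"METAFLOW_DATASTORE_SYSROOT_{backend.upper()}"
    return os.environ.get(var, default)

os.environ["METAFLOW_DATASTORE_SYSROOT_LOCAL"] = "/tmp/metaflow"
assert datastore_root("local") == "/tmp/metaflow"
assert datastore_root("dummy", "fallback") == "fallback"  # unset -> default
```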

Artifact Serialization

Artifacts are serialized using pickle (protocol 2 by default, protocol 4 for Python 3.6+):
import pickle

# Artifacts are pickled before storage
data = {'key': 'value'}
serialized = pickle.dumps(data, protocol=2)

# And unpickled when loaded
restored = pickle.loads(serialized)
For artifacts that can’t be pickled, Metaflow will raise an UnpicklableArtifactException.
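A defensive pre-check for picklability can be written with the standard library. This sketch is illustrative; Metaflow itself raises the exception when it serializes the artifact at the end of the task:

```python
import pickle

def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round-trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False

assert is_picklable({"model_version": "2.0"})
assert not is_picklable(lambda x: x)  # lambdas cannot be pickled
```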

Best Practices

  1. Let Metaflow manage the datastore: Use self.artifact = value instead of direct datastore calls
  2. Keep artifacts reasonably sized: Very large artifacts (>100GB) can be slow to serialize
  3. Use external storage for huge datasets: For multi-terabyte datasets, use S3 client directly
  4. Leverage deduplication: Identical artifacts across tasks are stored once
  5. Use metadata for lightweight data: Store task metrics and status in metadata, not artifacts

Error Handling

DataException

Raised for general datastore errors:
from metaflow.datastore.exceptions import DataException

try:
    artifacts = task_store.load_artifacts(['missing'])
except DataException as e:
    print(f"Failed to load artifacts: {e}")

UnpicklableArtifactException

Raised when an artifact can’t be serialized:
from metaflow import FlowSpec, step
from metaflow.datastore.exceptions import UnpicklableArtifactException

class MyFlow(FlowSpec):
    @step
    def start(self):
        # Raises UnpicklableArtifactException when the artifact is
        # serialized, because live connections cannot be pickled
        self.database_connection = create_db_connection()
        self.next(self.end)
Solution: don't store unpicklable objects (connections, file handles, locks) as artifacts. Store the parameters needed to recreate the object and rebuild it inside each step that needs it.

Performance Considerations

Parallel Operations

The datastore supports parallel operations for efficiency:
  • Multiple tasks can read/write simultaneously
  • Content-addressed storage enables efficient deduplication
  • S3 backend uses multipart uploads for large artifacts
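As a sketch of concurrent reads, the snippet below fetches blobs from an in-memory "backend" with a thread pool. It is illustrative only; the real backends issue network requests, which is where thread-level parallelism pays off:

```python
from concurrent.futures import ThreadPoolExecutor

blobs = {f"key{i}": f"content{i}".encode() for i in range(8)}

def load(key: str) -> bytes:
    # Stands in for a (slow) network fetch from the storage backend
    return blobs[key]

# Fetch all blobs concurrently instead of one at a time
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(load, sorted(blobs)))

assert results[0] == b"content0"
assert len(results) == 8
```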

Caching

Metadata can be cached for performance:
flow_datastore.set_metadata_cache(cache_instance)

Compression

Artifacts can be compressed (gzip) for storage:
  • Reduces storage costs
  • May increase CPU usage during serialization
  • Configured per storage backend
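The trade-off is easy to demonstrate with gzip on a repetitive payload, using only the standard library and independent of any backend:

```python
import gzip
import pickle

data = {"rows": list(range(1000))}
raw = pickle.dumps(data)
compressed = gzip.compress(raw)  # costs CPU, saves bytes

# Repetitive payloads compress well; decompression restores exact bytes
assert len(compressed) < len(raw)
assert gzip.decompress(compressed) == raw
```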
See Also

  • S3 - High-level S3 client for direct cloud storage access
  • IncludeFile - Include local files as flow parameters
  • Artifacts - Working with artifacts in flows
