Skip to main content

Overview

In Metaflow, any attribute you set on self becomes a data artifact that is automatically persisted and accessible in subsequent steps. Understanding how artifacts work is crucial for building efficient flows.

Basic Artifacts

Creating Artifacts

Any attribute assignment creates an artifact:
from metaflow import FlowSpec, step

class ArtifactFlow(FlowSpec):
    @step
    def start(self):
        # All of these become artifacts
        self.count = 42
        self.name = "experiment_1"
        self.data = [1, 2, 3, 4, 5]
        self.config = {"model": "resnet", "lr": 0.001}
        
        self.next(self.process)
    
    @step
    def process(self):
        # Artifacts are automatically available
        print(f"Count: {self.count}")  # 42
        print(f"Name: {self.name}")    # "experiment_1"
        print(f"Data: {self.data}")    # [1, 2, 3, 4, 5]
        
        self.result = sum(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        # All previous artifacts available
        print(f"Result: {self.result}")

Artifact Persistence

Artifacts are stored in the datastore:
# From flowspec.py:591
def __getattr__(self, name: str):
    if self._datastore and name in self._datastore:
        # load the attribute from the datastore...
        x = self._datastore[name]
        # ...and cache it in the object for faster access
        setattr(self, name, x)
        return x
    else:
        raise AttributeError("Flow %s has no attribute '%s'" % (self.name, name))
Artifacts are:
  1. Saved to the datastore at step completion
  2. Loaded from the datastore at the next step start
  3. Cached in memory for fast access

Supported Data Types

Metaflow can serialize most Python objects:
@step
def start(self):
    # Primitives
    self.number = 42
    self.text = "hello"
    self.flag = True
    self.value = 3.14
    
    # Collections
    self.list_data = [1, 2, 3]
    self.tuple_data = (1, 2, 3)
    self.set_data = {1, 2, 3}
    self.dict_data = {"key": "value"}
    
    # Nested structures
    self.nested = {
        "models": [
            {"name": "model_a", "score": 0.95},
            {"name": "model_b", "score": 0.93}
        ]
    }
    
    # NumPy arrays
    import numpy as np
    self.matrix = np.array([[1, 2], [3, 4]])
    
    # Pandas DataFrames
    import pandas as pd
    self.dataframe = pd.DataFrame({
        'A': [1, 2, 3],
        'B': [4, 5, 6]
    })
    
    # Custom objects (if picklable)
    self.model = MyModel()
    
    self.next(self.end)

Ephemeral Attributes

Some attributes are not persisted:
# From flowspec.py:279
_EPHEMERAL = {
    '_EPHEMERAL',
    '_NON_PARAMETERS',
    '_datastore',
    '_cached_input',
    '_graph',
    '_flow_state',
    '_steps',
    'index',
    'input',
}
These are:
  • Internal Metaflow attributes (start with _)
  • Properties like index and input (computed per step)
  • Flow metadata structures
@step
def start(self):
    # This is NOT saved as an artifact
    self._temp = "temporary"
    
    # This IS saved
    self.result = "permanent"
    
    self.next(self.end)

@step
def end(self):
    print(self.result)  # Works: "permanent"
    # print(self._temp)  # Would fail: AttributeError

Artifact Size Considerations

Small Artifacts

Small artifacts (< 1MB) are handled efficiently:
@step
def start(self):
    self.config = {"batch_size": 32, "learning_rate": 0.001}
    self.metrics = [0.1, 0.2, 0.3]
    self.next(self.end)

Large Artifacts

For large data (> 100MB), consider optimization strategies:
@step
def start(self):
    # Load large dataset
    raw_data = load_large_dataset()  # 5GB
    
    # Strategy 1: Extract only what you need
    self.summary_stats = {
        'mean': raw_data.mean(),
        'std': raw_data.std(),
        'count': len(raw_data)
    }
    
    # Strategy 2: Sample the data
    self.sample = raw_data.sample(n=10000)
    
    # Strategy 3: Store reference instead
    self.data_path = save_to_s3(raw_data)
    
    self.next(self.process)

@step
def process(self):
    # Load from external storage when needed
    data = load_from_s3(self.data_path)
    self.next(self.end)

Using IncludeFile for Data Files

from metaflow import IncludeFile

class DataFlow(FlowSpec):
    # File content included at flow start
    data_file = IncludeFile('data',
                           help='Input data file',
                           default='data.csv')
    
    @step
    def start(self):
        # Access file content as string
        import csv
        lines = self.data_file.splitlines()
        self.data = list(csv.DictReader(lines))
        self.next(self.end)

Artifacts in Branches

Artifact Propagation

Artifacts from before a split are available in all branches:
@step
def start(self):
    self.shared_data = [1, 2, 3, 4, 5]
    self.next(self.branch_a, self.branch_b)

@step
def branch_a(self):
    # shared_data is available
    self.result_a = sum(self.shared_data)
    self.next(self.join)

@step
def branch_b(self):
    # shared_data is available here too
    self.result_b = max(self.shared_data)
    self.next(self.join)

Merge Artifacts

Merge artifacts from parallel branches:
@step
def join(self, inputs):
    # Method 1: Manual selection
    self.result_a = inputs.branch_a.result_a
    self.result_b = inputs.branch_b.result_b
    
    # Method 2: Use merge_artifacts helper
    self.merge_artifacts(inputs)
    # Now self.result_a and self.result_b are both available
    
    self.next(self.end)
From the implementation:
# From flowspec.py:730
def merge_artifacts(
    self,
    inputs: Inputs,
    exclude: Optional[List[str]] = None,
    include: Optional[List[str]] = None,
) -> None:
    """Helper function for merging artifacts in a join step."""

Conflict Resolution

Handle conflicting artifacts:
@step
def branch_a(self):
    self.model_score = 0.95
    self.next(self.join)

@step
def branch_b(self):
    self.model_score = 0.93  # Conflict!
    self.next(self.join)

@step
def join(self, inputs):
    # Resolve conflict manually BEFORE merge_artifacts
    self.model_score = max(inp.model_score for inp in inputs)
    
    # Now merge_artifacts won't fail
    self.merge_artifacts(inputs)
    self.next(self.end)

Artifacts in Foreach

Artifacts Created in Foreach

Each foreach task creates its own artifacts:
@step
def start(self):
    self.items = ['A', 'B', 'C']
    self.next(self.process, foreach='items')

@step
def process(self):
    # Each task has access to self.input
    self.processed = f"Processed {self.input}"
    self.score = compute_score(self.input)
    self.next(self.join)

@step
def join(self, inputs):
    # Collect artifacts from all foreach tasks
    self.all_processed = [inp.processed for inp in inputs]
    self.all_scores = [inp.score for inp in inputs]
    
    # Aggregate
    self.avg_score = sum(self.all_scores) / len(self.all_scores)
    self.next(self.end)

Accessing Parent Artifacts

Artifacts from before the foreach are available:
@step
def start(self):
    self.config = {"threshold": 0.5}
    self.items = [1, 2, 3]
    self.next(self.process, foreach='items')

@step
def process(self):
    # Parent artifact is available
    if self.input > self.config['threshold']:
        self.status = 'high'
    else:
        self.status = 'low'
    self.next(self.join)

Internal Artifacts

Metaflow creates some internal artifacts:
# From flowspec.py:47
INTERNAL_ARTIFACTS_SET = set(
    [
        '_foreach_values',
        '_unbounded_foreach',
        '_control_mapper_tasks',
        '_control_task_is_mapper_zero',
        '_parallel_ubf_iter',
    ]
)
These manage foreach execution and should not be modified.

Artifact Best Practices

1. Be Selective

Only save what you need:
# Bad: Saving everything
@step
def start(self):
    raw_data = load_data()  # 5GB
    processed = process(raw_data)  # 3GB
    temp_results = analyze(processed)  # 2GB
    
    self.raw_data = raw_data
    self.processed = processed
    self.temp_results = temp_results
    self.final = summarize(temp_results)
    # Total: 10GB saved!
    self.next(self.end)

# Good: Save only what's needed
@step
def start(self):
    raw_data = load_data()
    processed = process(raw_data)
    temp_results = analyze(processed)
    
    # Only save final result
    self.final = summarize(temp_results)
    # Total: <1MB saved
    self.next(self.end)

2. Use Clear Names

# Good: Descriptive names
self.training_accuracy = 0.95
self.validation_accuracy = 0.93
self.test_predictions = predictions

# Bad: Unclear names
self.acc1 = 0.95
self.acc2 = 0.93
self.preds = predictions

3. Document Complex Artifacts

@step
def start(self):
    # Model performance metrics
    # Structure: {metric_name: {fold: score}}
    self.cv_scores = {
        'accuracy': {0: 0.95, 1: 0.93, 2: 0.94},
        'f1': {0: 0.92, 1: 0.91, 2: 0.93}
    }
    self.next(self.end)

4. Avoid Serialization Issues

Don’t store objects that can’t be pickled:
# Bad: Database connections can't be pickled
@step
def start(self):
    self.db_connection = connect_to_db()  # Will fail!
    self.next(self.end)

# Good: Store connection string, reconnect when needed
@step
def start(self):
    self.db_url = "postgresql://localhost/mydb"
    self.next(self.process)

@step
def process(self):
    # Reconnect in each step
    db = connect_to_db(self.db_url)
    # Use connection...
    self.next(self.end)

5. Don’t Serialize the Flow

Never assign self to an artifact:
# From flowspec.py:1170
def __getstate__(self):
    raise MetaflowException(
        "Flows can't be serialized. Maybe you tried "
        "to assign *self* or one of the *inputs* "
        "to an attribute?"
    )
# Bad: This will fail
@step
def start(self):
    self.flow_copy = self  # Error!
    self.next(self.end)

# Good: Extract specific attributes
@step
def start(self):
    self.flow_name = self.name
    self.step_name = self._current_step
    self.next(self.end)

Artifact Metadata

Metaflow stores metadata about artifacts:
# From flowspec.py:514
graph_info = {
    'file': os.path.basename(os.path.abspath(sys.argv[0])),
    'parameters': parameters_info,
    'constants': constants_info,
    'steps': steps_info,
    'graph_structure': graph_structure,
    'doc': graph.doc,
    'decorators': [...],
    'extensions': extension_info(),
}
self._graph_info = graph_info
This metadata is stored in the _graph_info artifact and includes:
  • Flow file name
  • Parameter definitions
  • Constants
  • Step structure
  • Decorator information

External Storage Patterns

For very large data, use external storage:

S3 Pattern

import boto3
from metaflow import S3

class S3Flow(FlowSpec):
    @step
    def start(self):
        # Process large data
        large_data = create_large_dataset()
        
        # Save to S3
        with S3(run=self) as s3:
            s3url = s3.put('large_data.pkl', large_data)
            self.data_url = s3url
        
        self.next(self.process)
    
    @step
    def process(self):
        # Load from S3
        with S3(run=self) as s3:
            large_data = s3.get(self.data_url).blob
        
        # Process...
        self.result = analyze(large_data)
        self.next(self.end)

Custom Storage Pattern

class CustomStorageFlow(FlowSpec):
    @step
    def start(self):
        large_data = load_data()
        
        # Save to custom storage
        data_id = save_to_storage(large_data)
        self.data_id = data_id
        
        self.next(self.process)
    
    @step
    def process(self):
        # Load from storage
        data = load_from_storage(self.data_id)
        self.result = process(data)
        self.next(self.end)

Inspecting Artifacts

You can inspect artifacts from completed runs:
from metaflow import Flow, Run

# Get the latest run
run = Flow('MyFlow').latest_run

# List all artifacts
for step in run:
    print(f"Step: {step.id}")
    for artifact_name in step.data:
        print(f"  {artifact_name}")

# Access specific artifact
result = run.data.result
print(f"Result: {result}")

Performance Tips

1. Minimize Artifact Size

# Instead of storing entire DataFrame
import pandas as pd
df = pd.read_csv('large_file.csv')  # 1GB
self.df = df  # Saves 1GB

# Store only what's needed
self.summary = {
    'count': len(df),
    'mean': df['value'].mean(),
    'columns': df.columns.tolist()
}  # Saves a few KB

2. Use Efficient Formats

# Pandas DataFrames are stored efficiently
import pandas as pd
self.data = pd.DataFrame(...)  # Uses Parquet internally

# NumPy arrays are efficient
import numpy as np
self.matrix = np.array(...)  # Binary format

# Nested dicts/lists can be large
self.big_dict = {...}  # Pickled, can be large

3. Lazy Loading

@step
def start(self):
    # Store path, not data
    self.model_path = 's3://bucket/model.pkl'
    self.next(self.predict)

@step
def predict(self):
    # Load only when needed
    model = load_model(self.model_path)
    predictions = model.predict(self.data)
    self.next(self.end)

Next Steps

FlowSpec

Deep dive into the FlowSpec base class

Branching

Manage artifacts across parallel branches

Foreach

Handle artifacts in dynamic foreach loops

Parameters

Use Parameters and Configs as special artifacts

Build docs developers (and LLMs) love