Data artifacts are the outputs produced by steps in your flow. Metaflow automatically versions and persists all artifacts, making them accessible for analysis, debugging, and reproduction.
## Creating Artifacts

Any attribute you set on `self` in a step becomes a data artifact:
```python
from metaflow import FlowSpec, step

class ArtifactExample(FlowSpec):

    @step
    def start(self):
        # All of these become artifacts
        self.raw_data = [1, 2, 3, 4, 5]
        self.model = {"weights": [0.5, 0.3, 0.2]}
        self.accuracy = 0.95
        self.metadata = {"version": "1.0", "date": "2024-01-01"}
        self.next(self.end)

    @step
    def end(self):
        # Access artifacts from previous step
        print(f"Accuracy: {self.accuracy}")
        print(f"Data: {self.raw_data}")
```
## Artifact Persistence

Metaflow automatically:

- Serializes artifacts using Python's pickle protocol
- Stores them in the configured datastore (local or cloud)
- Versions them with unique identifiers
- Makes them accessible across steps and runs
Artifacts must be serializable. Most Python objects work, but network connections, file handles, and some third-party objects may not serialize properly.
## Accessing Artifacts

Artifacts flow automatically between steps:
```python
@step
def start(self):
    self.data = load_data()
    self.next(self.process)

@step
def process(self):
    # self.data is automatically available
    self.result = transform(self.data)
    self.next(self.end)
```
## Private Artifacts

Attributes starting with `_` are private and not persisted:
```python
@step
def start(self):
    self._temp_data = "not saved"   # Private, ephemeral
    self.important_data = "saved"   # Saved as artifact
    self.next(self.end)
```
Use private attributes for temporary data that doesn’t need to be saved, like intermediate calculations or cached values.
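The underscore rule can be illustrated outside Metaflow with a plain object. This is a simplified sketch of the name filtering, not Metaflow's actual persistence code:

```python
class StepState:
    # Stand-in for a step's `self` after execution
    pass

state = StepState()
state._temp_data = "not saved"    # private, ephemeral
state.important_data = "saved"    # persisted as an artifact

# Only attributes whose names don't start with '_' are persisted
persisted = {k: v for k, v in vars(state).items()
             if not k.startswith("_")}
print(persisted)  # {'important_data': 'saved'}
```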
## Internal Artifacts

Metaflow creates several internal artifacts:

- `_foreach_values`: Values being iterated in a foreach
- `_graph_info`: Flow graph structure and metadata
- `_foreach_stack`: Nested foreach context
- `_success`: Step completion status

These are accessible but typically used internally by Metaflow.
## Artifact Merging

In join steps, use `merge_artifacts()` to combine artifacts from parallel branches:
```python
@step
def start(self):
    self.shared_config = {"key": "value"}
    self.next(self.branch_a, self.branch_b)

@step
def branch_a(self):
    self.result_a = "from A"
    self.next(self.join)

@step
def branch_b(self):
    self.result_b = "from B"
    self.next(self.join)

@step
def join(self, inputs):
    # Automatically merges non-conflicting artifacts
    self.merge_artifacts(inputs)
    # Now you have:
    # - self.shared_config (same in both branches)
    # - self.result_a (from branch_a)
    # - self.result_b (from branch_b)
    self.next(self.end)
```
### Merge Behavior

`merge_artifacts()` handles conflicts according to these rules:

- **Same value in all branches**: Merged automatically
- **Different values across branches**: Raises an exception
- **Present in only one branch**: Merged automatically
- **Already set in the join step**: Not merged (the manual value takes precedence)
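The rules above can be sketched with a toy merge over plain dicts standing in for branch inputs. This is an illustration of the conflict logic only, not Metaflow's implementation (it raises a plain `ValueError` rather than Metaflow's exception type):

```python
def merge_sketch(join_ns, inputs):
    """Toy emulation of merge_artifacts' conflict rules over plain dicts."""
    merged = dict(join_ns)  # values already set in the join step win
    for inp in inputs:
        for name, value in inp.items():
            if name in join_ns:
                continue  # manual value takes precedence
            if name in merged and merged[name] != value:
                raise ValueError(f"conflicting values for artifact '{name}'")
            merged[name] = value
    return merged

a = {"shared_config": {"key": "value"}, "result_a": "from A"}
b = {"shared_config": {"key": "value"}, "result_b": "from B"}

# shared_config is identical in both branches, so it merges cleanly;
# result_a / result_b each exist in only one branch.
out = merge_sketch({}, [a, b])

# Divergent values for the same name raise an error.
try:
    merge_sketch({}, [{"x": 1}, {"x": 2}])
    conflict = False
except ValueError:
    conflict = True
```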
### Selective Merging

```python
@step
def join(self, inputs):
    # Only merge specific artifacts
    self.merge_artifacts(inputs, include=['result_a', 'result_b'])
    # Or, alternatively, merge everything except specific artifacts:
    # self.merge_artifacts(inputs, exclude=['temporary_data'])
    self.next(self.end)
```
## Foreach Artifacts

In foreach loops, the current split's item is available as `self.input`:
```python
@step
def start(self):
    self.parameters = [
        {"lr": 0.01, "batch_size": 32},
        {"lr": 0.001, "batch_size": 64},
        {"lr": 0.0001, "batch_size": 128}
    ]
    self.next(self.train, foreach='parameters')

@step
def train(self):
    # Access current parameter set
    config = self.input
    print(f"Training with lr={config['lr']}")
    # Create artifacts specific to this iteration
    self.model = train_model(config)
    self.accuracy = evaluate(self.model)
    self.next(self.join)

@step
def join(self, inputs):
    # Access all models and accuracies
    self.all_models = [inp.model for inp in inputs]
    self.best_accuracy = max(inp.accuracy for inp in inputs)
    self.next(self.end)
```
## Large Artifacts

For large objects, consider:
### Compression

```python
import gzip
import pickle

@step
def start(self):
    large_data = load_huge_dataset()
    # Compress before storing
    self.compressed_data = gzip.compress(pickle.dumps(large_data))
    self.next(self.end)

@step
def end(self):
    # Decompress when needed
    large_data = pickle.loads(gzip.decompress(self.compressed_data))
```
### External Storage

```python
import pickle

from metaflow import FlowSpec, step, S3

@step
def start(self):
    large_data = load_huge_dataset()
    # Store in S3 (as bytes), save only the reference
    with S3(run=self) as s3:
        s3.put('large_data', pickle.dumps(large_data))
    self.data_key = 'large_data'
    self.next(self.end)

@step
def end(self):
    # Load from S3 when needed
    with S3(run=self) as s3:
        large_data = pickle.loads(s3.get(self.data_key).blob)
```
For data larger than a few gigabytes, use external storage (S3, GCS, etc.) and store only references as artifacts.
## Inspecting Artifacts

Use the Client API to inspect artifacts from past runs:
```python
from metaflow import Run

# Get a specific run
run = Run('MyFlow/123')

# Access a step
step = run['process']

# Access the task
task = step.task

# Get artifacts
print(task.data.accuracy)  # Access a specific artifact
print(task.data.model)     # Access another artifact

# Or use DataArtifact directly
artifact = task['accuracy']
print(artifact.data)         # The actual value
print(artifact.sha)          # Unique identifier
print(artifact.finished_at)  # When it was created
```
See Client API for more details.
## Artifact Lifecycle

1. You set attributes on `self` during step execution.
2. Metaflow serializes the objects using Python's pickle protocol.
3. Each serialized artifact is stored in the datastore under a unique SHA.
4. Artifacts are automatically loaded in subsequent steps.
5. Artifacts remain accessible via the Client API indefinitely.
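The store-by-SHA idea can be sketched with `pickle` and `hashlib`. This is a simplified stand-in; Metaflow's actual datastore layout and hashing details differ:

```python
import hashlib
import pickle

datastore = {}  # stand-in for the configured datastore

def save_artifact(value):
    """Serialize a value and store it under its content hash."""
    blob = pickle.dumps(value)
    sha = hashlib.sha1(blob).hexdigest()  # content-addressed key
    datastore[sha] = blob
    return sha

def load_artifact(sha):
    """Fetch and deserialize a stored artifact by its key."""
    return pickle.loads(datastore[sha])

key = save_artifact({"weights": [0.5, 0.3, 0.2]})
assert load_artifact(key) == {"weights": [0.5, 0.3, 0.2]}

# Identical values serialize to the same key, so storage deduplicates.
assert save_artifact({"weights": [0.5, 0.3, 0.2]}) == key
```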
## Best Practices

- **Store intermediate results**: Save important intermediate computations as artifacts. This makes debugging easier and allows resuming from any step.
- **Use descriptive names**: Artifact names appear in logs and the UI. Use clear, descriptive names like `trained_model` instead of `m`.
- **Consider size**: Keep artifacts reasonably sized (< 1 GB each). For larger data, use external storage.
- **Don't store credentials**: Never store passwords, API keys, or other secrets as artifacts. Use environment variables or secret management systems instead.
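For the credentials rule, a common pattern is to read the secret from the environment inside the step and, if you need to keep it on `self`, bind it to an underscore-prefixed (non-persisted) attribute. `MY_API_KEY` is an illustrative variable name, and the class here is a plain stand-in for a flow step:

```python
import os

# Normally set outside the process (shell, scheduler, secret manager);
# set here only so the sketch is self-contained.
os.environ["MY_API_KEY"] = "dummy-value"

class StepLike:
    # Stand-in for a flow step
    def fetch(self):
        # Underscore prefix: never persisted as an artifact
        self._api_key = os.environ["MY_API_KEY"]
        return self._api_key

s = StepLike()
print(s.fetch())  # dummy-value
```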
## Common Patterns

### Checkpointing

```python
@step
def train(self):
    for epoch in range(100):
        train_epoch(self.model)
        # Checkpoint every 10 epochs
        if epoch % 10 == 0:
            self.checkpoint = self.model.state_dict()
    self.next(self.end)
```
### Artifact Collections

```python
@step
def analyze(self):
    self.results = {
        "metrics": {"accuracy": 0.95, "loss": 0.05},
        "plots": generate_plots(),
        "tables": create_summary_tables()
    }
    self.next(self.end)
```