LeRobot’s processor pipeline system transforms raw robot data through multiple steps. This guide shows you how to debug and inspect data at each stage of the pipeline.
Understanding the Pipeline
A processor pipeline is a sequence of ProcessorStep objects that transform data:
from lerobot.processor import DataProcessorPipeline
# Build a pipeline: steps run in order, each transforming the output of the
# previous one.
pipeline = DataProcessorPipeline(
    steps=[
        DeviceProcessorStep(device="cuda"),            # move tensors to GPU first
        NormalizerProcessorStep(stats=dataset_stats),  # normalize with dataset statistics
        AddBatchDimensionProcessorStep(),              # add a leading batch dimension
    ],
    name="my_pipeline"
)
# Process data through full pipeline
output = pipeline(input_data)
See lerobot/processor/pipeline.py:254 for the full DataProcessorPipeline implementation.
Step-Through Debugging
Use step_through() to inspect data after each processing step:
from lerobot.processor import create_transition
# Create sample data
# NOTE(review): `image`, `joint_positions`, and `action_tensor` are assumed to
# be torch.Tensors prepared earlier — substitute your own data here.
transition = create_transition(
    observation={"camera": image, "state": joint_positions},
    action=action_tensor,
)
# Step through pipeline: step_through() yields the untouched input first
# (i == 0) and then the state after each step, so steps[i - 1] is the step
# that produced intermediate i.
for i, intermediate_transition in enumerate(pipeline.step_through(transition)):
    print(f"\nAfter step {i}:")
    if i == 0:
        print(" Initial state")
    else:
        step = pipeline.steps[i - 1]
        print(f" Step: {step.__class__.__name__}")
    # Inspect observation: report shape/dtype/device for every tensor entry
    obs = intermediate_transition.get('observation')
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f" {key}: shape={value.shape}, dtype={value.dtype}, device={value.device}")
    # Inspect action (stored directly as a tensor, not nested in a dict)
    action = intermediate_transition.get('action')
    if isinstance(action, torch.Tensor):
        print(f" action: shape={action.shape}, dtype={action.dtype}, device={action.device}")
Example output:
After step 0:
Initial state
camera: shape=(480, 640, 3), dtype=torch.uint8, device=cpu
state: shape=(7,), dtype=torch.float32, device=cpu
action: shape=(7,), dtype=torch.float32, device=cpu
After step 1:
Step: DeviceProcessorStep
camera: shape=(480, 640, 3), dtype=torch.uint8, device=cuda:0
state: shape=(7,), dtype=torch.float32, device=cuda:0
action: shape=(7,), dtype=torch.float32, device=cuda:0
After step 2:
Step: NormalizerProcessorStep
camera: shape=(480, 640, 3), dtype=torch.float32, device=cuda:0 # Normalized to [0,1]
state: shape=(7,), dtype=torch.float32, device=cuda:0 # Normalized by stats
action: shape=(7,), dtype=torch.float32, device=cuda:0 # Normalized by stats
Custom Hooks
Add hooks to inspect or modify data at specific pipeline stages:
def before_step_hook(step_idx: int, transition: dict):
    """Print the upcoming step's name and warn about any NaN tensors.

    Register via ``before_step_hooks``; the pipeline calls it with the index
    of the step about to run and the current transition.

    Args:
        step_idx: Index into ``pipeline.steps`` of the step about to run.
        transition: Transition dict whose values are tensors or nested
            dicts of tensors.
    """
    step_name = pipeline.steps[step_idx].__class__.__name__
    print(f"\n→ Before {step_name}")
    # Check for NaN values — both top-level tensors (e.g. 'action' is stored
    # directly as a tensor) and tensors nested one level down (e.g. the
    # 'observation' dict). The original version only checked nested dicts,
    # so NaNs in 'action' went undetected.
    for key, value in transition.items():
        if isinstance(value, torch.Tensor):
            if torch.isnan(value).any():
                print(f" ⚠️ NaN detected in {key}")
        elif isinstance(value, dict):
            for k, v in value.items():
                if isinstance(v, torch.Tensor) and torch.isnan(v).any():
                    print(f" ⚠️ NaN detected in {key}.{k}")
def after_step_hook(step_idx: int, transition: dict):
    """Report completion of a step and basic statistics of the action.

    Register via ``after_step_hooks``; prints the finished step's name and,
    when the transition carries a tensor action, its mean and std.
    """
    finished = pipeline.steps[step_idx].__class__.__name__
    print(f"← After {finished}")
    # Log statistics — skip quietly when there is no tensor action.
    act = transition.get('action')
    if not isinstance(act, torch.Tensor):
        return
    print(f" Action stats: mean={act.mean():.4f}, std={act.std():.4f}")
# Create pipeline with hooks
pipeline = DataProcessorPipeline(
steps=[...],
before_step_hooks=[before_step_hook],
after_step_hooks=[after_step_hook]
)
Feature Tracking
Track how data shapes and types change through the pipeline, without running any actual data:
from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType
# Define input features
input_features = {
    PipelineFeatureType.OBSERVATION: {
        "observation.images.camera": PolicyFeature(type=FeatureType.IMAGE, shape=(3, 224, 224)),
        "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
    },
    PipelineFeatureType.ACTION: {
        "action": PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
    },
}
# Transform through each step.
# transform_features() describes how a step would change feature shapes and
# types without running any actual data through it.
current_features = input_features
for i, step in enumerate(pipeline.steps):
    print(f"\nStep {i}: {step.__class__.__name__}")
    # Get transformed features
    current_features = step.transform_features(current_features)
    # Print feature descriptions
    for feature_type, features in current_features.items():
        print(f" {feature_type}:")
        for name, feature in features.items():
            print(f" {name}: shape={feature.shape}, type={feature.type}")
Example output:
Step 0: TokenizerProcessorStep
OBSERVATION:
observation.language_tokens: shape=(77,), type=TEXT
observation.language_attention_mask: shape=(77,), type=TEXT
observation.images.camera: shape=(3, 224, 224), type=IMAGE
observation.state: shape=(7,), type=STATE
ACTION:
action: shape=(7,), type=ACTION
Step 1: NormalizerProcessorStep
OBSERVATION:
observation.language_tokens: shape=(77,), type=TEXT
observation.language_attention_mask: shape=(77,), type=TEXT
observation.images.camera: shape=(3, 224, 224), type=IMAGE
observation.state: shape=(7,), type=STATE # Normalized
ACTION:
action: shape=(7,), type=ACTION # Normalized
Debugging Normalization
Inspect normalization statistics:
from lerobot.processor import NormalizerProcessorStep
# Locate the first NormalizerProcessorStep in the pipeline (None if absent).
normalizer = None
for step in pipeline.steps:
    if isinstance(step, NormalizerProcessorStep):
        normalizer = step
        break
if normalizer:
    print("Normalization stats:")
    # stats maps feature keys to dicts carrying either mean/std or min/max.
    for key, stats in normalizer.stats.items():
        print(f"\n{key}:")
        if 'mean' in stats:
            print(f" mean: {stats['mean']}")
            print(f" std: {stats['std']}")
        if 'min' in stats:
            print(f" min: {stats['min']}")
            print(f" max: {stats['max']}")
    # Check tensor stats (on device)
    # NOTE(review): _tensor_stats is a private attribute and may change
    # between lerobot versions — confirm it still exists before relying on it.
    print("\nTensor stats:")
    for key, tensor_stats in normalizer._tensor_stats.items():
        print(f"\n{key}:")
        for stat_name, tensor in tensor_stats.items():
            print(f" {stat_name}: device={tensor.device}, dtype={tensor.dtype}")
Debugging Device Placement
Verify tensors are on correct devices:
def check_device_placement(transition: dict, expected_device: str = "cuda"):
    """Recursively check all tensors are on the expected device.

    Walks the transition (nested dicts, lists/tuples, tensors), prints a
    report, and returns the list of issues so callers can assert on it.

    Args:
        transition: Nested structure of tensors to inspect.
        expected_device: Where tensors should live — a device type ("cpu",
            "cuda"), a full spec ("cuda:0"), or a torch.device.

    Returns:
        List of human-readable issue strings; empty when everything matches.
    """
    # Normalize so "cuda:0" or torch.device("cuda") compares correctly
    # against a tensor's device *type* (comparing the raw string against
    # device.type would report false mismatches for "cuda:0").
    expected_type = torch.device(expected_device).type
    issues = []

    def check_recursive(obj, path=""):
        if isinstance(obj, torch.Tensor):
            if obj.device.type != expected_type:
                issues.append(f"{path}: expected {expected_device}, got {obj.device}")
        elif isinstance(obj, dict):
            for key, value in obj.items():
                check_recursive(value, f"{path}.{key}" if path else key)
        elif isinstance(obj, (list, tuple)):
            # Also descend into sequences of tensors (e.g. image stacks).
            for idx, value in enumerate(obj):
                check_recursive(value, f"{path}[{idx}]" if path else f"[{idx}]")

    check_recursive(transition)
    if issues:
        print("❌ Device placement issues:")
        for issue in issues:
            print(f" {issue}")
    else:
        print(f"✅ All tensors on {expected_device}")
    return issues
# Use in pipeline
# Index 0 from step_through() is the raw input, so steps[i-1] is the step
# that produced intermediate i; verify placement right after a device step.
for i, transition in enumerate(pipeline.step_through(data)):
    if i > 0 and isinstance(pipeline.steps[i-1], DeviceProcessorStep):
        check_device_placement(transition, pipeline.steps[i-1].device)
Saving Pipeline State
Save pipeline configuration and state for debugging:
from pathlib import Path
# Save pipeline
pipeline.save_pretrained("debug_pipeline")
# Check saved files
save_dir = Path("debug_pipeline")
print("Saved files:")
for file in save_dir.iterdir():
    print(f" {file.name}")
    # Pretty-print any JSON configs so their contents are visible inline.
    if file.suffix == ".json":
        import json
        with open(file) as f:
            config = json.load(f)
        print(json.dumps(config, indent=2))
Common Issues and Solutions
Issue: NaN Values After Normalization
Cause: Division by zero when std is 0
Solution: Check normalization stats:
# A zero std makes the normalizer divide by zero, producing NaNs downstream.
for key, stats in normalizer.stats.items():
    if 'std' in stats:
        std = torch.tensor(stats['std'])
        if (std == 0).any():
            print(f"⚠️ Zero std in {key}: {std}")
            print(" This will cause NaN values!")
Issue: Device Mismatch Errors
Cause: Tensors on different devices
Solution: Add device processor early in pipeline:
# Moving data to the target device first keeps every later step on one device.
pipeline = DataProcessorPipeline(
    steps=[
        DeviceProcessorStep(device="cuda"),  # First step!
        # ... other steps
    ]
)
Issue: Shape Mismatches
Cause: Step expecting different input shape
Solution: Use transform_features() to verify:
# Dry-run each step's feature transform to find where shapes diverge,
# without feeding any real data through the pipeline.
current_features = input_features
for step in pipeline.steps:
    print(f"Before {step.__class__.__name__}:")
    print(f" Features: {current_features}")
    try:
        current_features = step.transform_features(current_features)
    except Exception as e:
        # Report the failing step and stop — later steps would see bad input.
        print(f"❌ Error in {step.__class__.__name__}: {e}")
        break
Debugging Custom Processors
Add debug output to custom processor steps:
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from dataclasses import dataclass
@ProcessorStepRegistry.register("debug_processor")
@dataclass
class DebugProcessorStep(ProcessorStep):
    """Pass-through step that prints every tensor it sees when verbose."""

    verbose: bool = True  # set False to silence output without removing the step

    def __call__(self, transition):
        """Log the transition's contents, then return it unchanged."""
        if not self.verbose:
            return transition
        print(f"\n=== {self.__class__.__name__} ===")
        print(f"Input keys: {transition.keys()}")
        for key, value in transition.items():
            self._describe(key, value)
        return transition

    def _describe(self, key, value):
        """Print one entry: a tensor line, or a dict header plus its tensors."""
        if isinstance(value, torch.Tensor):
            print(f"{key}: {value.shape}, {value.dtype}, {value.device}")
        elif isinstance(value, dict):
            print(f"{key}: dict with {len(value)} items")
            for k, v in value.items():
                if isinstance(v, torch.Tensor):
                    print(f" {k}: {v.shape}, {v.dtype}, {v.device}")

    def transform_features(self, features):
        """Features pass through untouched — this step never reshapes data."""
        return features
# Use in pipeline
# Interleave debug steps between real steps to see the data before and after
# each transformation.
pipeline = DataProcessorPipeline(
    steps=[
        DebugProcessorStep(verbose=True),    # raw input
        DeviceProcessorStep(device="cuda"),
        DebugProcessorStep(verbose=True),    # after device transfer
        NormalizerProcessorStep(stats=stats),
        DebugProcessorStep(verbose=True),    # after normalization
    ]
)
Performance Profiling
Measure the wall-clock time spent in each pipeline step:
import time
# Collected (step_name, seconds) pairs, filled in by timing_hook below.
timings = []
def timing_hook(step_idx: int, transition: dict):
    """After-step hook that records wall-clock time between steps.

    State lives on the function object itself (timing_hook.start_time).
    NOTE(review): the very first call only initializes the clock, so the
    first step's duration is never recorded; each later entry measures the
    interval since the previous after-hook (step time plus hook overhead).
    """
    if not hasattr(timing_hook, 'start_time'):
        timing_hook.start_time = time.perf_counter()
    else:
        elapsed = time.perf_counter() - timing_hook.start_time
        step_name = pipeline.steps[step_idx].__class__.__name__
        timings.append((step_name, elapsed))
        timing_hook.start_time = time.perf_counter()
pipeline = DataProcessorPipeline(
    steps=[...],
    after_step_hooks=[timing_hook]
)
# Run pipeline
output = pipeline(data)
# Print timings
# Each entry is (step_name, seconds); report milliseconds and share of total.
print("\nPipeline timings:")
total = sum(t for _, t in timings)
for step_name, elapsed in timings:
    pct = 100 * elapsed / total
    print(f"{step_name:30s}: {elapsed*1000:6.2f}ms ({pct:5.1f}%)")
print(f"{'Total':30s}: {total*1000:6.2f}ms")
API Reference
DataProcessorPipeline
See lerobot/processor/pipeline.py:254
step_through(data) -> Iterable[EnvTransition]
Yield the initial transition, then the intermediate state after each processing step
__call__(data)
Process data through the full pipeline in one call (i.e. output = pipeline(data))
save_pretrained(path)
Save the pipeline configuration and state to a directory
ProcessorStep
See lerobot/processor/pipeline.py:143
transform_features(features)
Transform feature descriptions without processing actual data
state_dict()
Return processor state (e.g., normalization stats)
load_state_dict(state)
Load processor state from a dictionary