Skip to main content
LeRobot’s processor pipeline system transforms raw robot data through multiple steps. This guide shows you how to debug and inspect data at each stage of the pipeline.

Understanding the Pipeline

A processor pipeline is a sequence of ProcessorStep objects that transform data:
from lerobot.processor import DataProcessorPipeline

pipeline = DataProcessorPipeline(
    steps=[
        DeviceProcessorStep(device="cuda"),
        NormalizerProcessorStep(stats=dataset_stats),
        AddBatchDimensionProcessorStep(),
    ],
    name="my_pipeline"
)

# Process data through full pipeline
output = pipeline(input_data)
See lerobot/processor/pipeline.py:254 for the full DataProcessorPipeline implementation.

Step-Through Debugging

Use step_through() to inspect data after each processing step:
from lerobot.processor import create_transition

# Create sample data
transition = create_transition(
    observation={"camera": image, "state": joint_positions},
    action=action_tensor,
)

# Step through pipeline
for i, intermediate_transition in enumerate(pipeline.step_through(transition)):
    print(f"\nAfter step {i}:")
    
    if i == 0:
        print("  Initial state")
    else:
        step = pipeline.steps[i - 1]
        print(f"  Step: {step.__class__.__name__}")
    
    # Inspect observation
    obs = intermediate_transition.get('observation')
    if obs:
        for key, value in obs.items():
            if isinstance(value, torch.Tensor):
                print(f"  {key}: shape={value.shape}, dtype={value.dtype}, device={value.device}")
    
    # Inspect action
    action = intermediate_transition.get('action')
    if isinstance(action, torch.Tensor):
        print(f"  action: shape={action.shape}, dtype={action.dtype}, device={action.device}")
Example output:
After step 0:
  Initial state
  camera: shape=(480, 640, 3), dtype=torch.uint8, device=cpu
  state: shape=(7,), dtype=torch.float32, device=cpu
  action: shape=(7,), dtype=torch.float32, device=cpu

After step 1:
  Step: DeviceProcessorStep
  camera: shape=(480, 640, 3), dtype=torch.uint8, device=cuda:0
  state: shape=(7,), dtype=torch.float32, device=cuda:0
  action: shape=(7,), dtype=torch.float32, device=cuda:0

After step 2:
  Step: NormalizerProcessorStep
  camera: shape=(480, 640, 3), dtype=torch.float32, device=cuda:0  # Normalized to [0,1]
  state: shape=(7,), dtype=torch.float32, device=cuda:0  # Normalized by stats
  action: shape=(7,), dtype=torch.float32, device=cuda:0  # Normalized by stats

Custom Hooks

Add hooks to inspect or modify data at specific pipeline stages:
def before_step_hook(step_idx: int, transition: dict):
    """Called before each step; warns about NaN values in the incoming data.

    Args:
        step_idx: Index into ``pipeline.steps`` of the step about to run.
        transition: The transition dict flowing through the pipeline.
    """
    step_name = pipeline.steps[step_idx].__class__.__name__
    print(f"\n→ Before {step_name}")
    
    # Check for NaN values. Scan top-level tensors (e.g. "action") as well as
    # tensors nested one level down (e.g. inside "observation") — scanning only
    # nested dicts would silently skip top-level tensors.
    for key, value in transition.items():
        if isinstance(value, torch.Tensor):
            if torch.isnan(value).any():
                print(f"  ⚠️ NaN detected in {key}")
        elif isinstance(value, dict):
            for k, v in value.items():
                if isinstance(v, torch.Tensor) and torch.isnan(v).any():
                    print(f"  ⚠️ NaN detected in {key}.{k}")

def after_step_hook(step_idx: int, transition: dict):
    """Called after each step"""
    print(f"← After {pipeline.steps[step_idx].__class__.__name__}")
    
    # If the transition carries an action tensor, log its summary statistics.
    act = transition.get('action')
    if isinstance(act, torch.Tensor):
        print(f"  Action stats: mean={act.mean():.4f}, std={act.std():.4f}")

# Create pipeline with hooks
pipeline = DataProcessorPipeline(
    steps=[...],
    before_step_hooks=[before_step_hook],
    after_step_hooks=[after_step_hook]
)

Inspecting Feature Transformations

Track how data shapes and types change through the pipeline:
from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType

# Define input features
input_features = {
    PipelineFeatureType.OBSERVATION: {
        "observation.images.camera": PolicyFeature(type=FeatureType.IMAGE, shape=(3, 224, 224)),
        "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,)),
    },
    PipelineFeatureType.ACTION: {
        "action": PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
    },
}

# Transform through each step
current_features = input_features
for i, step in enumerate(pipeline.steps):
    print(f"\nStep {i}: {step.__class__.__name__}")
    
    # Get transformed features
    current_features = step.transform_features(current_features)
    
    # Print feature descriptions
    for feature_type, features in current_features.items():
        print(f"  {feature_type}:")
        for name, feature in features.items():
            print(f"    {name}: shape={feature.shape}, type={feature.type}")
Example output:
Step 0: TokenizerProcessorStep
  OBSERVATION:
    observation.language_tokens: shape=(77,), type=TEXT
    observation.language_attention_mask: shape=(77,), type=TEXT
    observation.images.camera: shape=(3, 224, 224), type=IMAGE
    observation.state: shape=(7,), type=STATE
  ACTION:
    action: shape=(7,), type=ACTION

Step 1: NormalizerProcessorStep
  OBSERVATION:
    observation.language_tokens: shape=(77,), type=TEXT
    observation.language_attention_mask: shape=(77,), type=TEXT
    observation.images.camera: shape=(3, 224, 224), type=IMAGE
    observation.state: shape=(7,), type=STATE  # Normalized
  ACTION:
    action: shape=(7,), type=ACTION  # Normalized

Debugging Normalization

Inspect normalization statistics:
from lerobot.processor import NormalizerProcessorStep

normalizer = None
for step in pipeline.steps:
    if isinstance(step, NormalizerProcessorStep):
        normalizer = step
        break

if normalizer:
    print("Normalization stats:")
    for key, stats in normalizer.stats.items():
        print(f"\n{key}:")
        if 'mean' in stats:
            print(f"  mean: {stats['mean']}")
            print(f"  std: {stats['std']}")
        if 'min' in stats:
            print(f"  min: {stats['min']}")
            print(f"  max: {stats['max']}")
    
    # Check tensor stats (on device)
    print("\nTensor stats:")
    for key, tensor_stats in normalizer._tensor_stats.items():
        print(f"\n{key}:")
        for stat_name, tensor in tensor_stats.items():
            print(f"  {stat_name}: device={tensor.device}, dtype={tensor.dtype}")

Debugging Device Placement

Verify tensors are on correct devices:
def check_device_placement(transition: dict, expected_device: str = "cuda"):
    """Recursively check all tensors are on expected device.

    Args:
        transition: Transition dict, possibly containing nested dicts of tensors.
        expected_device: Expected ``torch.device.type`` (e.g. "cuda", "cpu").

    Returns:
        A list of human-readable issue strings (empty when every tensor is on
        the expected device), so callers can assert on the result
        programmatically instead of parsing printed output.
    """
    issues = []
    
    def check_recursive(obj, path=""):
        if isinstance(obj, torch.Tensor):
            # Compare the device *type* only, so "cuda" matches "cuda:0".
            if obj.device.type != expected_device:
                issues.append(f"{path}: expected {expected_device}, got {obj.device}")
        elif isinstance(obj, dict):
            for key, value in obj.items():
                check_recursive(value, f"{path}.{key}" if path else key)
    
    check_recursive(transition)
    
    if issues:
        print("❌ Device placement issues:")
        for issue in issues:
            print(f"  {issue}")
    else:
        print(f"✅ All tensors on {expected_device}")
    return issues

# Use in pipeline
for i, transition in enumerate(pipeline.step_through(data)):
    if i > 0 and isinstance(pipeline.steps[i-1], DeviceProcessorStep):
        check_device_placement(transition, pipeline.steps[i-1].device)

Saving Pipeline State

Save pipeline configuration and state for debugging:
from pathlib import Path

# Save pipeline
pipeline.save_pretrained("debug_pipeline")

# Check saved files
save_dir = Path("debug_pipeline")
print("Saved files:")
for file in save_dir.iterdir():
    print(f"  {file.name}")
    if file.suffix == ".json":
        import json
        with open(file) as f:
            config = json.load(f)
        print(json.dumps(config, indent=2))

Common Issues and Solutions

Issue: NaN Values After Normalization

Cause: Division by zero when the std is 0. Solution: Check the normalization stats:
for key, stats in normalizer.stats.items():
    if 'std' in stats:
        std = torch.tensor(stats['std'])
        if (std == 0).any():
            print(f"⚠️ Zero std in {key}: {std}")
            print("   This will cause NaN values!")

Issue: Device Mismatch Errors

Cause: Tensors are on different devices. Solution: Add a device processor early in the pipeline:
pipeline = DataProcessorPipeline(
    steps=[
        DeviceProcessorStep(device="cuda"),  # First step!
        # ... other steps
    ]
)

Issue: Shape Mismatches

Cause: A step expects a different input shape. Solution: Use transform_features() to verify:
current_features = input_features
for step in pipeline.steps:
    print(f"Before {step.__class__.__name__}:")
    print(f"  Features: {current_features}")
    
    try:
        current_features = step.transform_features(current_features)
    except Exception as e:
        print(f"❌ Error in {step.__class__.__name__}: {e}")
        break

Debugging Custom Processors

Add debug output to custom processor steps:
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from dataclasses import dataclass

@ProcessorStepRegistry.register("debug_processor")
@dataclass
class DebugProcessorStep(ProcessorStep):
    """Identity step that optionally dumps tensor metadata as data flows by."""

    verbose: bool = True  # set False to make the step a silent no-op
    
    def __call__(self, transition):
        # Guard clause: with verbosity off, just forward the transition.
        if not self.verbose:
            return transition

        print(f"\n=== {self.__class__.__name__} ===")
        print(f"Input keys: {transition.keys()}")

        for key, value in transition.items():
            if isinstance(value, torch.Tensor):
                print(f"{key}: {value.shape}, {value.dtype}, {value.device}")
            elif isinstance(value, dict):
                print(f"{key}: dict with {len(value)} items")
                for k, v in value.items():
                    if isinstance(v, torch.Tensor):
                        print(f"  {k}: {v.shape}, {v.dtype}, {v.device}")

        return transition
    
    def transform_features(self, features):
        # Purely a debugging step: feature descriptions pass through untouched.
        return features

# Use in pipeline
pipeline = DataProcessorPipeline(
    steps=[
        DebugProcessorStep(verbose=True),
        DeviceProcessorStep(device="cuda"),
        DebugProcessorStep(verbose=True),
        NormalizerProcessorStep(stats=stats),
        DebugProcessorStep(verbose=True),
    ]
)

Performance Profiling

Measure time spent in each step:
import time

timings = []

def timing_hook(step_idx: int, transition: dict):
    """After-step hook that appends (step_name, seconds) pairs to ``timings``.

    NOTE(review): with only an after-step hook there is no timestamp from
    before the first step, so step 0's duration cannot be measured — its hook
    invocation just starts the clock for the subsequent steps.
    """
    now = time.perf_counter()
    # Record a duration only when a same-run previous timestamp exists.
    # step_idx == 0 marks the start of a (possibly new) pipeline run: reset the
    # clock there rather than attributing the idle gap since the previous run's
    # last step (a bug the plain hasattr check had when the pipeline is reused).
    if step_idx > 0 and hasattr(timing_hook, 'start_time'):
        step_name = pipeline.steps[step_idx].__class__.__name__
        timings.append((step_name, now - timing_hook.start_time))
    timing_hook.start_time = now

pipeline = DataProcessorPipeline(
    steps=[...],
    after_step_hooks=[timing_hook]
)

# Run pipeline
output = pipeline(data)

# Print timings
print("\nPipeline timings:")
total = sum(t for _, t in timings)
for step_name, elapsed in timings:
    pct = 100 * elapsed / total
    print(f"{step_name:30s}: {elapsed*1000:6.2f}ms ({pct:5.1f}%)")
print(f"{'Total':30s}: {total*1000:6.2f}ms")

API Reference

DataProcessorPipeline

See lerobot/processor/pipeline.py:254
step_through
(data) -> Iterable[EnvTransition]
Yield intermediate states after each processing step
__call__
(data) -> Any
Process data through full pipeline
save_pretrained
(path) -> None
Save pipeline configuration and state

ProcessorStep

See lerobot/processor/pipeline.py:143
transform_features
(features) -> dict
Transform feature descriptions without processing actual data
state_dict
() -> dict
Return processor state (e.g., normalization stats)
load_state_dict
(state) -> None
Load processor state from dictionary

Build docs developers (and LLMs) love