# Creating Custom Processor Steps
LeRobot’s processor system is designed to be extensible. This guide shows you how to create custom processing steps for your specific robot or task.

Processor Step Basics

All processor steps inherit from ProcessorStep and must implement two methods:
  1. __call__(transition): Transform the data
  2. transform_features(features): Describe how features change
from dataclasses import dataclass, field

from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from lerobot.processor.core import EnvTransition

@ProcessorStepRegistry.register("my_custom_processor")
@dataclass
class MyCustomProcessorStep(ProcessorStep):
    """Minimal skeleton of a custom processor step.

    Implements the two-method ProcessorStep contract: ``__call__``
    transforms a transition, ``transform_features`` describes how the
    feature dictionary changes as a result.
    """

    # Configuration parameters — dataclass fields double as the step's config.
    scale_factor: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Transform one environment transition (identity placeholder)."""
        # Insert your processing logic here.
        return transition

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """Describe how features are transformed (unchanged in this skeleton)."""
        # Return modified feature descriptions.
        return features
See lerobot/processor/pipeline.py:143 for the base ProcessorStep class.

Specialized Processor Steps

LeRobot provides specialized base classes for common use cases:

ObservationProcessorStep

Process only observations:
from lerobot.processor import ObservationProcessorStep

@ProcessorStepRegistry.register("my_obs_processor")
@dataclass
class MyObsProcessorStep(ObservationProcessorStep):
    """Add a derived velocity field alongside the raw state observation."""

    def observation(self, observation: dict) -> dict:
        """Process observation data, returning a shallow copy with extras."""
        processed = observation.copy()

        # Example: derive a velocity signal from the state by finite
        # difference (compute_velocity is your own helper).
        if "observation.state" in processed:
            state = processed["observation.state"]
            processed["observation.velocity"] = compute_velocity(state)

        return processed

    def transform_features(self, features):
        # NOTE: `features` is keyed by PipelineFeatureType (see the
        # ProcessorStep contract above), not FeatureType — the modality
        # lives inside each PolicyFeature instead.
        obs_features = features.get(PipelineFeatureType.OBSERVATION)
        if obs_features is not None and "observation.state" in obs_features:
            obs_features["observation.velocity"] = PolicyFeature(
                type=FeatureType.STATE,
                # Same dimensionality as the state it was derived from.
                shape=obs_features["observation.state"].shape,
            )
        return features

ActionProcessorStep

Process only actions:
from lerobot.processor import ActionProcessorStep

@ProcessorStepRegistry.register("my_action_processor")
@dataclass
class MyActionProcessorStep(ActionProcessorStep):
    """Clip every action component into the [-1, 1] safe range."""

    def action(self, action: torch.Tensor) -> torch.Tensor:
        # Tensor-method form of torch.clamp — identical result.
        return action.clamp(-1.0, 1.0)

    def transform_features(self, features):
        # Clipping never changes shapes, so features pass through untouched.
        return features

RobotActionProcessorStep

Process robot action dictionaries:
from lerobot.processor import RobotActionProcessorStep

@ProcessorStepRegistry.register("my_robot_action_processor")
@dataclass
class MyRobotActionProcessorStep(RobotActionProcessorStep):
    """Clamp joint velocity commands to a configurable safe magnitude."""

    # Absolute bound applied symmetrically to every ".vel" entry.
    max_velocity: float = 1.0

    def action(self, action: dict[str, float]) -> dict[str, float]:
        """Process robot action dict."""
        processed = action.copy()

        # Example: limit joint velocities. Plain float clamping — the
        # original example called np.clip without ever importing numpy.
        for key, value in action.items():
            if key.endswith(".vel"):
                processed[key] = max(-self.max_velocity, min(value, self.max_velocity))

        return processed

    def transform_features(self, features):
        # Keys and shapes are unchanged; only values are clamped.
        return features

Example: Image Preprocessing Step

Create a step that crops and resizes images:
import torch
import torchvision.transforms.functional as TF
from lerobot.processor import ObservationProcessorStep
from lerobot.utils.constants import OBS_IMAGES

@ProcessorStepRegistry.register("crop_resize_processor")
@dataclass
class CropResizeProcessorStep(ObservationProcessorStep):
    """Crop a fixed box out of every camera image, then resize it.

    Args:
        crop_box: (x, y, width, height) of the crop, in input pixels.
        output_size: (height, width) of the resized output.
    """

    crop_box: tuple[int, int, int, int]  # (x, y, width, height)
    output_size: tuple[int, int]  # (height, width)

    def observation(self, observation: dict) -> dict:
        processed = observation.copy()

        # Apply the same crop + resize to every image observation.
        for key in list(processed.keys()):
            if key.startswith(f"{OBS_IMAGES}."):
                img = processed[key]  # assumes (B, C, H, W) — confirm upstream

                # Crop, then resize to the configured output size.
                x, y, w, h = self.crop_box
                img = img[:, :, y:y + h, x:x + w]
                processed[key] = TF.resize(img, self.output_size)

        return processed

    def transform_features(self, features):
        """Report the new (H, W) of every image feature this step resizes.

        Builds fresh inner dicts instead of mutating through a shallow
        ``features.copy()``, which would silently rewrite the caller's
        feature dictionaries.
        """
        new_features = {}
        for pipeline_ft, feats in features.items():
            updated = dict(feats)
            for name, feature in feats.items():
                # The outer keys are PipelineFeatureType (see the base
                # contract); the per-feature modality is `feature.type`.
                if feature.type == FeatureType.IMAGE and name.startswith(f"{OBS_IMAGES}."):
                    # Preserve the channel count; only H and W change.
                    channels = feature.shape[0] if feature.shape else 3
                    updated[name] = PolicyFeature(
                        type=FeatureType.IMAGE,
                        shape=(channels, *self.output_size),
                    )
            new_features[pipeline_ft] = updated
        return new_features

# Usage
processor = CropResizeProcessorStep(
    crop_box=(100, 100, 400, 400),
    output_size=(224, 224),
)

Example: Delta Action Converter

Convert absolute actions to delta (relative) actions:
from lerobot.processor import ActionProcessorStep
import torch

@ProcessorStepRegistry.register("absolute_to_delta_action")
@dataclass
class AbsoluteToDeltaActionStep(ActionProcessorStep):
    """Convert absolute actions to delta (relative) actions.

    Keeps the previously seen action as internal state; call ``reset()``
    at episode boundaries so a delta never spans two episodes.
    """

    # Internal state: last absolute action seen. Marked init=False/repr=False
    # so it is treated as runtime state, not configuration (same convention
    # as other stateful steps in this guide).
    _prev_action: torch.Tensor | None = field(default=None, init=False, repr=False)

    def action(self, action: torch.Tensor) -> torch.Tensor:
        """Return the difference between this action and the previous one."""
        if self._prev_action is None:
            # First action after reset(): no reference yet, pass it through.
            delta = action.clone()
        else:
            # Compute delta from the previous absolute action.
            delta = action - self._prev_action

        # clone() so later in-place edits to `action` can't corrupt our state.
        self._prev_action = action.clone()

        return delta

    def reset(self):
        """Clear stored state (call at episode boundaries)."""
        self._prev_action = None

    def transform_features(self, features):
        # A delta has the same dimensionality, so features are unchanged.
        return features

# Note: Call reset() at episode boundaries so deltas never span episodes
processor = AbsoluteToDeltaActionStep()
for episode in episodes:
    processor.reset()  # forget the last action of the previous episode
    for transition in episode:
        processed = processor(transition)

Stateful Processors

Processors can maintain state using state_dict() and load_state_dict():
from lerobot.processor import ProcessorStep
import torch

@ProcessorStepRegistry.register("running_mean_processor")
@dataclass
class RunningMeanProcessorStep(ProcessorStep):
    """Center the state observation by subtracting a running mean."""

    # Running-statistics state.
    _count: int = 0
    _mean: torch.Tensor | None = None

    def __call__(self, transition):
        obs = transition["observation"]

        # Only the state entry is normalized; anything else passes through.
        if "observation.state" not in obs:
            return transition

        state = obs["observation.state"]

        # Incremental mean update: m_k = m_{k-1} + (x_k - m_{k-1}) / k
        if self._mean is None:
            self._count = 1
            self._mean = state.clone()
        else:
            self._count += 1
            self._mean = self._mean + (state - self._mean) / self._count

        # Subtract the (just-updated) running mean.
        obs["observation.state"] = state - self._mean
        return transition

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Serialize the running statistics (empty before any data)."""
        if self._mean is None:
            return {}
        return {"mean": self._mean, "count": torch.tensor(self._count)}

    def load_state_dict(self, state: dict[str, torch.Tensor]):
        """Restore statistics previously produced by state_dict()."""
        if "mean" in state:
            self._mean = state["mean"]
            self._count = int(state["count"].item())

    def transform_features(self, features):
        return features

Accessing Other Transition Components

Processors can access other parts of the transition via self.transition:
from lerobot.processor import RobotActionProcessorStep
from lerobot.processor.core import TransitionKey

@ProcessorStepRegistry.register("action_based_on_obs")
@dataclass
class ActionBasedOnObsStep(RobotActionProcessorStep):
    """Scale the action down when the observed contact force is high."""

    def action(self, action: dict) -> dict:
        # The full transition is reachable from any step via self.transition.
        obs = self.transition.get(TransitionKey.OBSERVATION)

        if not obs or "force_sensor" not in obs:
            return action

        # Halve every command when the measured force exceeds the threshold.
        if obs["force_sensor"] > 10.0:
            action = {key: value * 0.5 for key, value in action.items()}

        return action

    def transform_features(self, features):
        return features
See lerobot/processor/pipeline.py:156 for the transition property.

Configuration Parameters

Use dataclass fields for configuration:
from dataclasses import dataclass, field

@ProcessorStepRegistry.register("configurable_processor")
@dataclass
class ConfigurableProcessorStep(ProcessorStep):
    """Affine-transform (scale then offset) selected observation entries."""

    # Required multiplicative factor.
    scale: float

    # Optional additive term.
    offset: float = 0.0

    # Mutable defaults need default_factory, never a literal.
    feature_names: list[str] = field(default_factory=lambda: ["state"])

    # Internal scratch space — excluded from __init__ and repr.
    _cache: dict = field(default_factory=dict, init=False, repr=False)

    def __call__(self, transition):
        obs = transition["observation"]
        # Apply y = x * scale + offset to each configured entry present.
        for feature_name in self.feature_names:
            if feature_name in obs:
                obs[feature_name] = obs[feature_name] * self.scale + self.offset
        return transition

    def get_config(self) -> dict:
        """Return the user-facing configuration as plain serializable data."""
        return {
            "scale": self.scale,
            "offset": self.offset,
            "feature_names": self.feature_names,
        }

    def transform_features(self, features):
        return features

Registration and Discovery

Register your processor to make it discoverable:
from lerobot.processor import ProcessorStepRegistry

# Option 1: Decorator (registration happens at class-definition time)
@ProcessorStepRegistry.register("my_processor")
class MyProcessorStep(ProcessorStep):
    ...

# Option 2: Explicit registration — equivalent to Option 1; pick one, not both
class MyProcessorStep(ProcessorStep):
    ...

ProcessorStepRegistry.register("my_processor")(MyProcessorStep)

# Discovery: list every registered step name
all_processors = ProcessorStepRegistry.list()
print(all_processors)  # [..., 'my_processor', ...]

# Instantiation by name: look up the class, then construct it as usual
processor_class = ProcessorStepRegistry.get("my_processor")
processor = processor_class(scale=2.0)

Testing Custom Processors

Always test your processors thoroughly:
import pytest
import torch
from lerobot.processor import create_transition

def test_my_processor():
    """End-to-end check: processing plus feature transformation."""
    # Build a minimal transition with random state/action tensors.
    transition = create_transition(
        observation={"observation.state": torch.randn(7)},
        action=torch.randn(7),
    )

    processor = MyProcessorStep(scale=2.0)
    output = processor(transition)

    # The state key must survive processing.
    assert "observation.state" in output["observation"]

    # transform_features must keep the state feature addressable.
    from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType

    input_features = {
        PipelineFeatureType.OBSERVATION: {
            "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,))
        }
    }

    output_features = processor.transform_features(input_features)
    assert "observation.state" in output_features[PipelineFeatureType.OBSERVATION]

def test_stateful_processor():
    """State must round-trip through state_dict()/load_state_dict()."""
    processor = RunningMeanProcessorStep()

    # Feed several random observations so the running mean is populated.
    for _ in range(10):
        processor(
            create_transition(observation={"observation.state": torch.randn(7)})
        )

    # Snapshot the state.
    state = processor.state_dict()
    assert "mean" in state

    # A fresh processor loaded from that snapshot must agree exactly.
    restored = RunningMeanProcessorStep()
    restored.load_state_dict(state)
    assert torch.allclose(processor._mean, restored._mean)

Integration with Pipelines

Use your custom processor in a pipeline:
from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep

# Steps run in list order: custom processing first, then device transfer.
pipeline = DataProcessorPipeline(
    steps=[
        # Keyword must match the dataclass field name (scale_factor, not scale).
        MyCustomProcessorStep(scale_factor=2.0),
        DeviceProcessorStep(device="cuda"),
        # ... other steps
    ],
    name="custom_pipeline"
)

# Save pipeline with custom processor
pipeline.save_pretrained("my_pipeline")

# Load pipeline (requires custom processor to be registered)
loaded = DataProcessorPipeline.from_pretrained("my_pipeline")

Best Practices

1. Keep Processors Simple

Each processor should do one thing well:
# Good: Single responsibility — easy to test, reuse, and reorder in a pipeline
@ProcessorStepRegistry.register("crop_image")
class CropImageStep(ObservationProcessorStep):
    def observation(self, obs):
        # Only crop; resizing/normalization belong in their own steps
        ...

# Bad: Multiple responsibilities — no piece can be reused or tested alone
@ProcessorStepRegistry.register("process_all")
class ProcessAllStep(ProcessorStep):
    def __call__(self, transition):
        # Crop, resize, normalize, augment...
        ...

2. Handle Edge Cases

def observation(self, observation):
    """Defensively process the state entry, skipping anything unexpected."""
    processed = observation.copy()

    # Guard clause: nothing to do when the expected field is absent.
    if "observation.state" not in processed:
        return processed

    state = processed["observation.state"]

    # Fail loudly on a shape this step was not written for.
    if state.ndim != 1:
        raise ValueError(f"Expected 1D state, got {state.ndim}D")

    # An empty batch is valid input — just pass it through.
    if state.shape[0] == 0:
        return processed

    # Your processing...
    return processed

3. Document Behavior

@ProcessorStepRegistry.register("my_processor")
@dataclass
class MyProcessorStep(ObservationProcessorStep):
    """Process observations by doing X.
    
    This processor expects:
    - observation.state: (N,) tensor of joint positions
    - observation.images.camera: (B, C, H, W) image tensor
    
    It produces:
    - observation.state: (N,) tensor (unchanged)
    - observation.images.camera: (B, C, H', W') resized image
    - observation.velocity: (N,) tensor of joint velocities
    
    Args:
        output_size: Target image size (H, W)
    
    Example:
        >>> processor = MyProcessorStep(output_size=(224, 224))
        >>> output = processor(transition)
    """
    # (H, W) target size for image resizing — see Args above
    output_size: tuple[int, int]

API Reference

ProcessorStep

See lerobot/processor/pipeline.py:143
__call__
(transition) -> EnvTransition
Process environment transition
transform_features
(features) -> dict
Transform feature descriptions
state_dict
() -> dict
Return serializable state
load_state_dict
(state) -> None
Load state from dictionary
reset
() -> None
Reset internal state (for episodic processing)

ProcessorStepRegistry

See lerobot/processor/pipeline.py:59
register
(name) -> decorator
Register a processor step class
get
(name) -> type
Get processor class by name
list
() -> list[str]
List all registered processor names

Build docs developers (and LLMs) love