Processor Step Basics
All processor steps inherit from ProcessorStep and must implement two methods:
- __call__(transition): transform the data
- transform_features(features): describe how features change
from lerobot.processor import ProcessorStep, ProcessorStepRegistry
from lerobot.processor.core import EnvTransition
from lerobot.configs.types import PipelineFeatureType, PolicyFeature
from dataclasses import dataclass
@ProcessorStepRegistry.register("my_custom_processor")
@dataclass
class MyCustomProcessorStep(ProcessorStep):
    """Minimal template for a custom processor step.

    A step transforms transitions in ``__call__`` and declares how it
    alters the feature schema in ``transform_features``.
    """

    # Tunable configuration parameter (serialized with the pipeline).
    scale_factor: float = 1.0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Transform one transition; this template is a no-op."""
        # Insert your processing logic here.
        return transition

    def transform_features(
        self,
        features: dict[PipelineFeatureType, dict[str, PolicyFeature]],
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """Report how features change; this template leaves them untouched."""
        # Return modified feature descriptions when shapes or keys change.
        return features
See lerobot/processor/pipeline.py:143 for the base ProcessorStep class.
Specialized Processor Steps
LeRobot provides specialized base classes for common use cases.
ObservationProcessorStep
Process only observations:
from lerobot.processor import ObservationProcessorStep
@ProcessorStepRegistry.register("my_obs_processor")
@dataclass
class MyObsProcessorStep(ObservationProcessorStep):
    """Add a finite-difference velocity field to the observation.

    Expects ``observation.state``; when it is absent the observation
    passes through unchanged. Writes ``observation.velocity``.
    """

    def observation(self, observation: dict) -> dict:
        """Return a copy of the observation with a velocity field added."""
        processed = observation.copy()
        if "observation.state" in processed:
            state = processed["observation.state"]
            # compute_velocity is a user-supplied helper (finite difference).
            processed["observation.velocity"] = compute_velocity(state)
        return processed

    def transform_features(self, features):
        """Declare the new velocity feature.

        NOTE: the outer dict is keyed by PipelineFeatureType (see the
        transform_features signature), not by FeatureType — the original
        example indexed it with FeatureType.STATE, which never matches.
        """
        obs_features = features.get(PipelineFeatureType.OBSERVATION)
        if obs_features is not None:
            obs_features["observation.velocity"] = PolicyFeature(
                type=FeatureType.STATE,
                shape=(7,),  # assumes a 7-dim state — adjust to your robot
            )
        return features
ActionProcessorStep
Process only actions:
from lerobot.processor import ActionProcessorStep
@ProcessorStepRegistry.register("my_action_processor")
@dataclass
class MyActionProcessorStep(ActionProcessorStep):
    """Clamp every action component into the safe range [-1, 1]."""

    def action(self, action: torch.Tensor) -> torch.Tensor:
        """Return the action clipped element-wise to [-1.0, 1.0]."""
        return torch.clamp(action, -1.0, 1.0)

    def transform_features(self, features):
        """Clipping preserves shape and dtype, so the schema is unchanged."""
        return features
RobotActionProcessorStep
Process robot action dictionaries:
from lerobot.processor import RobotActionProcessorStep
@ProcessorStepRegistry.register("my_robot_action_processor")
@dataclass
class MyRobotActionProcessorStep(RobotActionProcessorStep):
    """Limit the magnitude of every velocity entry in a robot action dict.

    Args:
        max_velocity: Absolute bound applied to each ``*.vel`` key.
    """

    max_velocity: float = 1.0

    def action(self, action: dict[str, float]) -> dict[str, float]:
        """Return a copy of the action with velocities clamped to the bound."""
        processed = action.copy()
        for key, value in action.items():
            if key.endswith(".vel"):
                # Plain-Python clamp: values are floats, so numpy is not
                # needed (the original used np.clip without importing numpy).
                processed[key] = max(-self.max_velocity, min(value, self.max_velocity))
        return processed

    def transform_features(self, features):
        """Clamping does not change the feature schema."""
        return features
Example: Image Preprocessing Step
Create a step that crops and resizes images:
import torch
import torchvision.transforms.functional as TF
from lerobot.processor import ObservationProcessorStep
from lerobot.utils.constants import OBS_IMAGES
@ProcessorStepRegistry.register("crop_resize_processor")
@dataclass
class CropResizeProcessorStep(ObservationProcessorStep):
    """Crop then resize every image observation.

    Args:
        crop_box: (x, y, width, height) of the crop region in pixels.
        output_size: (height, width) of the resized output.
    """

    crop_box: tuple[int, int, int, int]  # (x, y, width, height)
    output_size: tuple[int, int]  # (height, width)

    def observation(self, observation: dict) -> dict:
        """Crop and resize each ``observation.images.*`` tensor."""
        processed = observation.copy()
        x, y, w, h = self.crop_box  # hoisted: invariant across image keys
        for key in list(processed.keys()):
            if key.startswith(f"{OBS_IMAGES}."):
                img = processed[key]  # expected shape (B, C, H, W)
                img = img[:, :, y:y + h, x:x + w]
                processed[key] = TF.resize(img, self.output_size)
        return processed

    def transform_features(self, features):
        """Update image feature shapes to reflect the resize.

        The outer dict is keyed by PipelineFeatureType, so image features
        are identified by their name prefix (the original compared the
        outer key to FeatureType.IMAGE, which never matches). Fresh inner
        dicts are built instead of mutating the caller's (the original
        shallow-copied the outer dict and wrote into the shared inner
        dicts).
        """
        new_features = {}
        for pipeline_type, feats in features.items():
            new_feats = dict(feats)
            for name, feature in feats.items():
                if name.startswith(f"{OBS_IMAGES}."):
                    # Preserve the original channel count instead of
                    # hard-coding 3.
                    channels = feature.shape[0] if feature.shape else 3
                    new_feats[name] = PolicyFeature(
                        type=FeatureType.IMAGE,
                        shape=(channels, *self.output_size),
                    )
            new_features[pipeline_type] = new_feats
        return new_features
# Usage: crop a 400x400 region whose top-left corner is at (100, 100),
# then resize the crop to 224x224.
processor = CropResizeProcessorStep(
    crop_box=(100, 100, 400, 400),
    output_size=(224, 224)
)
Example: Delta Action Converter
Convert absolute actions to delta (relative) actions:
from lerobot.processor import ActionProcessorStep
import torch
@ProcessorStepRegistry.register("absolute_to_delta_action")
@dataclass
class AbsoluteToDeltaActionStep(ActionProcessorStep):
    """Turn a stream of absolute actions into per-step deltas.

    The first action after ``reset()`` is returned as-is; every later
    call returns the difference from the previously seen action.
    """

    # Internal state: last absolute action seen (None until the first call).
    _prev_action: torch.Tensor | None = None

    def action(self, action: torch.Tensor) -> torch.Tensor:
        """Return the delta between this action and the previous one."""
        previous = self._prev_action
        delta = action.clone() if previous is None else action - previous
        # Remember the current absolute action for the next call.
        self._prev_action = action.clone()
        return delta

    def reset(self):
        """Clear the stored action (call at every episode boundary)."""
        self._prev_action = None

    def transform_features(self, features):
        """Deltas keep the action shape, so the schema is unchanged."""
        return features


# Note: call reset() at episode boundaries.
processor = AbsoluteToDeltaActionStep()
for episode in episodes:
    processor.reset()
    for transition in episode:
        processed = processor(transition)
Stateful Processors
Processors can maintain state using state_dict() and load_state_dict():
from lerobot.processor import ProcessorStep
import torch
@ProcessorStepRegistry.register("running_mean_processor")
@dataclass
class RunningMeanProcessorStep(ProcessorStep):
    """Center ``observation.state`` by subtracting its running mean."""

    # Internal statistics, persisted via state_dict()/load_state_dict().
    _count: int = 0
    _mean: torch.Tensor | None = None

    def __call__(self, transition):
        """Fold the current state into the running mean, then subtract it."""
        obs = transition["observation"]
        if "observation.state" in obs:
            state = obs["observation.state"]
            if self._mean is None:
                # First sample: the mean is the sample itself.
                self._mean = state.clone()
                self._count = 1
            else:
                # Incremental mean update: m += (x - m) / n.
                self._count += 1
                self._mean = self._mean + (state - self._mean) / self._count
            obs["observation.state"] = state - self._mean
        return transition

    def state_dict(self) -> dict[str, torch.Tensor]:
        """Serialize the running statistics (empty before any update)."""
        if self._mean is None:
            return {}
        return {"mean": self._mean, "count": torch.tensor(self._count)}

    def load_state_dict(self, state: dict[str, torch.Tensor]):
        """Restore running statistics previously saved by state_dict()."""
        if "mean" in state:
            self._mean = state["mean"]
            self._count = int(state["count"].item())

    def transform_features(self, features):
        """Mean subtraction keeps shapes, so the schema is unchanged."""
        return features
Accessing Other Transition Components
Processors can access other parts of the transition via self.transition:
from lerobot.processor import RobotActionProcessorStep
from lerobot.processor.core import TransitionKey
@ProcessorStepRegistry.register("action_based_on_obs")
@dataclass
class ActionBasedOnObsStep(RobotActionProcessorStep):
    """Scale the action down when the observed contact force is high."""

    def action(self, action: dict) -> dict:
        """Halve every action value whenever force_sensor exceeds 10.0."""
        # The full transition is exposed to action steps via self.transition.
        observation = self.transition.get(TransitionKey.OBSERVATION)
        if observation and "force_sensor" in observation:
            if observation["force_sensor"] > 10.0:
                # Back off: scale all commanded values by half.
                action = {name: value * 0.5 for name, value in action.items()}
        return action

    def transform_features(self, features):
        """No schema change: keys and shapes are preserved."""
        return features
See lerobot/processor/pipeline.py:156 for the transition property.
Configuration Parameters
Use dataclass fields for configuration:
from dataclasses import dataclass, field
@ProcessorStepRegistry.register("configurable_processor")
@dataclass
class ConfigurableProcessorStep(ProcessorStep):
    """Affine transform (x * scale + offset) of selected observation fields."""

    # Required parameter (no default).
    scale: float
    # Optional parameter with a simple default.
    offset: float = 0.0
    # Mutable default: must go through field(default_factory=...).
    feature_names: list[str] = field(default_factory=lambda: ["state"])
    # Internal cache, excluded from __init__/__repr__ (not serialized).
    _cache: dict = field(default_factory=dict, init=False, repr=False)

    def __call__(self, transition):
        """Apply the affine transform to each configured observation key."""
        observation = transition["observation"]
        for name in self.feature_names:
            if name in observation:
                observation[name] = observation[name] * self.scale + self.offset
        return transition

    def get_config(self) -> dict:
        """Return the JSON-serializable configuration of this step."""
        return {
            "scale": self.scale,
            "offset": self.offset,
            "feature_names": self.feature_names,
        }

    def transform_features(self, features):
        """Affine scaling keeps shapes, so the schema is unchanged."""
        return features
Registration and Discovery
Register your processor to make it discoverable:
from lerobot.processor import ProcessorStepRegistry
# Option 1: register at definition time with the decorator.
@ProcessorStepRegistry.register("my_processor")
class MyProcessorStep(ProcessorStep):
    ...


# Option 2: register explicitly after the class is defined.
class MyProcessorStep(ProcessorStep):
    ...

ProcessorStepRegistry.register("my_processor")(MyProcessorStep)

# Discovery: list every registered step name.
all_processors = ProcessorStepRegistry.list()
print(all_processors)  # [..., 'my_processor', ...]

# Instantiation by name: look the class up, then construct it.
processor_class = ProcessorStepRegistry.get("my_processor")
processor = processor_class(scale=2.0)
Testing Custom Processors
Always test your processors thoroughly:
import pytest
import torch
from lerobot.processor import create_transition
def test_my_processor():
    """End-to-end check: data processing plus feature transformation."""
    # Build a minimal transition with a 7-dim state and action.
    transition = create_transition(
        observation={"observation.state": torch.randn(7)},
        action=torch.randn(7),
    )

    step = MyProcessorStep(scale=2.0)
    output = step(transition)

    # The processed transition still carries the state key.
    assert "observation.state" in output["observation"]

    # transform_features must keep the state feature in the schema.
    from lerobot.configs.types import FeatureType, PolicyFeature, PipelineFeatureType

    input_features = {
        PipelineFeatureType.OBSERVATION: {
            "observation.state": PolicyFeature(type=FeatureType.STATE, shape=(7,))
        }
    }
    output_features = step.transform_features(input_features)
    assert "observation.state" in output_features[PipelineFeatureType.OBSERVATION]
def test_stateful_processor():
    """State saved from one processor can be loaded into a fresh one."""
    step = RunningMeanProcessorStep()

    # Feed several transitions so the running statistics are populated.
    for _ in range(10):
        step(create_transition(observation={"observation.state": torch.randn(7)}))

    state = step.state_dict()
    assert "mean" in state

    # Round-trip the state through a brand-new instance.
    restored = RunningMeanProcessorStep()
    restored.load_state_dict(state)
    assert torch.allclose(step._mean, restored._mean)
Integration with Pipelines
Use your custom processor in a pipeline:
from lerobot.processor import DataProcessorPipeline, DeviceProcessorStep
# Compose the custom step with built-in steps into a named pipeline.
pipeline = DataProcessorPipeline(
    steps=[
        # MyCustomProcessorStep declares `scale_factor` (the original
        # example passed `scale=`, which would raise a TypeError).
        MyCustomProcessorStep(scale_factor=2.0),
        DeviceProcessorStep(device="cuda"),
        # ... other steps
    ],
    name="custom_pipeline"
)
# Save pipeline with custom processor
pipeline.save_pretrained("my_pipeline")
# Load pipeline (requires custom processor to be registered)
loaded = DataProcessorPipeline.from_pretrained("my_pipeline")
Best Practices
1. Keep Processors Simple
Each processor should do one thing well:
# Good: Single responsibility
@ProcessorStepRegistry.register("crop_image")
class CropImageStep(ObservationProcessorStep):
    """Single responsibility: this step only crops images."""

    def observation(self, obs):
        # Only crop
        ...


# Bad: Multiple responsibilities
@ProcessorStepRegistry.register("process_all")
class ProcessAllStep(ProcessorStep):
    """Anti-example: one step doing crop, resize, normalize, and augment."""

    def __call__(self, transition):
        # Crop, resize, normalize, augment...
        ...
2. Handle Edge Cases
def observation(self, observation):
    """Defensive processing: validate inputs before touching them."""
    processed = observation.copy()

    # Skip gracefully when the expected field is absent.
    if "observation.state" not in processed:
        return processed

    state = processed["observation.state"]

    # Fail loudly on an unexpected rank.
    if state.ndim != 1:
        raise ValueError(f"Expected 1D state, got {state.ndim}D")

    # An empty batch has nothing to process.
    if state.shape[0] == 0:
        return processed

    # Your processing...
    return processed
3. Document Behavior
@ProcessorStepRegistry.register("my_processor")
@dataclass
class MyProcessorStep(ObservationProcessorStep):
    """Process observations by doing X.

    This processor expects:
    - observation.state: (N,) tensor of joint positions
    - observation.images.camera: (B, C, H, W) image tensor

    It produces:
    - observation.state: (N,) tensor (unchanged)
    - observation.images.camera: (B, C, H', W') resized image
    - observation.velocity: (N,) tensor of joint velocities

    Args:
        output_size: Target image size (H, W)

    Example:
        >>> processor = MyProcessorStep(output_size=(224, 224))
        >>> output = processor(transition)
    """

    # Target image size as (height, width).
    output_size: tuple[int, int]
API Reference
ProcessorStep
See lerobot/processor/pipeline.py:143
Process environment transition
Transform feature descriptions
Return serializable state
Load state from dictionary
Reset internal state (for episodic processing)
ProcessorStepRegistry
See lerobot/processor/pipeline.py:59
Register a processor step class
Get processor class by name
List all registered processor names