Overview
Processors in LeRobot are modular data transformation pipelines that convert between different data representations. They handle:
- Normalization: Scaling observations and actions to standard ranges
- Device management: Moving data between CPU and GPU
- Format conversion: Converting between robot, policy, and environment formats
- Delta actions: Converting between relative (delta) and absolute actions
- Observation processing: Renaming, cropping, and transforming sensor data
Processors enable you to train policies on normalized data while deploying them on real robots with different units and ranges.
Core Concepts
Data Types
LeRobot defines several data types for different stages:
RobotObservation = dict[str, Any] # Raw robot sensor data
RobotAction = dict[str, Any] # Raw robot motor commands
PolicyAction = torch.Tensor # Normalized policy output
EnvAction = np.ndarray # Environment-compatible actions
Source: src/lerobot/processor/core.py:39
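To make these stages concrete, here are illustrative values for each alias (the keys and shapes below are examples, not a fixed schema):
import numpy as np
import torch

robot_obs = {"joint_pos": np.zeros(6), "camera_top": np.zeros((480, 640, 3))}  # RobotObservation
robot_action = {"joint_pos": np.zeros(6)}  # RobotAction: raw motor commands
policy_action = torch.zeros(6)             # PolicyAction: normalized tensor from the policy
env_action = np.zeros(6)                   # EnvAction: numpy array for gym-style environments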
EnvTransition
The standard format for data flowing through processors:
from lerobot.processor import EnvTransition
transition = {
"observation": {...}, # Sensor data
"action": ..., # Action to execute
"reward": 0.0, # Reward signal (optional)
"done": False, # Episode termination
"truncated": False, # Time limit reached
"info": {...}, # Auxiliary information
"complementary_data": {...}, # Additional data
}
Source: src/lerobot/processor/core.py:45
ProcessorStep
The building block of processing pipelines:
from lerobot.processor import EnvTransition, ProcessorStep, ProcessorStepRegistry
@ProcessorStepRegistry.register()
class MyProcessorStep(ProcessorStep):
def __call__(self, transition: EnvTransition) -> EnvTransition:
# Transform the transition
transition["observation"]["value"] *= 2.0
return transition
def transform_features(self, features: dict) -> dict:
# Describe how features change
return features
Source: src/lerobot/processor/pipeline.py:143
ProcessorStep Registry
Steps are registered for serialization and sharing:
from lerobot.processor import ProcessorStepRegistry
# List all available steps
print(ProcessorStepRegistry.list())
# Get a step by name
step_class = ProcessorStepRegistry.get("NormalizerProcessorStep")
Source: src/lerobot/processor/pipeline.py:59
DataProcessorPipeline
Chain multiple steps together:
from lerobot.processor import (
DataProcessorPipeline,
NormalizerProcessorStep,
DeviceProcessorStep,
)
pipeline = DataProcessorPipeline(
steps=[
NormalizerProcessorStep(stats=dataset.meta.stats),
DeviceProcessorStep(device="cuda"),
],
name="robot_observation_processor",
)
# Process data
processed = pipeline(raw_observation)
Source: src/lerobot/processor/pipeline.py:253
Built-in Processor Steps
NormalizerProcessorStep
Normalizes observations and actions using dataset statistics:
from lerobot.processor import NormalizerProcessorStep
normalizer = NormalizerProcessorStep(
stats=dataset.meta.stats,
mode="mean_std", # Options: "mean_std", "min_max"
)
# Normalize
transition = normalizer(transition)
# transition["observation"]["state"] is now normalized
The normalizer supports two modes:
- mean_std: (x - mean) / std
- min_max: (x - min) / (max - min)
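A minimal numeric sketch of the two formulas above, using plain torch rather than the lerobot API:
import torch

x = torch.tensor([1.0, 2.0, 3.0, 4.0])
x_mean_std = (x - x.mean()) / x.std()             # mean_std: zero mean, unit variance
x_min_max = (x - x.min()) / (x.max() - x.min())   # min_max: scaled to [0, 1]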
Source: src/lerobot/processor/normalize_processor.py
UnnormalizerProcessorStep
Reverses normalization:
from lerobot.processor import UnnormalizerProcessorStep
unnormalizer = UnnormalizerProcessorStep(
stats=dataset.meta.stats,
mode="mean_std",
)
# Unnormalize policy output back to robot units
transition = unnormalizer(transition)
Source: src/lerobot/processor/normalize_processor.py
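Unnormalization is the exact inverse of the normalization mapping. A plain-torch sanity check of the mean_std round trip (illustrating the math, not the lerobot API):
import torch

x = torch.tensor([0.5, 1.5, 2.5])
mean, std = torch.tensor(1.5), torch.tensor(0.8)
restored = ((x - mean) / std) * std + mean  # normalize, then unnormalize
assert torch.allclose(restored, x)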
DeviceProcessorStep
Moves tensors between devices:
from lerobot.processor import DeviceProcessorStep
to_gpu = DeviceProcessorStep(device="cuda")
to_cpu = DeviceProcessorStep(device="cpu")
transition = to_gpu(transition)
# All tensors now on GPU
Source: src/lerobot/processor/device_processor.py
VanillaObservationProcessorStep
Converts raw observations into the standard LeRobot format expected by policies:
from lerobot.processor import VanillaObservationProcessorStep
obs_processor = VanillaObservationProcessorStep()
transition = obs_processor(transition)
Source: src/lerobot/processor/observation_processor.py
RenameObservationsProcessorStep
Renames observation keys:
from lerobot.processor import RenameObservationsProcessorStep
renamer = RenameObservationsProcessorStep(
mapping={
"camera_top": "observation.images.top",
"joint_pos": "observation.state",
}
)
transition = renamer(transition)
Source: src/lerobot/processor/rename_processor.py
Delta Action Processors
Convert between absolute and relative actions:
from lerobot.processor import MapDeltaActionToRobotActionStep
delta_processor = MapDeltaActionToRobotActionStep()
# Convert delta actions to absolute robot commands
transition = delta_processor(transition)
Source: src/lerobot/processor/delta_action_processor.py
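The underlying idea is simple arithmetic; a hedged sketch of recovering an absolute command from a delta (illustrative values, not the actual implementation):
import torch

current_pos = torch.tensor([0.10, -0.25, 0.40])   # current joint positions
delta_action = torch.tensor([0.02, 0.00, -0.05])  # relative action from the policy
absolute_action = current_pos + delta_action      # absolute command for the robot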
RobotProcessorPipeline
Specialized pipeline for robot control:
from lerobot.processor import RobotObservation, RobotProcessorPipeline
robot_processor = RobotProcessorPipeline[
RobotObservation, # Input type
RobotObservation # Output type
](
steps=[...],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
processed_obs = robot_processor(raw_obs)
Source: src/lerobot/processor/pipeline.py:70
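The to_transition and to_output callables adapt between the pipeline's typed input/output and the internal EnvTransition format. The converter names above are assumed to be provided by LeRobot; these hand-rolled versions are purely illustrative:
from lerobot.processor import EnvTransition

def observation_to_transition(obs: dict) -> EnvTransition:
    # Wrap a raw observation dict into the standard transition format
    return {
        "observation": obs,
        "action": None,
        "reward": 0.0,
        "done": False,
        "truncated": False,
        "info": {},
        "complementary_data": {},
    }

def transition_to_observation(transition: EnvTransition) -> dict:
    # Pull the processed observation back out of the transition
    return transition["observation"]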
Factory Functions
Convenient functions to create default processors:
from lerobot.processor import (
make_default_robot_observation_processor,
make_default_robot_action_processor,
make_default_teleop_action_processor,
)
obs_processor = make_default_robot_observation_processor()
action_processor = make_default_robot_action_processor()
teleop_processor = make_default_teleop_action_processor()
Source: src/lerobot/processor/factory.py:27
Example: Complete Pipeline
Here’s a full example of processing observations for a policy:
from lerobot.processor import (
DataProcessorPipeline,
RenameObservationsProcessorStep,
NormalizerProcessorStep,
DeviceProcessorStep,
)
from lerobot.datasets import LeRobotDataset
# Load dataset for statistics
dataset = LeRobotDataset(repo_id="lerobot/pusht")
# Create processing pipeline
observation_pipeline = DataProcessorPipeline(
steps=[
# Step 1: Rename raw robot observations
RenameObservationsProcessorStep({
"camera_top": "observation.images.top",
"joint_pos": "observation.state",
}),
# Step 2: Normalize using dataset statistics
NormalizerProcessorStep(
stats=dataset.meta.stats,
mode="mean_std",
),
# Step 3: Move to GPU
DeviceProcessorStep(device="cuda"),
],
name="observation_processor",
)
# Use in inference
raw_obs = robot.get_observation()
processed_obs = observation_pipeline(raw_obs)
action = policy.select_action(processed_obs)
Example: Action Processing
Process policy outputs back to robot commands:
from lerobot.processor import (
DataProcessorPipeline,
DeviceProcessorStep,
UnnormalizerProcessorStep,
)
action_pipeline = DataProcessorPipeline(
steps=[
# Step 1: Move to CPU
DeviceProcessorStep(device="cpu"),
# Step 2: Unnormalize to robot units
UnnormalizerProcessorStep(
stats=dataset.meta.stats,
mode="mean_std",
),
],
name="action_processor",
)
# Process policy output
policy_action = policy.select_action(observation)
robot_action = action_pipeline(policy_action)
robot.send_action(robot_action)
Stateful Processors
Some processors maintain internal state:
import torch

from lerobot.processor import EnvTransition, ProcessorStep

class StatefulProcessor(ProcessorStep):
    def __init__(self):
        self.count = 0

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        self.count += 1
        return transition

    def transform_features(self, features: dict) -> dict:
        # Counting transitions does not change the feature schema
        return features

    def state_dict(self) -> dict[str, torch.Tensor]:
        return {"count": torch.tensor(self.count)}

    def load_state_dict(self, state: dict):
        self.count = state["count"].item()

    def reset(self):
        self.count = 0
Source: src/lerobot/processor/pipeline.py:192
Saving and Loading Pipelines
Save to Disk
pipeline.save_pretrained("path/to/processor")
This saves:
- config.json: Step configurations
- state.safetensors: Step states (e.g., normalization statistics)
Load from Disk
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline.from_pretrained("path/to/processor")
Push to Hub
pipeline.push_to_hub("username/my_processor")
Load from Hub
pipeline = DataProcessorPipeline.from_pretrained("username/my_processor")
Hooks
Add debugging or logging hooks:
def before_hook(step_idx: int, transition: EnvTransition):
print(f"Before step {step_idx}")
def after_hook(step_idx: int, transition: EnvTransition):
print(f"After step {step_idx}: {transition.keys()}")
pipeline = DataProcessorPipeline(
steps=[...],
before_step_hooks=[before_hook],
after_step_hooks=[after_hook],
)
Source: src/lerobot/processor/pipeline.py:281
Integration with Policies
Policies can include processing pipelines:
from lerobot.policies.diffusion import DiffusionPolicy
policy = DiffusionPolicy.from_pretrained("username/my_policy")
# Policy includes its own processor
if hasattr(policy, "processor"):
processed = policy.processor(observation)
action = policy.select_action(processed)
Best Practices
- Save statistics: Always save normalization statistics with your policy so you can unnormalize outputs correctly during deployment.
- Order matters: The order of processing steps is important. A typical observation pipeline runs rename → normalize → device transfer.
- Batch processing: Processors work on single transitions; use a DataLoader for batching during training, as shown in the sketch below.
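For the batch-processing point, one common pattern is to run the pipeline per sample inside a dataset wrapper and let the DataLoader handle collation. A sketch reusing dataset and observation_pipeline from the earlier example; the wrapper class is hypothetical:
from torch.utils.data import DataLoader, Dataset

class ProcessedDataset(Dataset):
    """Hypothetical wrapper that applies a processor pipeline to each sample."""

    def __init__(self, dataset, pipeline):
        self.dataset = dataset
        self.pipeline = pipeline

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        # The pipeline sees one transition at a time
        return self.pipeline(self.dataset[idx])

loader = DataLoader(ProcessedDataset(dataset, observation_pipeline), batch_size=32)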
Advanced: Custom Processor Steps
Create custom transformation steps:
import torch

from lerobot.processor import (
    DataProcessorPipeline,
    EnvTransition,
    ProcessorStep,
    ProcessorStepRegistry,
)
@ProcessorStepRegistry.register("ClipActions")
class ClipActionsStep(ProcessorStep):
def __init__(self, min_val: float = -1.0, max_val: float = 1.0):
self.min_val = min_val
self.max_val = max_val
def __call__(self, transition: EnvTransition) -> EnvTransition:
if "action" in transition:
transition["action"] = torch.clamp(
transition["action"],
self.min_val,
self.max_val,
)
return transition
def transform_features(self, features: dict) -> dict:
# Features don't change shape/type
return features
def get_config(self) -> dict:
return {
"min_val": self.min_val,
"max_val": self.max_val,
}
# Use in pipeline
pipeline = DataProcessorPipeline(
steps=[
ClipActionsStep(min_val=-2.0, max_val=2.0),
]
)
Next Steps