The Processor API provides modular pipelines for transforming data between robots, policies, and environments.
Overview
LeRobot uses processor pipelines to:
- Normalize/unnormalize data
- Convert between data formats (numpy ↔ torch)
- Apply transformations (e.g., delta actions)
- Move data between devices (CPU/GPU)
- Rename observation keys
- Add batch dimensions
Type Definitions
from lerobot.processor import (
RobotObservation, # Dict of robot sensor data
RobotAction, # Dict of robot commands
PolicyAction, # Tensor or dict from policy
EnvAction, # Numpy array for environment
)
Location: src/lerobot/processor/core.py
Core Pipeline Classes
ProcessorStep
Base class for all processing steps.
from lerobot.processor import ProcessorStep
class ProcessorStep:
def __call__(self, data: Any) -> Any:
"""Transform input data."""
pass
DataProcessorPipeline
Generic pipeline for chaining processing steps.
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline([
NormalizerProcessorStep(stats),
DeviceProcessorStep(device="cuda"),
])
processed_data = pipeline(raw_data)
Built-in Processor Steps
NormalizerProcessorStep
Normalize data using dataset statistics.
from lerobot.processor import NormalizerProcessorStep
normalizer = NormalizerProcessorStep(
stats=dataset.meta.stats,
features={
"observation.state": {...},
"action": {...},
},
)
normalized = normalizer(data)
- `stats`: Dataset statistics with mean and std for each feature.
- `features`: Feature definitions specifying which keys to normalize.
UnnormalizerProcessorStep
Reverse normalization.
from lerobot.processor import UnnormalizerProcessorStep
unnormalizer = UnnormalizerProcessorStep(
stats=dataset.meta.stats,
features={"action": {...}},
)
action = unnormalizer(normalized_action)
DeviceProcessorStep
Move tensors to specific device.
from lerobot.processor import DeviceProcessorStep
to_cuda = DeviceProcessorStep(device="cuda")
data_on_gpu = to_cuda(data_on_cpu)
Device string: "cpu", "cuda", "cuda:0", etc.
MapDeltaActionToRobotActionStep
Convert delta actions to absolute actions.
from lerobot.processor import MapDeltaActionToRobotActionStep
delta_to_absolute = MapDeltaActionToRobotActionStep()
# Input: (delta_action, current_observation)
absolute_action = delta_to_absolute((delta_action, robot_obs))
RenameObservationsProcessorStep
Rename observation keys.
from lerobot.processor import RenameObservationsProcessorStep
renamer = RenameObservationsProcessorStep(
rename_map={
"observation.images.camera1": "observation.images.front",
}
)
renamed_obs = renamer(observation)
- `rename_map`: Mapping from old keys to new keys.
AddBatchDimensionProcessorStep
Add batch dimension to tensors.
from lerobot.processor import AddBatchDimensionProcessorStep
add_batch = AddBatchDimensionProcessorStep()
# (C, H, W) -> (1, C, H, W)
batched = add_batch(unbatched_data)
Factory Functions
make_default_processors
from lerobot.processor import make_default_processors
robot_obs_processor, robot_action_processor, teleop_action_processor = make_default_processors()
Create default processors for robot data. Returns three pipelines:
- `robot_obs_processor`: Pipeline for processing robot observations.
- `robot_action_processor`: Pipeline for processing robot actions.
- `teleop_action_processor`: Pipeline for processing teleoperation actions.
make_default_robot_observation_processor
from lerobot.processor import make_default_robot_observation_processor
obs_processor = make_default_robot_observation_processor()
processed_obs = obs_processor(robot.get_observation())
make_default_robot_action_processor
from lerobot.processor import make_default_robot_action_processor
action_processor = make_default_robot_action_processor()
processed_action = action_processor((action, observation))
Specialized Pipelines
PolicyProcessorPipeline
Pipeline for policy input/output processing.
from lerobot.processor import PolicyProcessorPipeline
from lerobot.policies.factory import make_pre_post_processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=config,
dataset_stats=dataset.meta.stats,
)
# Preprocessing pipeline
processed_obs = preprocessor(observation)
# Postprocessing pipeline
action = policy.select_action(processed_obs)
final_action = postprocessor(action)
RobotProcessorPipeline
Pipeline for robot-specific processing.
from lerobot.processor import RobotProcessorPipeline
robot_pipeline = RobotProcessorPipeline([
# Custom steps for robot
])
processed = robot_pipeline(robot_data)
Usage Examples
Training Pipeline
from lerobot.datasets import LeRobotDataset
from lerobot.policies import make_policy
from lerobot.policies.factory import make_pre_post_processors
from torch.utils.data import DataLoader
# Load dataset
dataset = LeRobotDataset("lerobot/pusht")
# Create policy
policy = make_policy(config, dataset.meta)
# Create preprocessor
preprocessor, _ = make_pre_post_processors(
policy_cfg=config,
dataset_stats=dataset.meta.stats,
)
# Setup dataloader
dataloader = DataLoader(dataset, batch_size=32)
# Training loop
for batch in dataloader:
# Preprocess batch
batch = preprocessor(batch)
# Forward pass
loss, info = policy.forward(batch)
loss.backward()
optimizer.step()
Inference Pipeline
from lerobot.robots import make_robot_from_config
from lerobot.policies import make_policy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import make_default_robot_action_processor
# Load components
robot = make_robot_from_config(robot_config)
policy = make_policy(cfg=None, pretrained_path="lerobot/diffusion_pusht")
# Create processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy.config,
pretrained_path="lerobot/diffusion_pusht",
)
robot_action_processor = make_default_robot_action_processor()
# Inference loop
robot.connect()
policy.reset()
for step in range(100):
# Get observation
observation = robot.get_observation()
# Policy inference
observation = preprocessor(observation)
action = policy.select_action(observation)
action = postprocessor(action)
# Process for robot
robot_action = robot_action_processor((action, observation))
# Send to robot
robot.send_action(robot_action)
robot.disconnect()
Custom Processor Step
from lerobot.processor import ProcessorStep
import torch
class CustomProcessorStep(ProcessorStep):
def __init__(self, scale: float):
self.scale = scale
def __call__(self, data: dict) -> dict:
# Scale all action values
if "action" in data:
data["action"] = data["action"] * self.scale
return data
# Use in pipeline
from lerobot.processor import DataProcessorPipeline
pipeline = DataProcessorPipeline([
CustomProcessorStep(scale=2.0),
DeviceProcessorStep(device="cuda"),
])
processed = pipeline(data)
Delta Action Processing
from lerobot.processor import (
MapTensorToDeltaActionDictStep,
MapDeltaActionToRobotActionStep,
)
# Convert policy tensor output to delta action dict
tensor_to_delta = MapTensorToDeltaActionDictStep(
action_names=["x", "y", "z", "gripper"]
)
# Convert delta action to absolute action
delta_to_absolute = MapDeltaActionToRobotActionStep()
# Pipeline
policy_output = policy.select_action(observation) # Tensor
delta_action = tensor_to_delta(policy_output) # Dict
robot_obs = robot.get_observation()
absolute_action = delta_to_absolute((delta_action, robot_obs))
Observation Renaming
from lerobot.processor import RenameObservationsProcessorStep
# Robot has different camera names than policy expects
renamer = RenameObservationsProcessorStep(
rename_map={
"observation.images.webcam": "observation.images.top",
"observation.joint_pos": "observation.state",
}
)
robot_obs = robot.get_observation()
renamed_obs = renamer(robot_obs)
# Now compatible with policy trained on different names
Processor Registry
Register custom processor steps:
from lerobot.processor import ProcessorStepRegistry
registry = ProcessorStepRegistry()
@registry.register("my_custom_step")
class MyCustomStep(ProcessorStep):
def __call__(self, data):
return data
# Use in configuration
step = registry.create("my_custom_step", config={})
See Also