Overview

Processors in LeRobot are modular data transformation pipelines that convert between different data representations. They handle:
  • Normalization: Scaling observations and actions to standard ranges
  • Device management: Moving data between CPU and GPU
  • Format conversion: Converting between robot, policy, and environment formats
  • Delta actions: Converting between relative and absolute action representations
  • Observation processing: Renaming, cropping, and transforming sensor data
Processors enable you to train policies on normalized data while deploying them on real robots with different units and ranges.
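The core idea — a chain of small transforms applied in order — can be sketched in plain Python. This is a conceptual illustration only, not the LeRobot API; the step functions, key names, and value ranges are made up for the example:

```python
# Conceptual sketch: a processor pipeline is a chain of transforms
# applied in order. Illustrative only -- not the LeRobot API.

def normalize(obs):
    # Scale raw joint angles (assumed range 0..360) into [0, 1]
    return {k: v / 360.0 for k, v in obs.items()}

def rename(obs):
    # Map raw sensor names to policy-facing keys
    mapping = {"joint_pos": "observation.state"}
    return {mapping.get(k, k): v for k, v in obs.items()}

def run_pipeline(obs, steps):
    for step in steps:
        obs = step(obs)
    return obs

raw = {"joint_pos": 180.0}
processed = run_pipeline(raw, [normalize, rename])
print(processed)  # {'observation.state': 0.5}
```

The real pipeline follows the same shape: each step receives the output of the previous one.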

Core Concepts

Data Types

LeRobot defines several data types for different stages:
RobotObservation = dict[str, Any]      # Raw robot sensor data
RobotAction = dict[str, Any]           # Raw robot motor commands
PolicyAction = torch.Tensor            # Normalized policy output
EnvAction = np.ndarray                 # Environment-compatible actions
Source: src/lerobot/processor/core.py:39

EnvTransition

The standard format for data flowing through processors:
from lerobot.processor import EnvTransition

transition = {
    "observation": {...},         # Sensor data
    "action": ...,                # Action to execute
    "reward": 0.0,                # Reward signal (optional)
    "done": False,                # Episode termination
    "truncated": False,           # Time limit reached
    "info": {...},                # Auxiliary information
    "complementary_data": {...},  # Additional data
}
Source: src/lerobot/processor/core.py:45

ProcessorStep

The building block of processing pipelines:
from lerobot.processor import EnvTransition, ProcessorStep, ProcessorStepRegistry

@ProcessorStepRegistry.register()
class MyProcessorStep(ProcessorStep):
    def __call__(self, transition: EnvTransition) -> EnvTransition:
        # Transform the transition
        transition["observation"]["value"] *= 2.0
        return transition
    
    def transform_features(self, features: dict) -> dict:
        # Describe how features change
        return features
Source: src/lerobot/processor/pipeline.py:143

ProcessorStep Registry

Steps are registered for serialization and sharing:
from lerobot.processor import ProcessorStepRegistry

# List all available steps
print(ProcessorStepRegistry.list())

# Get a step by name
step_class = ProcessorStepRegistry.get("NormalizerProcessorStep")
Source: src/lerobot/processor/pipeline.py:59

DataProcessorPipeline

Chain multiple steps together:
from lerobot.processor import (
    DataProcessorPipeline,
    NormalizerProcessorStep,
    DeviceProcessorStep,
)

pipeline = DataProcessorPipeline(
    steps=[
        NormalizerProcessorStep(stats=dataset.meta.stats),
        DeviceProcessorStep(device="cuda"),
    ],
    name="robot_observation_processor",
)

# Process data
processed = pipeline(raw_observation)
Source: src/lerobot/processor/pipeline.py:253

Built-in Processor Steps

NormalizerProcessorStep

Normalizes observations and actions using dataset statistics:
from lerobot.processor import NormalizerProcessorStep

normalizer = NormalizerProcessorStep(
    stats=dataset.meta.stats,
    mode="mean_std",  # Options: "mean_std", "min_max"
)

# Normalize
transition = normalizer(transition)
# transition["observation"]["state"] is now normalized
The normalizer supports different modes:
  • mean_std: (x - mean) / std
  • min_max: (x - min) / (max - min)
Source: src/lerobot/processor/normalize_processor.py
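Numerically, the two modes behave as follows. This is a plain-float illustration of the formulas above, not the LeRobot step itself (which operates on tensors); the statistics values are made up for the example:

```python
# The math behind the two normalization modes, shown on plain floats.
# Illustrative only; the real step operates on tensors and per-key stats.
stats = {"mean": 20.0, "std": 10.0, "min": 10.0, "max": 30.0}

def mean_std(x):
    # (x - mean) / std: zero-centered, unit-variance scaling
    return (x - stats["mean"]) / stats["std"]

def min_max(x):
    # (x - min) / (max - min): scales into [0, 1]
    return (x - stats["min"]) / (stats["max"] - stats["min"])

print(mean_std(30.0))  # 1.0
print(min_max(20.0))   # 0.5
```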

UnnormalizerProcessorStep

Reverses normalization:
from lerobot.processor import UnnormalizerProcessorStep

unnormalizer = UnnormalizerProcessorStep(
    stats=dataset.meta.stats,
    mode="mean_std",
)

# Unnormalize policy output back to robot units
transition = unnormalizer(transition)
Source: src/lerobot/processor/normalize_processor.py

DeviceProcessorStep

Moves tensors between devices:
from lerobot.processor import DeviceProcessorStep

to_gpu = DeviceProcessorStep(device="cuda")
to_cpu = DeviceProcessorStep(device="cpu")

transition = to_gpu(transition)
# All tensors now on GPU
Source: src/lerobot/processor/device_processor.py

VanillaObservationProcessorStep

Processes raw observations from robots:
from lerobot.processor import VanillaObservationProcessorStep

obs_processor = VanillaObservationProcessorStep()

transition = obs_processor(transition)
Source: src/lerobot/processor/observation_processor.py

RenameObservationsProcessorStep

Renames observation keys:
from lerobot.processor import RenameObservationsProcessorStep

renamer = RenameObservationsProcessorStep(
    mapping={
        "camera_top": "observation.images.top",
        "joint_pos": "observation.state",
    }
)

transition = renamer(transition)
Source: src/lerobot/processor/rename_processor.py

Delta Action Processors

Convert between absolute and relative actions:
from lerobot.processor import MapDeltaActionToRobotActionStep

delta_processor = MapDeltaActionToRobotActionStep()

# Convert delta actions to absolute robot commands
transition = delta_processor(transition)
Source: src/lerobot/processor/delta_action_processor.py
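The underlying idea of a delta-action mapping — add the relative command to the current position to obtain an absolute target — can be sketched with plain floats. This is a conceptual stand-in, not the LeRobot implementation, and the joint names are invented for the example:

```python
# Conceptual sketch of delta -> absolute action mapping.
# Illustrative only; the real step operates on full transitions.

def delta_to_absolute(current_pos, delta_action):
    # An absolute target is the current joint position plus its delta.
    return {j: current_pos[j] + delta_action[j] for j in delta_action}

current = {"joint_1": 90.0, "joint_2": 45.0}
delta = {"joint_1": 5.0, "joint_2": -10.0}
print(delta_to_absolute(current, delta))  # {'joint_1': 95.0, 'joint_2': 35.0}
```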

RobotProcessorPipeline

Specialized pipeline for robot control:
from lerobot.processor import RobotProcessorPipeline

robot_processor = RobotProcessorPipeline[
    RobotObservation,  # Input type
    RobotObservation   # Output type
](
    steps=[...],
    to_transition=observation_to_transition,
    to_output=transition_to_observation,
)

processed_obs = robot_processor(raw_obs)
Source: src/lerobot/processor/pipeline.py:70

Factory Functions

Convenient functions to create default processors:
from lerobot.processor import (
    make_default_robot_observation_processor,
    make_default_robot_action_processor,
    make_default_teleop_action_processor,
)

obs_processor = make_default_robot_observation_processor()
action_processor = make_default_robot_action_processor()
teleop_processor = make_default_teleop_action_processor()
Source: src/lerobot/processor/factory.py:27

Example: Complete Pipeline

Here’s a full example of processing observations for a policy:
from lerobot.processor import (
    DataProcessorPipeline,
    RenameObservationsProcessorStep,
    NormalizerProcessorStep,
    DeviceProcessorStep,
)
from lerobot.datasets import LeRobotDataset

# Load dataset for statistics
dataset = LeRobotDataset(repo_id="lerobot/pusht")

# Create processing pipeline
observation_pipeline = DataProcessorPipeline(
    steps=[
        # Step 1: Rename raw robot observations
        RenameObservationsProcessorStep({
            "camera_top": "observation.images.top",
            "joint_pos": "observation.state",
        }),
        # Step 2: Normalize using dataset statistics
        NormalizerProcessorStep(
            stats=dataset.meta.stats,
            mode="mean_std",
        ),
        # Step 3: Move to GPU
        DeviceProcessorStep(device="cuda"),
    ],
    name="observation_processor",
)

# Use in inference
raw_obs = robot.get_observation()
processed_obs = observation_pipeline(raw_obs)
action = policy.select_action(processed_obs)

Example: Action Processing

Process policy outputs back to robot commands:
from lerobot.processor import (
    DataProcessorPipeline,
    DeviceProcessorStep,
    UnnormalizerProcessorStep,
)

action_pipeline = DataProcessorPipeline(
    steps=[
        # Step 1: Move to CPU
        DeviceProcessorStep(device="cpu"),
        # Step 2: Unnormalize to robot units
        UnnormalizerProcessorStep(
            stats=dataset.meta.stats,
            mode="mean_std",
        ),
    ],
    name="action_processor",
)

# Process policy output
policy_action = policy.select_action(observation)
robot_action = action_pipeline(policy_action)
robot.send_action(robot_action)

Stateful Processors

Some processors maintain internal state:
import torch

from lerobot.processor import ProcessorStep

class StatefulProcessor(ProcessorStep):
    def __init__(self):
        self.count = 0
    
    def __call__(self, transition):
        self.count += 1
        return transition
    
    def state_dict(self) -> dict[str, torch.Tensor]:
        return {"count": torch.tensor(self.count)}
    
    def load_state_dict(self, state: dict):
        self.count = state["count"].item()
    
    def reset(self):
        self.count = 0
Source: src/lerobot/processor/pipeline.py:192
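The save/restore contract can be exercised with a minimal round trip. This uses a plain-Python stand-in (no tensors) purely to illustrate the `state_dict`/`load_state_dict` pattern shown above:

```python
# Minimal stand-in demonstrating the state_dict round-trip contract.
# Illustrative only; the real interface stores torch tensors.
class CountingStep:
    def __init__(self):
        self.count = 0

    def __call__(self, transition):
        self.count += 1
        return transition

    def state_dict(self):
        return {"count": self.count}

    def load_state_dict(self, state):
        self.count = state["count"]

step = CountingStep()
for _ in range(3):
    step({})
saved = step.state_dict()        # {'count': 3}

restored = CountingStep()
restored.load_state_dict(saved)  # restored.count == 3
```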

Saving and Loading Pipelines

Save to Disk

pipeline.save_pretrained("path/to/processor")
This saves:
  • config.json: Step configurations
  • state.safetensors: Step states (e.g., normalization statistics)

Load from Disk

from lerobot.processor import DataProcessorPipeline

pipeline = DataProcessorPipeline.from_pretrained("path/to/processor")

Push to Hub

pipeline.push_to_hub("username/my_processor")

Load from Hub

pipeline = DataProcessorPipeline.from_pretrained("username/my_processor")

Hooks

Add debugging or logging hooks:
def before_hook(step_idx: int, transition: EnvTransition):
    print(f"Before step {step_idx}")

def after_hook(step_idx: int, transition: EnvTransition):
    print(f"After step {step_idx}: {transition.keys()}")

pipeline = DataProcessorPipeline(
    steps=[...],
    before_step_hooks=[before_hook],
    after_step_hooks=[after_hook],
)
Source: src/lerobot/processor/pipeline.py:281
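The hook semantics — callbacks firing before and after each step, without altering the data flow — can be sketched in plain Python. This is a conceptual illustration, not the LeRobot hook API:

```python
# Conceptual sketch of before/after hooks firing around each step.
# Illustrative only -- not the LeRobot hook API.

def run_with_hooks(obs, steps, before_hooks=(), after_hooks=()):
    for i, step in enumerate(steps):
        for hook in before_hooks:
            hook(i, obs)       # observe input to step i
        obs = step(obs)
        for hook in after_hooks:
            hook(i, obs)       # observe output of step i
    return obs

log = []
double = lambda obs: {k: v * 2 for k, v in obs.items()}
result = run_with_hooks(
    {"x": 1},
    [double, double],
    before_hooks=[lambda i, t: log.append(f"before {i}")],
    after_hooks=[lambda i, t: log.append(f"after {i}")],
)
print(result)  # {'x': 4}
print(log)     # ['before 0', 'after 0', 'before 1', 'after 1']
```

Hooks observe the transition but do not replace it, which makes them safe for logging and debugging.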

Integration with Policies

Policies can include processing pipelines:
from lerobot.policies.diffusion import DiffusionPolicy

policy = DiffusionPolicy.from_pretrained("username/my_policy")

# Policy includes its own processor
if hasattr(policy, "processor"):
    processed = policy.processor(observation)
    action = policy.select_action(processed)

Best Practices

  • Save statistics: Always save normalization statistics with your policy so outputs can be unnormalized correctly during deployment.
  • Order matters: Processing steps run in sequence; a typical order is rename → normalize → device transfer.
  • Batch processing: Processors operate on single transitions. Use a DataLoader for batch processing during training.

Advanced: Custom Processor Steps

Create custom transformation steps:
from lerobot.processor import (
    DataProcessorPipeline,
    EnvTransition,
    ProcessorStep,
    ProcessorStepRegistry,
)
import torch

@ProcessorStepRegistry.register("ClipActions")
class ClipActionsStep(ProcessorStep):
    def __init__(self, min_val: float = -1.0, max_val: float = 1.0):
        self.min_val = min_val
        self.max_val = max_val
    
    def __call__(self, transition: EnvTransition) -> EnvTransition:
        if "action" in transition:
            transition["action"] = torch.clamp(
                transition["action"],
                self.min_val,
                self.max_val,
            )
        return transition
    
    def transform_features(self, features: dict) -> dict:
        # Features don't change shape/type
        return features
    
    def get_config(self) -> dict:
        return {
            "min_val": self.min_val,
            "max_val": self.max_val,
        }

# Use in pipeline
pipeline = DataProcessorPipeline(
    steps=[
        ClipActionsStep(min_val=-2.0, max_val=2.0),
    ]
)
