Environment processors transform observations from simulation environments (like LIBERO, Isaac Lab Arena) into the LeRobot format. They handle environment-specific conventions like camera orientations, state representations, and data formats.
Overview
Environment processors bridge the gap between:
- Simulation environments: Each with their own observation format
- LeRobot format: Standardized observation structure expected by policies
They typically handle:
- Image preprocessing (rotation, format conversion)
- State extraction and concatenation
- Feature renaming and reorganization
LIBERO Environment
Process LIBERO environment observations:
from lerobot.processor import ProcessorStepRegistry
# Create LIBERO processor
libero_processor = ProcessorStepRegistry.get("libero_processor")()
# Process observation
obs = {
"observation.images.camera": camera_image, # (B, H, W, C)
"observation.robot_state": {
"eef": {
"pos": eef_position, # (B, 3)
"quat": eef_quat, # (B, 4) - quaternion
},
"gripper": {
"qpos": gripper_pos, # (B, 2)
},
},
}
transition = create_transition(observation=obs)
processed = libero_processor(transition)
print(processed["observation"])
# Output:
# {
# "observation.images.camera": <rotated_image>, # Rotated 180°
# "observation.state": <flat_state>, # (B, 8) = [pos(3), axisangle(3), gripper(2)]
# }
See lerobot/processor/env_processor.py:26 for the implementation.
What LIBERO Processor Does
- Rotates images 180 degrees to match HuggingFaceVLA camera convention:
img = torch.flip(img, dims=[2, 3]) # Flip H and W
- Flattens robot state from nested dict to single vector:
# Extract components
eef_pos = robot_state["eef"]["pos"] # (B, 3)
eef_quat = robot_state["eef"]["quat"] # (B, 4)
gripper_qpos = robot_state["gripper"]["qpos"] # (B, 2)
# Convert quaternion to axis-angle
eef_axisangle = quat2axisangle(eef_quat) # (B, 3)
# Concatenate
state = torch.cat([eef_pos, eef_axisangle, gripper_qpos], dim=-1) # (B, 8)
- Updates feature shapes in
transform_features():
def transform_features(self, features):
# Replace nested robot_state with flat state
features[FeatureType.STATE] = {
"observation.state": PolicyFeature(
type=FeatureType.STATE,
shape=(8,) # [eef_pos(3), axis_angle(3), gripper(2)]
)
}
return features
Isaac Lab Arena Environment
Process Isaac Lab Arena observations:
from lerobot.processor import ProcessorStepRegistry
# Create Isaac Lab processor
isaaclab_processor = ProcessorStepRegistry.get("isaaclab_arena_processor")(
state_keys=("robot_joint_pos", "left_eef_pos", "right_eef_pos"),
camera_keys=("robot_pov_cam_rgb", "third_person_cam_rgb")
)
# Process observation
obs = {
"observation.policy": {
"robot_joint_pos": joint_positions, # (B, 7)
"left_eef_pos": left_eef, # (B, 3)
"right_eef_pos": right_eef, # (B, 3)
"object_pos": object_positions, # (B, N, 3) - not used
},
"observation.camera_obs": {
"robot_pov_cam_rgb": robot_cam, # (B, H, W, C) uint8
"third_person_cam_rgb": third_cam, # (B, H, W, C) uint8
"depth": depth_map, # (B, H, W) - not used
},
}
transition = create_transition(observation=obs)
processed = isaaclab_processor(transition)
print(processed["observation"])
# Output:
# {
# "observation.images.robot_pov_cam_rgb": <image>, # (B, C, H, W) float32 [0,1]
# "observation.images.third_person_cam_rgb": <image>, # (B, C, H, W) float32 [0,1]
# "observation.state": <flat_state>, # (B, 13) = concat of state_keys
# }
See lerobot/processor/env_processor.py:156 for the implementation.
What Isaac Lab Processor Does
- Converts images from (B, H, W, C) uint8 to (B, C, H, W) float32:
img = img.permute(0, 3, 1, 2).contiguous() # BHWC -> BCHW
if img.dtype == torch.uint8:
img = img.float() / 255.0 # [0, 255] -> [0.0, 1.0]
- Selectively extracts and concatenates state based on
state_keys:
state_components = []
for key in self.state_keys: # e.g., ["robot_joint_pos", "left_eef_pos", "right_eef_pos"]
if key in policy_obs:
component = policy_obs[key]
# Flatten extra dims: (B, N, M) -> (B, N*M)
if component.dim() > 2:
batch_size = component.shape[0]
component = component.view(batch_size, -1)
state_components.append(component)
state = torch.cat(state_components, dim=-1) # (B, total_state_dim)
- Renames features to LeRobot convention:
observation.camera_obs.<name> → observation.images.<name>
observation.policy → observation.state
Configuration
- state_keys: Keys to extract from obs["policy"] and concatenate into the state vector. Order matters!
- camera_keys: Camera names to extract from obs["camera_obs"].
Custom Environment Processor
Create a processor for your custom environment:
from dataclasses import dataclass

import numpy as np
import torch

from lerobot.configs.types import FeatureType, PolicyFeature  # NOTE(review): confirm import path in your lerobot version
from lerobot.processor import ObservationProcessorStep, ProcessorStepRegistry
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE


@ProcessorStepRegistry.register("my_env_processor")
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyCustomEnv.

    MyCustomEnv provides:
    - obs["rgb"]: (H, W, 3) numpy array in range [0, 255]
    - obs["depth"]: (H, W) numpy array
    - obs["proprio"]: dict with keys "joint_pos", "joint_vel", "force"

    We convert to LeRobot format:
    - observation.images.rgb: (B, 3, H, W) float32 [0, 1]
    - observation.state: (B, state_dim) concatenation of proprio

    Attributes:
        include_velocity: include "joint_vel" in the state vector.
        include_force: include "force" in the state vector.
    """

    include_velocity: bool = True
    include_force: bool = False

    def observation(self, observation: dict) -> dict:
        """Convert a raw MyCustomEnv observation to the LeRobot format.

        Keys not handled here (e.g. "depth") are dropped from the output.
        """
        processed = {}
        # Process RGB image
        if "rgb" in observation:
            rgb = observation["rgb"]
            # Convert numpy to tensor if needed
            if isinstance(rgb, np.ndarray):
                rgb = torch.from_numpy(rgb)
            # Add batch dim if needed
            if rgb.ndim == 3:
                rgb = rgb.unsqueeze(0)  # (H, W, C) -> (B, H, W, C)
            # Convert to (B, C, H, W) and normalize [0, 255] -> [0, 1]
            rgb = rgb.permute(0, 3, 1, 2).float() / 255.0
            processed[f"{OBS_IMAGES}.rgb"] = rgb
        # Process proprioception. Concatenation order is
        # [joint_pos, joint_vel?, force?] — keep in sync with transform_features().
        if "proprio" in observation:
            proprio = observation["proprio"]
            state_components = []
            # Always include joint position
            if "joint_pos" in proprio:
                state_components.append(torch.tensor(proprio["joint_pos"]).float())
            # Optionally include velocity
            if self.include_velocity and "joint_vel" in proprio:
                state_components.append(torch.tensor(proprio["joint_vel"]).float())
            # Optionally include force
            if self.include_force and "force" in proprio:
                state_components.append(torch.tensor(proprio["force"]).float())
            # Concatenate and add batch dim if needed
            state = torch.cat(state_components, dim=-1)
            if state.ndim == 1:
                state = state.unsqueeze(0)  # (N,) -> (B, N)
            processed[OBS_STATE] = state
        return processed

    def transform_features(self, features):
        """Update feature descriptions to match the processed observation."""
        new_features = {}
        # Add image feature
        new_features[FeatureType.IMAGE] = {
            f"{OBS_IMAGES}.rgb": PolicyFeature(
                type=FeatureType.IMAGE,
                shape=(3, 224, 224),  # assumes 224x224 images — TODO confirm for your env
            )
        }
        # Compute state dimension from the enabled components
        state_dim = 7  # joint_pos
        if self.include_velocity:
            state_dim += 7  # joint_vel
        if self.include_force:
            state_dim += 6  # force
        new_features[FeatureType.STATE] = {
            OBS_STATE: PolicyFeature(
                type=FeatureType.STATE,
                shape=(state_dim,),
            )
        }
        return new_features


# Usage
processor = MyEnvProcessorStep(
    include_velocity=True,
    include_force=False,
)
Handling Multi-Dimensional States
Some environments provide multi-dimensional state arrays that need flattening:
@ProcessorStepRegistry.register("flatten_state_processor")
@dataclass
class FlattenStateProcessorStep(ObservationProcessorStep):
    """Flatten multi-dimensional state observations.

    Selects ``state_keys`` from the observation, flattens each component to
    (B, N), and concatenates them into a single ``observation.state`` vector.
    Concatenation order follows ``state_keys`` — order matters.

    Attributes:
        state_keys: observation keys to flatten and concatenate, in order.
    """

    state_keys: tuple[str, ...]

    def observation(self, observation: dict) -> dict:
        """Return a copy of ``observation`` with a flat OBS_STATE added."""
        processed = observation.copy()  # shallow copy: original keys are kept
        state_components = []
        for key in self.state_keys:
            if key in observation:
                component = observation[key]
                # Flatten all dims except batch: (B, N, M) -> (B, N*M)
                if component.dim() > 2:
                    batch_size = component.shape[0]
                    component = component.view(batch_size, -1)
                elif component.dim() == 1:
                    # Unbatched vector: add a batch dim -> (1, N)
                    component = component.unsqueeze(0)
                state_components.append(component)
        if state_components:
            processed[OBS_STATE] = torch.cat(state_components, dim=-1)
        return processed

    def transform_features(self, features):
        """Replace per-key state features with a single flat state feature."""
        # Calculate total flattened dimension across the selected keys.
        total_dim = 0
        for key in self.state_keys:
            if key in features.get(FeatureType.STATE, {}):
                feature = features[FeatureType.STATE][key]
                # int(...) so the shape holds a plain int, not numpy.int64
                total_dim += int(np.prod(feature.shape))
        return {
            FeatureType.STATE: {
                OBS_STATE: PolicyFeature(
                    type=FeatureType.STATE,
                    shape=(total_dim,),
                )
            }
        }
Testing Environment Processors
Always test with realistic environment data:
import pytest
import torch
import numpy as np
from lerobot.processor import create_transition
def test_my_env_processor():
    """End-to-end check of MyEnvProcessorStep on realistic data."""
    # Build a realistic raw observation.
    rgb = np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)
    proprio = {
        "joint_pos": np.random.randn(7).astype(np.float32),
        "joint_vel": np.random.randn(7).astype(np.float32),
        "force": np.random.randn(6).astype(np.float32),
    }
    observation = {"rgb": rgb, "proprio": proprio}

    # Run the processor over a transition.
    step = MyEnvProcessorStep(include_velocity=True, include_force=True)
    result = step(create_transition(observation=observation))
    out = result["observation"]

    # The expected LeRobot keys are present.
    assert "observation.images.rgb" in out
    assert "observation.state" in out

    # Image: batched, channels-first, float in [0, 1].
    image = out["observation.images.rgb"]
    assert image.shape == (1, 3, 224, 224)  # (B, C, H, W)
    assert image.dtype == torch.float32
    assert image.min() >= 0.0 and image.max() <= 1.0

    # State: 7 joint_pos + 7 joint_vel + 6 force = 20 dims.
    flat_state = out["observation.state"]
    assert flat_state.shape == (1, 20)
    assert flat_state.dtype == torch.float32

    # Feature transformation produces matching descriptions.
    transformed = step.transform_features({})
    assert FeatureType.IMAGE in transformed
    assert FeatureType.STATE in transformed
    assert transformed[FeatureType.STATE]["observation.state"].shape == (20,)
Integration with Environments
Use environment processors in your gym environment:
import gymnasium as gym
from lerobot.processor import DataProcessorPipeline
class MyGymEnv(gym.Env):
    """Gym environment that emits LeRobot-format observations.

    Raw simulator observations are converted to the LeRobot convention by an
    observation-processor pipeline before being returned to the caller.
    """

    def __init__(self):
        super().__init__()
        # Pipeline that converts raw observations to LeRobot format.
        self.obs_processor = DataProcessorPipeline(
            steps=[
                MyEnvProcessorStep(include_velocity=True),
                # Add more processors if needed
            ],
            name="obs_pipeline",
        )

    def _process(self, raw_obs):
        # Run a raw observation through the pipeline and return the
        # LeRobot-format observation dict.
        transition = create_transition(observation=raw_obs)
        return self.obs_processor(transition)["observation"]

    def reset(self):
        # Get raw observation from environment and convert it.
        # NOTE(review): the gymnasium API expects reset() to return
        # (observation, info) — adapt if targeting strict gymnasium.
        raw_obs = self._get_raw_observation()
        return self._process(raw_obs)

    def step(self, action):
        # Execute action. Assumes _execute_action returns the full step
        # result (raw_obs, reward, done, truncated, info) — TODO confirm;
        # the previous example referenced these names without defining them.
        raw_obs, reward, done, truncated, info = self._execute_action(action)
        return self._process(raw_obs), reward, done, truncated, info
Best Practices
1. Match Environment Conventions
Understand your environment’s data format:
# Document environment output format
"""
MyEnv observation format:
- Images: (H, W, C) uint8 [0, 255], BGR color space
- State: dict with keys ["pos", "vel", "acc"]
- Coordinate system: X-forward, Y-left, Z-up
"""
# Handle in processor
def observation(self, obs):
# Convert BGR to RGB
img = obs["rgb"][:, :, ::-1] # BGR -> RGB
# Transform coordinates if needed
# ...
2. Preserve Batch Dimension
Always maintain batch dimension:
def observation(self, obs):
# Add batch dim if missing
if tensor.ndim == expected_ndim - 1:
tensor = tensor.unsqueeze(0)
# Process with batch dim
# ...
return processed
3. Document State Ordering
Clearly document concatenation order:
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
"""
State vector composition (total: 20):
- [0:7]: joint positions
- [7:14]: joint velocities
- [14:17]: end-effector position
- [17:20]: end-effector orientation (axis-angle)
"""
API Reference
LiberoProcessorStep
See lerobot/processor/env_processor.py:26
Process LIBERO observation to LeRobot format
IsaaclabArenaProcessorStep
See lerobot/processor/env_processor.py:156
- state_keys: Keys to extract from the policy observation.
- camera_keys: Camera names to extract from the camera observation.
Process Isaac Lab observation to LeRobot format