Environment processors transform observations from simulation environments (like LIBERO, Isaac Lab Arena) into the LeRobot format. They handle environment-specific conventions like camera orientations, state representations, and data formats.

Overview

Environment processors bridge the gap between:
  • Simulation environments: Each with their own observation format
  • LeRobot format: Standardized observation structure expected by policies
They typically handle:
  • Image preprocessing (rotation, format conversion)
  • State extraction and concatenation
  • Feature renaming and reorganization
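In practice the transformation is a pure function on the observation dict. A minimal sketch with hypothetical keys (not any specific environment's format), showing the kind of image and state conversion described above:

```python
import torch

# Hypothetical raw observation from a simulator
raw = {
    "rgb": torch.randint(0, 256, (1, 128, 128, 3), dtype=torch.uint8),
    "joint_pos": torch.zeros(1, 7),
}

# The same data after an environment processor, in LeRobot conventions:
lerobot_obs = {
    # (B, H, W, C) uint8 -> (B, C, H, W) float32 in [0, 1]
    "observation.images.rgb": raw["rgb"].permute(0, 3, 1, 2).float() / 255.0,
    # proprioception gathered under a single state key
    "observation.state": raw["joint_pos"],
}
print(lerobot_obs["observation.images.rgb"].shape)  # torch.Size([1, 3, 128, 128])
```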

LIBERO Environment

Process LIBERO environment observations:
from lerobot.processor import ProcessorStepRegistry

# Create LIBERO processor
libero_processor = ProcessorStepRegistry.get("libero_processor")()

# Process observation
obs = {
    "observation.images.camera": camera_image,  # (B, H, W, C)
    "observation.robot_state": {
        "eef": {
            "pos": eef_position,    # (B, 3)
            "quat": eef_quat,       # (B, 4) - quaternion
        },
        "gripper": {
            "qpos": gripper_pos,    # (B, 2)
        },
    },
}

transition = create_transition(observation=obs)
processed = libero_processor(transition)

print(processed["observation"])
# Output:
# {
#   "observation.images.camera": <rotated_image>,  # Rotated 180°
#   "observation.state": <flat_state>,             # (B, 8) = [pos(3), axisangle(3), gripper(2)]
# }
See lerobot/processor/env_processor.py:26 for the implementation.

What LIBERO Processor Does

  1. Rotates images 180 degrees to match HuggingFaceVLA camera convention:
img = torch.flip(img, dims=[2, 3])  # Flip H and W
  2. Flattens robot state from a nested dict to a single vector:
# Extract components
eef_pos = robot_state["eef"]["pos"]        # (B, 3)
eef_quat = robot_state["eef"]["quat"]      # (B, 4)
gripper_qpos = robot_state["gripper"]["qpos"]  # (B, 2)

# Convert quaternion to axis-angle
eef_axisangle = quat2axisangle(eef_quat)   # (B, 3)

# Concatenate
state = torch.cat([eef_pos, eef_axisangle, gripper_qpos], dim=-1)  # (B, 8)
  3. Updates feature shapes in transform_features():
def transform_features(self, features):
    # Replace nested robot_state with flat state
    features[FeatureType.STATE] = {
        "observation.state": PolicyFeature(
            type=FeatureType.STATE,
            shape=(8,)  # [eef_pos(3), axis_angle(3), gripper(2)]
        )
    }
    return features
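The quat2axisangle helper used above can be sketched in pure torch. This is a hedged reimplementation, not the library's actual helper, and it assumes the (x, y, z, w) quaternion ordering common in robosuite-based environments such as LIBERO; verify the ordering against your environment before reusing it:

```python
import torch

def quat2axisangle(quat: torch.Tensor) -> torch.Tensor:
    """Convert batched quaternions (B, 4) to axis-angle vectors (B, 3).

    Assumes (x, y, z, w) ordering; check your environment's convention.
    """
    xyz, w = quat[..., :3], quat[..., 3:4].clamp(-1.0, 1.0)
    angle = 2.0 * torch.acos(w)                           # rotation angle (B, 1)
    sin_half = torch.sqrt((1.0 - w * w).clamp(min=1e-12)) # sin(angle / 2), guarded
    axis = xyz / sin_half                                 # unit axis; ~0 near identity
    return axis * angle                                   # (B, 3)

quat = torch.tensor([[0.0, 0.0, 0.7071068, 0.7071068]])  # 90 deg about z
print(quat2axisangle(quat))  # ~tensor([[0.0000, 0.0000, 1.5708]])
```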

Isaac Lab Arena Environment

Process Isaac Lab Arena observations:
from lerobot.processor import ProcessorStepRegistry

# Create Isaac Lab processor
isaaclab_processor = ProcessorStepRegistry.get("isaaclab_arena_processor")(
    state_keys=("robot_joint_pos", "left_eef_pos", "right_eef_pos"),
    camera_keys=("robot_pov_cam_rgb", "third_person_cam_rgb")
)

# Process observation
obs = {
    "observation.policy": {
        "robot_joint_pos": joint_positions,    # (B, 7)
        "left_eef_pos": left_eef,              # (B, 3)
        "right_eef_pos": right_eef,            # (B, 3)
        "object_pos": object_positions,        # (B, N, 3) - not used
    },
    "observation.camera_obs": {
        "robot_pov_cam_rgb": robot_cam,        # (B, H, W, C) uint8
        "third_person_cam_rgb": third_cam,     # (B, H, W, C) uint8
        "depth": depth_map,                    # (B, H, W) - not used
    },
}

transition = create_transition(observation=obs)
processed = isaaclab_processor(transition)

print(processed["observation"])
# Output:
# {
#   "observation.images.robot_pov_cam_rgb": <image>,      # (B, C, H, W) float32 [0,1]
#   "observation.images.third_person_cam_rgb": <image>,   # (B, C, H, W) float32 [0,1]
#   "observation.state": <flat_state>,                     # (B, 13) = concat of state_keys
# }
See lerobot/processor/env_processor.py:156 for the implementation.

What Isaac Lab Processor Does

  1. Converts images from (B, H, W, C) uint8 to (B, C, H, W) float32:
img = img.permute(0, 3, 1, 2).contiguous()  # BHWC -> BCHW
if img.dtype == torch.uint8:
    img = img.float() / 255.0  # [0, 255] -> [0.0, 1.0]
  2. Selectively extracts and concatenates state based on state_keys:
state_components = []
for key in self.state_keys:  # e.g., ["robot_joint_pos", "left_eef_pos", "right_eef_pos"]
    if key in policy_obs:
        component = policy_obs[key]
        # Flatten extra dims: (B, N, M) -> (B, N*M)
        if component.dim() > 2:
            batch_size = component.shape[0]
            component = component.view(batch_size, -1)
        state_components.append(component)

state = torch.cat(state_components, dim=-1)  # (B, total_state_dim)
  3. Renames features to the LeRobot convention:
  • observation.camera_obs.<name> → observation.images.<name>
  • observation.policy → observation.state
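The renaming amounts to moving tensors under new key prefixes. A minimal sketch with hypothetical key names and shapes:

```python
import torch

camera_obs = {"robot_pov_cam_rgb": torch.zeros(1, 3, 224, 224)}
policy_obs = {"robot_joint_pos": torch.zeros(1, 7)}

# Cameras keep their names under the "observation.images." prefix;
# the selected policy keys collapse into a single "observation.state".
renamed = {f"observation.images.{k}": v for k, v in camera_obs.items()}
renamed["observation.state"] = torch.cat(list(policy_obs.values()), dim=-1)
print(sorted(renamed))  # ['observation.images.robot_pov_cam_rgb', 'observation.state']
```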

Configuration

state_keys (tuple[str, ...], required)
  Keys to extract from obs["observation.policy"] and concatenate into the state vector. Order matters!
camera_keys (tuple[str, ...], required)
  Camera names to extract from obs["observation.camera_obs"].
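Because concatenation follows state_keys order, reordering the tuple silently changes which slice of the state vector holds which quantity. A quick illustration with hypothetical tensors:

```python
import torch

policy_obs = {
    "robot_joint_pos": torch.zeros(1, 7),
    "left_eef_pos": torch.ones(1, 3),
    "right_eef_pos": 2 * torch.ones(1, 3),
}
state_keys = ("robot_joint_pos", "left_eef_pos", "right_eef_pos")

# Same concatenation the processor performs; a policy trained on this
# layout breaks if the key order changes between training and rollout.
state = torch.cat([policy_obs[k] for k in state_keys], dim=-1)
print(state.shape)      # torch.Size([1, 13])
print(state[0, 7:10])   # tensor([1., 1., 1.])  <- the left_eef_pos slice
```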

Custom Environment Processor

Create a processor for your custom environment:
from dataclasses import dataclass

import numpy as np
import torch

from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.processor import ObservationProcessorStep, ProcessorStepRegistry
from lerobot.utils.constants import OBS_IMAGES, OBS_STATE

@ProcessorStepRegistry.register("my_env_processor")
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyCustomEnv.
    
    MyCustomEnv provides:
    - obs["rgb"]: (H, W, 3) numpy array in range [0, 255]
    - obs["depth"]: (H, W) numpy array  
    - obs["proprio"]: dict with keys "joint_pos", "joint_vel", "force"
    
    We convert to LeRobot format:
    - observation.images.rgb: (B, 3, H, W) float32 [0, 1]
    - observation.state: (B, state_dim) concatenation of proprio
    """
    
    include_velocity: bool = True
    include_force: bool = False
    
    def observation(self, observation: dict) -> dict:
        processed = {}
        
        # Process RGB image
        if "rgb" in observation:
            rgb = observation["rgb"]
            
            # Convert numpy to tensor if needed
            if isinstance(rgb, np.ndarray):
                rgb = torch.from_numpy(rgb)
            
            # Add batch dim if needed
            if rgb.ndim == 3:
                rgb = rgb.unsqueeze(0)  # (H, W, C) -> (B, H, W, C)
            
            # Convert to (B, C, H, W) and normalize
            rgb = rgb.permute(0, 3, 1, 2).float() / 255.0
            
            processed[f"{OBS_IMAGES}.rgb"] = rgb
        
        # Process proprioception
        if "proprio" in observation:
            proprio = observation["proprio"]
            
            state_components = []
            
            # Always include joint position
            if "joint_pos" in proprio:
                state_components.append(torch.tensor(proprio["joint_pos"]).float())
            
            # Optionally include velocity
            if self.include_velocity and "joint_vel" in proprio:
                state_components.append(torch.tensor(proprio["joint_vel"]).float())
            
            # Optionally include force
            if self.include_force and "force" in proprio:
                state_components.append(torch.tensor(proprio["force"]).float())
            
            # Concatenate and add batch dim if needed
            state = torch.cat(state_components, dim=-1)
            if state.ndim == 1:
                state = state.unsqueeze(0)  # (N,) -> (B, N)
            
            processed[OBS_STATE] = state
        
        return processed
    
    def transform_features(self, features):
        """Update feature descriptions."""
        new_features = {}
        
        # Add image feature
        new_features[FeatureType.IMAGE] = {
            f"{OBS_IMAGES}.rgb": PolicyFeature(
                type=FeatureType.IMAGE,
                shape=(3, 224, 224)  # Assuming 224x224 images
            )
        }
        
        # Compute state dimension
        state_dim = 7  # joint_pos
        if self.include_velocity:
            state_dim += 7  # joint_vel
        if self.include_force:
            state_dim += 6  # force
        
        new_features[FeatureType.STATE] = {
            OBS_STATE: PolicyFeature(
                type=FeatureType.STATE,
                shape=(state_dim,)
            )
        }
        
        return new_features

# Usage
processor = MyEnvProcessorStep(
    include_velocity=True,
    include_force=False
)

Handling Multi-Dimensional States

Some environments provide multi-dimensional state arrays that need flattening:
@ProcessorStepRegistry.register("flatten_state_processor")
@dataclass
class FlattenStateProcessorStep(ObservationProcessorStep):
    """Flatten multi-dimensional state observations."""
    
    state_keys: tuple[str, ...]
    
    def observation(self, observation: dict) -> dict:
        processed = observation.copy()
        
        state_components = []
        for key in self.state_keys:
            if key in observation:
                component = observation[key]
                
                # Flatten all dims except batch
                if component.dim() > 2:
                    batch_size = component.shape[0]
                    component = component.view(batch_size, -1)
                elif component.dim() == 1:
                    component = component.unsqueeze(0)
                
                state_components.append(component)
        
        if state_components:
            processed[OBS_STATE] = torch.cat(state_components, dim=-1)
        
        return processed
    
    def transform_features(self, features):
        # Calculate total flattened dimension
        total_dim = 0
        for key in self.state_keys:
            if key in features.get(FeatureType.STATE, {}):
                feature = features[FeatureType.STATE][key]
                total_dim += int(np.prod(feature.shape))
        
        return {
            FeatureType.STATE: {
                OBS_STATE: PolicyFeature(
                    type=FeatureType.STATE,
                    shape=(total_dim,)
                )
            }
        }
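The flattening logic can be exercised standalone. This sketch duplicates the step's core loop so the shapes are easy to verify without the full pipeline (the function name and test keys are illustrative):

```python
import torch

def flatten_state(observation, state_keys):
    # Mirrors the step's logic: flatten all dims except batch, then concatenate.
    parts = []
    for key in state_keys:
        comp = observation[key]
        if comp.dim() > 2:
            comp = comp.view(comp.shape[0], -1)   # (B, N, M) -> (B, N*M)
        elif comp.dim() == 1:
            comp = comp.unsqueeze(0)              # (N,) -> (1, N)
        parts.append(comp)
    return torch.cat(parts, dim=-1)

obs = {
    "object_pos": torch.zeros(2, 4, 3),  # (B, N, 3) -> (B, 12)
    "joint_pos": torch.zeros(2, 7),      # already flat, kept as-is
}
state = flatten_state(obs, ("object_pos", "joint_pos"))
print(state.shape)  # torch.Size([2, 19])
```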

Testing Environment Processors

Always test with realistic environment data:
import pytest
import torch
import numpy as np
from lerobot.configs.types import FeatureType
from lerobot.processor import create_transition

def test_my_env_processor():
    # Create realistic test data
    observation = {
        "rgb": np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8),
        "proprio": {
            "joint_pos": np.random.randn(7).astype(np.float32),
            "joint_vel": np.random.randn(7).astype(np.float32),
            "force": np.random.randn(6).astype(np.float32),
        },
    }
    
    # Create processor
    processor = MyEnvProcessorStep(
        include_velocity=True,
        include_force=True
    )
    
    # Process
    transition = create_transition(observation=observation)
    processed = processor(transition)
    
    # Verify outputs
    assert "observation.images.rgb" in processed["observation"]
    assert "observation.state" in processed["observation"]
    
    # Check shapes
    rgb = processed["observation"]["observation.images.rgb"]
    assert rgb.shape == (1, 3, 224, 224)  # (B, C, H, W)
    assert rgb.dtype == torch.float32
    assert rgb.min() >= 0.0 and rgb.max() <= 1.0
    
    state = processed["observation"]["observation.state"]
    assert state.shape == (1, 20)  # 7 + 7 + 6
    assert state.dtype == torch.float32
    
    # Verify feature transformation
    input_features = {}
    output_features = processor.transform_features(input_features)
    
    assert FeatureType.IMAGE in output_features
    assert FeatureType.STATE in output_features
    assert output_features[FeatureType.STATE]["observation.state"].shape == (20,)

Integration with Environments

Use environment processors in your gym environment:
import gymnasium as gym
from lerobot.processor import DataProcessorPipeline

class MyGymEnv(gym.Env):
    def __init__(self):
        # Create observation processor
        self.obs_processor = DataProcessorPipeline(
            steps=[
                MyEnvProcessorStep(include_velocity=True),
                # Add more processors if needed
            ],
            name="obs_pipeline"
        )
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        
        # Get raw observation from environment
        raw_obs = self._get_raw_observation()
        
        # Process to LeRobot format
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        
        return processed["observation"], {}
    
    def step(self, action):
        # Execute action; assumes the env returns the usual gymnasium tuple
        raw_obs, reward, terminated, truncated, info = self._execute_action(action)
        
        # Process observation
        transition = create_transition(observation=raw_obs)
        processed = self.obs_processor(transition)
        
        return processed["observation"], reward, terminated, truncated, info

Best Practices

1. Match Environment Conventions

Understand your environment’s data format:
# Document environment output format
"""
MyEnv observation format:
- Images: (H, W, C) uint8 [0, 255], BGR color space
- State: dict with keys ["pos", "vel", "acc"]
- Coordinate system: X-forward, Y-left, Z-up
"""

# Handle in processor
def observation(self, obs):
    # Convert BGR to RGB
    img = obs["rgb"][:, :, ::-1]  # BGR -> RGB
    
    # Transform coordinates if needed
    # ...

2. Preserve Batch Dimension

Always maintain batch dimension:
def observation(self, obs):
    tensor = obs["state"]  # example: a (N,) or (B, N) state vector
    
    # Add batch dim if missing
    if tensor.ndim == 1:
        tensor = tensor.unsqueeze(0)  # (N,) -> (1, N)
    
    # Process with batch dim
    # ...
    
    return processed

3. Document State Ordering

Clearly document concatenation order:
@dataclass
class MyEnvProcessorStep(ObservationProcessorStep):
    """
    State vector composition (total: 20):
    - [0:7]:   joint positions
    - [7:14]:  joint velocities  
    - [14:17]: end-effector position
    - [17:20]: end-effector orientation (axis-angle)
    """

API Reference

LiberoProcessorStep

See lerobot/processor/env_processor.py:26
  • observation(observation) -> dict: Process a LIBERO observation to LeRobot format.

IsaaclabArenaProcessorStep

See lerobot/processor/env_processor.py:156
  • state_keys (tuple[str, ...]): Keys to extract from the policy observation.
  • camera_keys (tuple[str, ...]): Camera names to extract from the camera observation.
  • observation(observation) -> dict: Process an Isaac Lab observation to LeRobot format.
