LeRobot includes specialized processors for robot kinematics, end-effector control, and coordinate transformations. These processors are used to convert between joint space and Cartesian space, apply safety bounds, and handle robot-specific conventions.
Overview
Robot processors handle transformations specific to physical robots:
- Kinematic transformations: Forward/inverse kinematics between joints and end-effector
- Safety bounds: Workspace limits and velocity constraints
- Delta actions: Relative vs absolute position commands
- Gripper control: Velocity to position conversion
These processors are typically used in the robot’s action pipeline, converting policy outputs to robot commands.
Forward Kinematics
Convert joint positions to end-effector pose:
from lerobot.processor import ProcessorStepRegistry
from lerobot.model.kinematics import RobotKinematics
# Load robot kinematics model
kinematics = RobotKinematics.from_urdf(
urdf_path="robots/so100/so100.urdf",
end_effector_link="gripper_link"
)
# Create forward kinematics processor
fk_processor = ProcessorStepRegistry.get("forward_kinematics_joints_to_ee")(
kinematics=kinematics,
motor_names=["shoulder", "elbow", "wrist", "gripper"]
)
# Apply to observation
transition = create_transition(
observation={
"shoulder.pos": 0.5,
"elbow.pos": -0.3,
"wrist.pos": 0.1,
"gripper.pos": 50.0,
}
)
processed = fk_processor(transition)
print(processed["observation"])
# Output:
# {
# "ee.x": 0.324,
# "ee.y": 0.156,
# "ee.z": 0.512,
# "ee.wx": 0.0,
# "ee.wy": 0.1,
# "ee.wz": 0.0,
# "ee.gripper_pos": 50.0,
# }
See lerobot/robots/so_follower/robot_kinematic_processor.py:426 for the implementation.
Inverse Kinematics
Convert end-effector pose to joint positions:
# Create inverse kinematics processor
ik_processor = ProcessorStepRegistry.get("inverse_kinematics_ee_to_joints")(
kinematics=kinematics,
motor_names=["shoulder", "elbow", "wrist", "gripper"],
initial_guess_current_joints=True
)
# Apply to action
transition = create_transition(
observation=current_joints, # Used as IK initial guess
action={
"ee.x": 0.3,
"ee.y": 0.2,
"ee.z": 0.5,
"ee.wx": 0.0,
"ee.wy": 0.0,
"ee.wz": 0.0,
"ee.gripper_pos": 80.0,
}
)
processed = ik_processor(transition)
print(processed["action"])
# Output:
# {
# "shoulder.pos": 0.52,
# "elbow.pos": -0.28,
# "wrist.pos": 0.09,
# "gripper.pos": 80.0,
# }
See lerobot/robots/so_follower/robot_kinematic_processor.py:252 for the implementation.
IK Configuration
Robot kinematics model loaded from URDF
List of motor names in order (excluding gripper)
initial_guess_current_joints
Use current joint positions as IK initial guess. If False, uses previous IK solution.
Delta Actions
Convert relative (delta) commands to absolute end-effector poses:
from lerobot.processor import ProcessorStepRegistry
# Create delta action processor
delta_processor = ProcessorStepRegistry.get("ee_reference_and_delta")(
kinematics=kinematics,
motor_names=["shoulder", "elbow", "wrist"],
end_effector_step_sizes={"x": 0.01, "y": 0.01, "z": 0.01}, # meters per command unit
use_latched_reference=True
)
# Process delta action
transition = create_transition(
observation=current_joints,
action={
"enabled": True,
"target_x": 1.0, # Move +1 step in x (= 0.01m)
"target_y": 0.0, # No y movement
"target_z": -0.5, # Move -0.5 steps in z (= -0.005m)
"target_wx": 0.0,
"target_wy": 0.0,
"target_wz": 0.1, # Small rotation around z
"gripper_vel": 0.5,
}
)
processed = delta_processor(transition)
print(processed["action"])
# Output:
# {
# "ee.x": 0.33, # reference_x + 0.01
# "ee.y": 0.15, # reference_y + 0.0
# "ee.z": 0.505, # reference_z - 0.005
# "ee.wx": 0.0,
# "ee.wy": 0.1,
# "ee.wz": 0.1, # reference_wz + 0.1
# "ee.gripper_vel": 0.5,
# }
See lerobot/robots/so_follower/robot_kinematic_processor.py:37 for the implementation.
Delta Action Configuration
Scaling factors for delta commands (meters per unit). Keys: “x”, “y”, “z”
If True, latch reference pose when action is first enabled. If False, always use current pose as reference.
Use IK solution from complementary data instead of measured joints
Safety Bounds
Apply workspace limits and velocity constraints:
from lerobot.processor import ProcessorStepRegistry
# Create bounds processor
bounds_processor = ProcessorStepRegistry.get("ee_bounds_and_safety")(
end_effector_bounds={
"min": np.array([0.1, -0.3, 0.0]), # Min [x, y, z]
"max": np.array([0.5, 0.3, 0.6]), # Max [x, y, z]
},
max_ee_step_m=0.05 # Max 5cm movement per step
)
# Process action
transition = create_transition(
action={
"ee.x": 0.7, # Outside bounds!
"ee.y": 0.2,
"ee.z": 0.4,
"ee.wx": 0.0,
"ee.wy": 0.0,
"ee.wz": 0.0,
}
)
processed = bounds_processor(transition)
print(processed["action"])
# Output:
# {
# "ee.x": 0.5, # Clipped to max
# "ee.y": 0.2,
# "ee.z": 0.4,
# "ee.wx": 0.0,
# "ee.wy": 0.0,
# "ee.wz": 0.0,
# }
See lerobot/robots/so_follower/robot_kinematic_processor.py:185 for the implementation.
Safety Configuration
Dictionary with “min” and “max” keys containing [x, y, z] bounds
Maximum allowed position change per step (meters). Larger movements are scaled down.
Gripper Control
Convert gripper velocity to position:
from lerobot.processor import ProcessorStepRegistry
# Create gripper processor
gripper_processor = ProcessorStepRegistry.get("gripper_velocity_to_joint")(
speed_factor=20.0, # Velocity to position conversion
clip_min=0.0, # Fully open
clip_max=100.0, # Fully closed
discrete_gripper=False
)
# Process action
transition = create_transition(
observation={"gripper.pos": 50.0}, # Current gripper position
action={"ee.gripper_vel": 0.5} # Positive = close
)
processed = gripper_processor(transition)
print(processed["action"])
# Output:
# {
# "ee.gripper_pos": 60.0 # 50.0 + (0.5 * 20.0)
# }
See lerobot/robots/so_follower/robot_kinematic_processor.py:341 for the implementation.
Discrete Gripper Mode
Handle discrete gripper actions (open/close/stay):
gripper_processor = ProcessorStepRegistry.get("gripper_velocity_to_joint")(
speed_factor=20.0,
clip_min=0.0,
clip_max=100.0,
discrete_gripper=True # Enable discrete mode
)
# Action: 0=open, 1=close, 2=stay
transition = create_transition(
observation={"gripper.pos": 50.0},
action={"ee.gripper_vel": 1} # Close command
)
processed = gripper_processor(transition)
print(processed["action"]["ee.gripper_pos"]) # 100.0 (fully closed)
Complete Pipeline Example
Combine robot processors for end-effector control:
from lerobot.processor import DataProcessorPipeline, ProcessorStepRegistry
from lerobot.model.kinematics import RobotKinematics
# Load kinematics
kinematics = RobotKinematics.from_urdf(
urdf_path="robots/so100/so100.urdf",
end_effector_link="gripper_link"
)
motor_names = ["shoulder_pan", "shoulder_lift", "elbow", "wrist"]
# Build pipeline
pipeline = DataProcessorPipeline(
steps=[
# 1. Convert delta commands to absolute EE pose
ProcessorStepRegistry.get("ee_reference_and_delta")(
kinematics=kinematics,
motor_names=motor_names,
end_effector_step_sizes={"x": 0.01, "y": 0.01, "z": 0.01},
use_latched_reference=True
),
# 2. Apply safety bounds
ProcessorStepRegistry.get("ee_bounds_and_safety")(
end_effector_bounds={
"min": np.array([0.1, -0.3, 0.0]),
"max": np.array([0.5, 0.3, 0.6]),
},
max_ee_step_m=0.05
),
# 3. Convert gripper velocity to position
ProcessorStepRegistry.get("gripper_velocity_to_joint")(
speed_factor=20.0,
clip_min=0.0,
clip_max=100.0
),
# 4. Compute inverse kinematics to get joint positions
ProcessorStepRegistry.get("inverse_kinematics_ee_to_joints")(
kinematics=kinematics,
motor_names=motor_names,
initial_guess_current_joints=True
),
],
name="robot_action_pipeline"
)
# Use pipeline
transition = create_transition(
observation=current_joint_state,
action=delta_action_from_policy
)
robot_action = pipeline(transition)["action"]
# robot_action contains safe joint positions ready to send to robot
Custom Robot Processor
Create a processor for your robot:
from lerobot.processor import RobotActionProcessorStep, ProcessorStepRegistry
from lerobot.processor.core import TransitionKey
import numpy as np
@ProcessorStepRegistry.register("my_robot_action")
@dataclass
class MyRobotActionStep(RobotActionProcessorStep):
"""Custom processor for my robot."""
joint_limits: dict[str, tuple[float, float]] # {joint_name: (min, max)}
def action(self, action: dict[str, float]) -> dict[str, float]:
"""Clip actions to joint limits."""
processed = {}
for joint_name, value in action.items():
if joint_name in self.joint_limits:
min_val, max_val = self.joint_limits[joint_name]
processed[joint_name] = np.clip(value, min_val, max_val)
else:
processed[joint_name] = value
return processed
def transform_features(self, features):
return features # No change to feature shapes
# Usage
processor = MyRobotActionStep(
joint_limits={
"shoulder.pos": (-3.14, 3.14),
"elbow.pos": (-1.57, 1.57),
}
)
Best Practices
1. Reset State Between Episodes
Many robot processors maintain state that should be reset:
for episode in episodes:
# Reset all processors
for step in pipeline.steps:
if hasattr(step, 'reset'):
step.reset()
# Run episode
for transition in episode:
processed = pipeline(transition)
2. Handle Singularities
Inverse kinematics can fail near singularities:
try:
processed = ik_processor(transition)
except RuntimeError as e:
if "IK failed" in str(e):
# Fallback: keep previous joint configuration
processed["action"] = previous_joint_state
else:
raise
3. Validate Kinematics Model
Verify forward/inverse kinematics consistency:
# Forward then inverse should recover original joints
original_joints = np.array([0.5, -0.3, 0.1, 0.0])
ee_pose = kinematics.forward_kinematics(original_joints)
recovered_joints = kinematics.inverse_kinematics(original_joints, ee_pose)
assert np.allclose(original_joints, recovered_joints, atol=1e-3)
API Reference
ForwardKinematicsJointsToEE
See lerobot/robots/so_follower/robot_kinematic_processor.py:492
Joint names (excluding gripper)
InverseKinematicsEEToJoints
See lerobot/robots/so_follower/robot_kinematic_processor.py:252
Joint names (excluding gripper)
initial_guess_current_joints
Use current joints as IK initial guess
EEReferenceAndDelta
See lerobot/robots/so_follower/robot_kinematic_processor.py:37
Scaling for delta commands {"x": float, "y": float, "z": float}
Latch reference on enable vs always use current
EEBoundsAndSafety
See lerobot/robots/so_follower/robot_kinematic_processor.py:185
Workspace bounds {"min": array, "max": array}
Maximum position change per step (meters)