Skip to main content
LeRobot provides a unified Robot interface that makes it easy to integrate custom hardware. Once integrated, you can use all of LeRobot’s tools for data collection, training, and deployment.

Robot Interface

All LeRobot robots inherit from the Robot base class and implement a standard interface:
from lerobot.robots.robot import Robot
from lerobot.robots.config import RobotConfig
from lerobot.processor import RobotObservation, RobotAction
from dataclasses import dataclass

@dataclass
class MyRobotConfig(RobotConfig):
    """Configuration for MyRobot."""
    port: str = "/dev/ttyUSB0"
    baud_rate: int = 115200
    # Add your robot-specific parameters

class MyRobot(Robot):
    """Custom robot implementation."""
    
    # Required class attributes
    name = "my_robot"
    config_class = MyRobotConfig
    
    def __init__(self, config: MyRobotConfig):
        super().__init__(config)
        self.port = config.port
        self.baud_rate = config.baud_rate
        self._is_connected = False
    
    @property
    def observation_features(self) -> dict:
        """Define observation space structure."""
        return {
            "state": (7,),  # 7D joint positions
            "pixels": {
                "side": (480, 640, 3),  # Camera dimensions
                "wrist": (240, 320, 3),
            }
        }
    
    @property
    def action_features(self) -> dict:
        """Define action space structure."""
        return {
            "action": (7,),  # 7D joint velocities
        }
    
    @property
    def is_connected(self) -> bool:
        """Check if robot is connected."""
        return self._is_connected
    
    def connect(self) -> None:
        """Connect to the robot."""
        # Initialize hardware connection
        print(f"Connecting to robot on {self.port}...")
        # Your connection code here
        self._is_connected = True
        print("Robot connected!")
    
    def disconnect(self) -> None:
        """Disconnect from the robot."""
        if self._is_connected:
            print("Disconnecting robot...")
            # Your disconnection code here
            self._is_connected = False
    
    def get_observation(self) -> RobotObservation:
        """Get current observation from robot."""
        if not self.is_connected:
            raise RuntimeError("Robot not connected")
        
        # Read sensor data
        joint_positions = self._read_joint_positions()
        camera_images = self._capture_images()
        
        return RobotObservation(
            state=joint_positions,
            pixels=camera_images
        )
    
    def send_action(self, action: RobotAction) -> None:
        """Send action command to robot."""
        if not self.is_connected:
            raise RuntimeError("Robot not connected")
        
        # Extract action values
        joint_velocities = action.action
        
        # Send commands to hardware
        self._write_joint_velocities(joint_velocities)
    
    # Helper methods (implement based on your hardware)
    def _read_joint_positions(self):
        # Read from your hardware
        pass
    
    def _capture_images(self):
        # Capture from cameras
        pass
    
    def _write_joint_velocities(self, velocities):
        # Write to your hardware
        pass

Complete Example: Serial Robot

Here’s a complete example for a robot with serial communication:
import numpy as np
import serial
import cv2
from lerobot.robots.robot import Robot
from lerobot.robots.config import RobotConfig
from lerobot.processor import RobotObservation, RobotAction
from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig
from dataclasses import dataclass

@dataclass
class SerialRobotConfig(RobotConfig):
    """Configuration for serial-controlled robot."""
    port: str = "/dev/ttyUSB0"
    baud_rate: int = 115200
    timeout: float = 1.0
    num_joints: int = 6
    cameras: dict[str, OpenCVCameraConfig] | None = None

class SerialRobot(Robot):
    """Robot controlled via serial communication."""
    
    name = "serial_robot"
    config_class = SerialRobotConfig
    
    def __init__(self, config: SerialRobotConfig):
        super().__init__(config)
        self.port = config.port
        self.baud_rate = config.baud_rate
        self.timeout = config.timeout
        self.num_joints = config.num_joints
        
        self.serial_connection = None
        self.cameras = {}
        
        # Initialize cameras if configured
        if config.cameras:
            for name, cam_config in config.cameras.items():
                self.cameras[name] = OpenCVCamera(cam_config)
    
    @property
    def observation_features(self) -> dict:
        features = {
            "state": (self.num_joints,),
        }
        
        # Add camera features
        if self.cameras:
            features["pixels"] = {}
            for name, camera in self.cameras.items():
                features["pixels"][name] = (
                    camera.config.height,
                    camera.config.width,
                    3  # RGB
                )
        
        return features
    
    @property
    def action_features(self) -> dict:
        return {
            "action": (self.num_joints,),
        }
    
    @property
    def is_connected(self) -> bool:
        return self.serial_connection is not None and self.serial_connection.is_open
    
    def connect(self) -> None:
        """Connect to robot via serial port."""
        print(f"Connecting to {self.port} at {self.baud_rate} baud...")
        
        try:
            self.serial_connection = serial.Serial(
                port=self.port,
                baudrate=self.baud_rate,
                timeout=self.timeout
            )
            print("Serial connection established.")
            
            # Connect cameras
            for name, camera in self.cameras.items():
                camera.connect()
                print(f"Camera '{name}' connected.")
            
            # Wait for robot to initialize
            time.sleep(2)
            
            # Send initialization command
            self._send_command("INIT")
            
        except serial.SerialException as e:
            raise RuntimeError(f"Failed to connect: {e}")
    
    def disconnect(self) -> None:
        """Disconnect from robot."""
        if self.serial_connection:
            self._send_command("STOP")
            self.serial_connection.close()
            print("Serial connection closed.")
        
        # Disconnect cameras
        for camera in self.cameras.values():
            camera.disconnect()
    
    def get_observation(self) -> RobotObservation:
        """Read current state and images."""
        if not self.is_connected:
            raise RuntimeError("Robot not connected")
        
        # Read joint positions
        self._send_command("GET_STATE")
        response = self._read_response()
        joint_positions = np.array([float(x) for x in response.split(",")])
        
        # Capture images from all cameras
        images = {}
        for name, camera in self.cameras.items():
            frame = camera.read()
            images[name] = frame
        
        return RobotObservation(
            state=joint_positions,
            pixels=images if images else None
        )
    
    def send_action(self, action: RobotAction) -> None:
        """Send action command to robot."""
        if not self.is_connected:
            raise RuntimeError("Robot not connected")
        
        # Format action as comma-separated values
        action_str = ",".join([f"{x:.6f}" for x in action.action])
        command = f"SET_ACTION,{action_str}"
        
        self._send_command(command)
    
    def _send_command(self, command: str) -> None:
        """Send command over serial."""
        self.serial_connection.write(f"{command}\n".encode())
        self.serial_connection.flush()
    
    def _read_response(self) -> str:
        """Read response from serial."""
        response = self.serial_connection.readline().decode().strip()
        return response

Using Your Robot

Data Collection

Once implemented, use your robot for teleoperation and data collection:
lerobot-record \
  --robot.type=serial_robot \
  --robot.port=/dev/ttyUSB0 \
  --robot.baud_rate=115200 \
  --teleop.type=keyboard \
  --repo-id=your_username/serial_robot_dataset \
  --num-episodes=50

Training

Train policies on data collected from your robot:
lerobot-train \
  --policy.type=diffusion \
  --dataset.repo_id=your_username/serial_robot_dataset \
  --steps=50000

Deployment

Deploy trained policies on your robot:
from lerobot.robots.serial_robot import SerialRobot, SerialRobotConfig
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.cameras.opencv import OpenCVCameraConfig
import torch

# Configure robot
camera_config = {
    "side": OpenCVCameraConfig(index_or_path=0, width=640, height=480),
}

robot_cfg = SerialRobotConfig(
    port="/dev/ttyUSB0",
    cameras=camera_config
)

# Load trained policy
policy = DiffusionPolicy.from_pretrained("your_username/serial_robot_policy")
policy.eval()
policy.to("cuda")

# Connect to robot
robot = SerialRobot(robot_cfg)
robot.connect()

try:
    for step in range(100):
        # Get observation
        obs = robot.get_observation()
        
        # Get action from policy
        action = policy.select_action(obs)
        
        # Execute action
        robot.send_action(action)
        
finally:
    robot.disconnect()

Camera Integration

LeRobot supports multiple camera backends:

OpenCV Cameras

from lerobot.cameras.opencv import OpenCVCamera, OpenCVCameraConfig

camera_config = OpenCVCameraConfig(
    index_or_path=0,  # Camera index or device path
    width=640,
    height=480,
    fps=30
)

camera = OpenCVCamera(camera_config)
camera.connect()

# Read frame
frame = camera.read()  # Returns numpy array [H, W, 3]

camera.disconnect()

Multiple Cameras

cameras = {
    "side": OpenCVCamera(OpenCVCameraConfig(index_or_path=0)),
    "wrist": OpenCVCamera(OpenCVCameraConfig(index_or_path=1)),
    "top": OpenCVCamera(OpenCVCameraConfig(index_or_path=2)),
}

# Connect all
for camera in cameras.values():
    camera.connect()

# Read all
images = {name: cam.read() for name, cam in cameras.items()}

Teleoperation Devices

Implement custom teleoperation devices:
from lerobot.teleoperators.config import TeleoperatorConfig
from lerobot.teleoperators.base import Teleoperator
from dataclasses import dataclass

@dataclass
class MyTeleopConfig(TeleoperatorConfig):
    device_id: str = "/dev/input/js0"
    sensitivity: float = 1.0

class MyTeleoperator(Teleoperator):
    """Custom teleoperation device."""
    
    name = "my_teleop"
    config_class = MyTeleopConfig
    
    def __init__(self, config: MyTeleopConfig):
        super().__init__(config)
        self.device_id = config.device_id
        self.sensitivity = config.sensitivity
    
    def connect(self) -> None:
        # Connect to device
        pass
    
    def disconnect(self) -> None:
        # Disconnect from device
        pass
    
    def get_action(self) -> RobotAction:
        """Read action from teleoperation device."""
        # Read from device and convert to action
        pass

Motor Calibration

LeRobot provides tools for motor calibration:
# Run calibration
lerobot-calibrate \
  --robot.type=serial_robot \
  --robot.port=/dev/ttyUSB0 \
  --robot.id=my_robot_001
Calibration files are saved to ~/.cache/lerobot/calibration/robots/serial_robot/my_robot_001.json

Best Practices

1
Define clear observation and action spaces
2
Specify the exact structure of observations and actions:
3
@property
def observation_features(self) -> dict:
    return {
        "state": (7,),  # Joint positions
        "pixels": {
            "side": (480, 640, 3),
        }
    }
4
Implement proper connection handling
5
Use context managers for safe resource management:
6
with robot:
    # Robot automatically connects
    obs = robot.get_observation()
    robot.send_action(action)
# Robot automatically disconnects
7
Add error handling
8
Handle hardware failures gracefully:
9
def get_observation(self) -> RobotObservation:
    try:
        state = self._read_state()
    except Exception as e:
        logging.error(f"Failed to read state: {e}")
        # Return safe default or raise
        raise
10
Test incrementally
11
Test each component independently:
12
# Test connection
robot.connect()
assert robot.is_connected

# Test observation
obs = robot.get_observation()
assert obs.state.shape == (7,)

# Test action
action = RobotAction(action=np.zeros(7))
robot.send_action(action)

Supported Hardware

LeRobot includes implementations for:
  • SO-100: LeRobot’s reference robot arm
  • Koch: Low-cost robot arm
  • LeKiwi: Mobile manipulation platform
  • Reachy 2: Humanoid robot
  • OpenARM: Open-source robot arm
  • Unitree G1: Humanoid robot
  • EarthRover: Mobile robot
See the robots documentation for setup guides.

Next Steps

Example Implementations

Study existing robot implementations:

Build docs developers (and LLMs) love