Dataset Format Requirements
LeRobotDataset expects:
- Parquet files for tabular data (observations, actions, states)
- MP4 files for video observations
- JSON files for metadata (info, stats, tasks)
- Organized in a specific directory structure
Creating a New Dataset
Basic Setup
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from pathlib import Path
import numpy as np
# Define features for your dataset
features = {
    "observation.state": {
        "dtype": "float32",
        "shape": [14],
        # 14 dims = 2 arms x (6 joints + 1 gripper). The name list must have
        # exactly shape[0] entries; the original listed 8 names per arm
        # (joint_0..joint_6 + gripper), giving 16 names for a 14-dim state.
        "names": [
            "joint_0", "joint_1", "joint_2",
            "joint_3", "joint_4", "joint_5",
            "gripper",
        ] * 2,  # Assuming bi-manual robot
    },
    "observation.images.top": {
        "dtype": "video",
        "shape": [3, 480, 640],  # channels-first (C, H, W)
    },
    "observation.images.wrist": {
        "dtype": "video",
        "shape": [3, 480, 640],  # channels-first (C, H, W)
    },
    "action": {
        "dtype": "float32",
        "shape": [14],
        "names": None,  # Optional: same as observation.state
    },
}
# Create an empty dataset on disk; episodes are recorded into it afterwards.
dataset = LeRobotDataset.create(
    repo_id="username/my-robot-dataset",
    fps=30,
    features=features,
    robot_type="aloha",  # Optional
    root="./data/my-dataset",
    use_videos=True,
)
print(f"Created dataset at {dataset.root}")
Recording Episodes
# Start recording episode 0
episode_index = 0
episode_buffer = dataset.create_episode_buffer(episode_index)

# Collect data in a loop
for frame_idx in range(num_frames):
    # Get data from your robot/simulator
    state = robot.get_state()  # numpy array [14]
    top_img = robot.get_camera("top")  # numpy array [480, 640, 3]
    wrist_img = robot.get_camera("wrist")  # numpy array [480, 640, 3]
    action = robot.get_action()  # numpy array [14]

    # Create frame dictionary
    frame = {
        "observation.state": state,
        "observation.images.top": top_img,
        "observation.images.wrist": wrist_img,
        "action": action,
        "task": "Pick up the cup",
    }

    # Add frame to buffer
    dataset.add_frame(frame)

# Save the episode — called once, after all frames were added.
dataset.save_episode(
    task="Pick up the cup",
    encode_videos=True
)
print(f"Saved episode {episode_index} with {num_frames} frames")
Recording Multiple Episodes
from lerobot.datasets.video_utils import VideoEncodingManager

# Use context manager for safe video encoding
with VideoEncodingManager(dataset):
    for ep_idx in range(num_episodes):
        episode_buffer = dataset.create_episode_buffer()

        # Reset environment
        robot.reset()

        # Collect episode
        for t in range(max_steps):
            state = robot.get_state()
            top_img = robot.get_camera("top")
            action = policy.predict(state, top_img)

            frame = {
                "observation.state": state,
                "observation.images.top": top_img,
                "action": action,
                "task": "Navigate to target",
            }
            dataset.add_frame(frame)

            robot.step(action)
            # NOTE(review): `done` is never assigned in this example — it must
            # come from the environment (e.g. a return value of robot.step()).
            if done:
                break

        # Save episode
        dataset.save_episode(
            task="Navigate to target",
            encode_videos=True
        )
        print(f"Completed episode {ep_idx+1}/{num_episodes}")

# Finalize dataset (important!)
dataset.finalize()
Batch Video Encoding
For large datasets, batch encoding is more efficient:

# Create dataset with batch encoding
dataset = LeRobotDataset.create(
    repo_id="username/my-large-dataset",
    fps=30,
    features=features,
    root="./data/my-large-dataset",
    use_videos=True,
)

# Set batch size for encoding
dataset.batch_encoding_size = 10  # Encode every 10 episodes

with VideoEncodingManager(dataset):
    for ep_idx in range(100):  # 100 episodes
        episode_buffer = dataset.create_episode_buffer()

        # Collect data...
        for t in range(max_steps):
            frame = {...}  # placeholder: build the frame dict for this step
            dataset.add_frame(frame)

        # Videos will be encoded in batches of 10
        dataset.save_episode(task="My task", encode_videos=True)

dataset.finalize()
Streaming Video Encoding
For real-time encoding during recording (recommended for large datasets):

# Create dataset with streaming encoding
dataset = LeRobotDataset.create(
    repo_id="username/my-streaming-dataset",
    fps=30,
    features=features,
    root="./data/my-streaming-dataset",
    use_videos=True,
)

# Enable streaming encoding by re-opening the dataset with encoder options.
dataset = LeRobotDataset(
    "username/my-streaming-dataset",
    root="./data/my-streaming-dataset",
    streaming_encoding=True,
    vcodec="auto",  # Auto-detect best hardware encoder
    encoder_queue_maxsize=30,  # Buffer size per camera
)

with VideoEncodingManager(dataset):
    for ep_idx in range(num_episodes):
        episode_buffer = dataset.create_episode_buffer()
        for t in range(max_steps):
            frame = {...}  # placeholder: build the frame dict for this step
            dataset.add_frame(frame)  # Frames encoded in real-time!

        # save_episode is near-instant with streaming encoding
        dataset.save_episode(task="My task", encode_videos=True)

dataset.finalize()
Converting Existing Datasets
From Custom Format
import h5py
import numpy as np
from pathlib import Path
from PIL import Image
def convert_hdf5_to_lerobot(hdf5_path: Path, output_repo_id: str):
    """Convert an HDF5 dataset to LeRobotDataset format.

    Args:
        hdf5_path: Path to the source HDF5 file. Expected layout:
            ``episodes/episode_{i}/observations/state`` and
            ``episodes/episode_{i}/actions``, with an optional ``task``
            attribute on each episode group.
        output_repo_id: Hugging Face repo id for the converted dataset.

    Returns:
        The populated LeRobotDataset.
    """
    # Keep the file open for the whole conversion: episode groups are
    # read lazily inside the loop below.
    with h5py.File(hdf5_path, 'r') as f:
        num_episodes = len(f['episodes'])

        # Infer features from first episode (drop the leading time axis).
        ep0 = f['episodes']['episode_0']
        obs_shape = ep0['observations']['state'].shape[1:]
        action_shape = ep0['actions'].shape[1:]

        features = {
            "observation.state": {
                "dtype": "float32",
                "shape": list(obs_shape),
            },
            "action": {
                "dtype": "float32",
                "shape": list(action_shape),
            },
        }

        # Create LeRobot dataset
        dataset = LeRobotDataset.create(
            repo_id=output_repo_id,
            fps=30,  # Adjust based on your data
            features=features,
            use_videos=False,  # Add video support if needed
        )

        with VideoEncodingManager(dataset):
            for ep_idx in range(num_episodes):
                ep_data = f['episodes'][f'episode_{ep_idx}']
                states = ep_data['observations']['state'][:]
                actions = ep_data['actions'][:]
                task = ep_data.attrs.get('task', 'Unknown task')

                episode_buffer = dataset.create_episode_buffer()
                for t in range(len(states)):
                    frame = {
                        "observation.state": states[t],
                        "action": actions[t],
                        "task": task,
                    }
                    dataset.add_frame(frame)

                dataset.save_episode(task=task)
                print(f"Converted episode {ep_idx+1}/{num_episodes}")

    # Finalize after the HDF5 file is closed — no further reads are needed.
    dataset.finalize()
    return dataset
# Usage: convert a local HDF5 file and register it under the given repo id.
dataset = convert_hdf5_to_lerobot(
    "data/my_dataset.hdf5",
    "username/converted-dataset"
)
From RLDS Format
import tensorflow_datasets as tfds
def convert_rlds_to_lerobot(rlds_name: str, output_repo_id: str):
    """Convert an RLDS dataset to LeRobotDataset format.

    Args:
        rlds_name: Name of the RLDS dataset registered with TFDS.
        output_repo_id: Hugging Face repo id for the converted dataset.

    Returns:
        The populated LeRobotDataset.
    """
    # Load RLDS dataset
    ds = tfds.load(rlds_name, split='train')

    # Infer features from the first step of the first episode.
    # RLDS stores `steps` as a nested tf.data.Dataset, which is NOT
    # subscriptable — the original `first_ep['steps'][0]` would raise a
    # TypeError; it must be iterated instead.
    first_ep = next(iter(ds))
    first_step = next(iter(first_ep['steps']))

    features = {
        "observation.state": {
            "dtype": "float32",
            "shape": list(first_step['observation']['state'].shape),
        },
        "action": {
            "dtype": "float32",
            "shape": list(first_step['action'].shape),
        },
    }

    # Add image features if present
    if 'image' in first_step['observation']:
        img_shape = first_step['observation']['image'].shape
        features["observation.images.primary"] = {
            "dtype": "video",
            "shape": [3, img_shape[0], img_shape[1]],  # CHW from (H, W, C)
        }

    # Create dataset
    dataset = LeRobotDataset.create(
        repo_id=output_repo_id,
        fps=10,  # Adjust based on RLDS metadata
        features=features,
        use_videos="observation.images.primary" in features,
    )

    with VideoEncodingManager(dataset):
        for ep_idx, episode in enumerate(ds):
            # NOTE(review): assumes a per-episode 'task_description' field;
            # many RLDS datasets store language instructions per step instead.
            task = episode['task_description'].numpy().decode('utf-8')

            episode_buffer = dataset.create_episode_buffer()
            for step in episode['steps']:
                frame = {
                    "observation.state": step['observation']['state'].numpy(),
                    "action": step['action'].numpy(),
                    "task": task,
                }
                if 'image' in step['observation']:
                    # Convert from HWC to CHW
                    img = step['observation']['image'].numpy()
                    frame["observation.images.primary"] = img.transpose(2, 0, 1)
                dataset.add_frame(frame)

            dataset.save_episode(task=task, encode_videos=True)
            print(f"Converted episode {ep_idx+1}")

    dataset.finalize()
    return dataset
Optimizing for Large Datasets
Chunk Size Configuration
from lerobot.datasets.lerobot_dataset import LeRobotDatasetMetadata
# Create the dataset metadata with custom chunking.
meta = LeRobotDatasetMetadata.create(
    repo_id="username/large-dataset",
    fps=30,
    features=features,
)

# Tune chunk settings to match the expected dataset size.
meta.update_chunk_settings(
    chunks_size=2000,  # More files per chunk
    data_files_size_in_mb=150,  # Larger data files
    video_files_size_in_mb=300,  # Larger video files
)
Parallel Processing
from concurrent.futures import ThreadPoolExecutor
import threading
class ThreadSafeDataset:
    """Serialize concurrent episode writes to a single dataset.

    The wrapped dataset buffers frames internally, so concurrent writers
    must not interleave add_frame/save_episode calls. A single lock guards
    the entire write of one episode.
    """

    def __init__(self, dataset):
        # Underlying (non-thread-safe) dataset all workers write into.
        self.dataset = dataset
        self.lock = threading.Lock()

    def add_episode(self, episode_data):
        """Write one full episode atomically.

        Args:
            episode_data: Sequence of frame dicts; each frame carries a
                'task' key (the first frame's task labels the episode).
        """
        # NOTE(review): holding the lock across save_episode (including video
        # encoding) serializes all writers — parallelism is limited to data
        # *production* outside this call.
        with self.lock:
            episode_buffer = self.dataset.create_episode_buffer()
            for frame in episode_data:
                self.dataset.add_frame(frame)
            self.dataset.save_episode(
                task=episode_data[0]['task'],
                encode_videos=True
            )
# Convert multiple episodes in parallel
dataset = LeRobotDataset.create(...)
thread_safe_ds = ThreadSafeDataset(dataset)

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for episode_data in episode_iterator:
        future = executor.submit(thread_safe_ds.add_episode, episode_data)
        futures.append(future)
    # Wait for completion; result() re-raises any worker exception.
    for future in futures:
        future.result()

dataset.finalize()
Validation
# Validate the converted dataset
from lerobot.datasets.lerobot_dataset import LeRobotDataset

dataset = LeRobotDataset("username/converted-dataset")

print(f"Total episodes: {dataset.num_episodes}")
print(f"Total frames: {dataset.num_frames}")
print(f"FPS: {dataset.fps}")
print(f"Features: {list(dataset.features.keys())}")

# Check a sample
sample = dataset[0]
print(f"Sample keys: {sample.keys()}")
for key, value in sample.items():
    if hasattr(value, 'shape'):
        print(f" {key}: shape={value.shape}, dtype={value.dtype}")
    else:
        print(f" {key}: {value}")

# Verify videos if present
if len(dataset.meta.video_keys) > 0:
    print(f"\nVideo keys: {dataset.meta.video_keys}")
    for vid_key in dataset.meta.video_keys:
        print(f" {vid_key}: {dataset.meta.features[vid_key]}")
Pushing to Hub
# Upload the finished dataset to the Hugging Face Hub.
dataset.push_to_hub(
    private=False,  # Set to True for private datasets
    push_videos=True,
    license="apache-2.0",
    tags=["robotics", "manipulation", "aloha"],
)
print(f"Dataset pushed to https://huggingface.co/datasets/{dataset.repo_id}")
See Also
- Using LeRobotDataset - Loading and using datasets
- Video Encoding - Advanced video encoding options
- Dataset Tools - Tools for manipulating datasets