LeRobot provides utilities for processing, transforming, and manipulating robot learning data.
Dataset Statistics
compute_stats
Compute normalization statistics for a dataset.
from lerobot.datasets.compute_stats import compute_stats
from lerobot.datasets import LeRobotDataset
dataset = LeRobotDataset("lerobot/pusht")
stats = compute_stats(
dataset,
batch_size=32,
num_workers=4,
)
print(stats["action"]["mean"]) # Mean of actions
print(stats["action"]["std"]) # Std dev of actions
Location: src/lerobot/datasets/compute_stats.py
Dataset to compute statistics for.
Batch size for data loading.
Number of dataloader workers.
Dictionary mapping feature names to {"mean": array, "std": array, "min": array, "max": array}.
aggregate_stats
Aggregate statistics from multiple datasets or episodes.
from lerobot.datasets.compute_stats import aggregate_stats
# Combine stats from two datasets
combined_stats = aggregate_stats([stats1, stats2])
hf_transform_to_torch
Transform Hugging Face dataset outputs to PyTorch tensors.
from lerobot.datasets.utils import hf_transform_to_torch
# Applied automatically by LeRobotDataset
dataset.hf_dataset.set_transform(hf_transform_to_torch)
Location: src/lerobot/datasets/utils.py:hf_transform_to_torch
flatten_dict
Flatten nested dictionary with dot-separated keys.
from lerobot.datasets.utils import flatten_dict
nested = {
"observation": {
"images": {
"top": [1, 2, 3],
},
"state": [4, 5, 6],
},
}
flat = flatten_dict(nested)
# {
# "observation.images.top": [1, 2, 3],
# "observation.state": [4, 5, 6],
# }
Video Processing
encode_video_frames
Encode image frames to video file.
from lerobot.datasets.video_utils import encode_video_frames
from pathlib import Path
encode_video_frames(
img_dir=Path("./frames"), # Directory with frame*.png
video_path=Path("./output.mp4"),
fps=30,
vcodec="libsvtav1",
overwrite=True,
)
Location: src/lerobot/datasets/video_utils.py:encode_video_frames
Directory containing image files.
decode_video_frames
Decode specific frames from video file.
from lerobot.datasets.video_utils import decode_video_frames
from pathlib import Path
frames = decode_video_frames(
video_path=Path("./video.mp4"),
timestamps=[0.0, 0.033, 0.066], # Seconds
tolerance_s=1e-4,
backend="pyav",
)
print(frames.shape) # (3, C, H, W)
List of timestamps in seconds.
Tolerance for timestamp matching.
Video backend: pyav, torchcodec, video_reader.
Tensor of shape (N, C, H, W) with decoded frames.
Episode Processing
validate_episode_buffer
Validate episode data before saving.
from lerobot.datasets.utils import validate_episode_buffer
episode_buffer = {
"size": 100,
"task": ["pick cube"] * 100,
"action": [[0.1, 0.2]] * 100,
# ... more keys ...
}
validate_episode_buffer(episode_buffer, features)
# Raises error if invalid
validate_frame
Validate single frame data.
from lerobot.datasets.utils import validate_frame
frame = {
"observation.state": [0.1, 0.2, 0.3],
"action": [0.5, 0.6],
"task": "pick cube",
}
validate_frame(frame, features)
# Raises error if invalid
Delta Timestamps
get_delta_indices
Convert delta timestamps to frame indices.
from lerobot.datasets.utils import get_delta_indices
delta_timestamps = {
"observation.images.top": [-0.033, 0.0],
"action": [0.0, 0.033, 0.066],
}
fps = 30
delta_indices = get_delta_indices(delta_timestamps, fps)
# {
# "observation.images.top": [-1, 0],
# "action": [0, 1, 2],
# }
check_delta_timestamps
Validate delta timestamps against FPS.
from lerobot.datasets.utils import check_delta_timestamps
check_delta_timestamps(
delta_timestamps,
fps=30,
tolerance_s=1e-4,
)
# Raises error if timestamps aren't multiples of 1/fps
Image Processing
write_image
Write image array to file.
from lerobot.datasets.image_writer import write_image
from pathlib import Path  # Path was used below but never imported
import numpy as np

# Random 480x640 RGB image, uint8, HWC layout.
image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
write_image(
    image,
    Path("./output.png"),
    compress_level=1,  # low PNG compression level: faster writes, larger files
)
AsyncImageWriter
Write images asynchronously for better performance.
from lerobot.datasets.image_writer import AsyncImageWriter
from pathlib import Path  # Path was used below but never imported
import numpy as np

writer = AsyncImageWriter(
    num_processes=0,  # 0 processes: writes happen on threads in this process
    num_threads=4,
)
for i in range(100):
    image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    writer.save_image(
        image,
        Path(f"./frame_{i:04d}.png"),  # zero-padded so files sort in frame order
        compress_level=1,
    )
# Wait for all writes to complete
writer.close()
Hub Utilities
push_to_hub
Upload dataset to Hugging Face Hub.
from lerobot.datasets import LeRobotDataset
dataset = LeRobotDataset(
"myuser/my_dataset",
root="./local_dataset",
)
dataset.push_to_hub(
tags=["robotics", "manipulation"],
license="apache-2.0",
push_videos=True,
private=False,
)
Example Workflows
Compute and Save Statistics
from lerobot.datasets import LeRobotDataset
from lerobot.datasets.compute_stats import compute_stats
from lerobot.datasets.utils import write_stats
dataset = LeRobotDataset("myuser/my_dataset")
# Compute stats
stats = compute_stats(
dataset,
batch_size=64,
num_workers=8,
)
# Save to dataset
write_stats(stats, dataset.root)
# Reload dataset to use stats
dataset = LeRobotDataset("myuser/my_dataset")
print(dataset.meta.stats)
Process Videos in Batch
from lerobot.datasets.video_utils import encode_video_frames
from pathlib import Path
import concurrent.futures
def encode_episode(episode_idx, image_dir, output_dir, fps):
    """Encode one episode's image frames into an MP4 video.

    Args:
        episode_idx: Zero-based episode number, used to build file names.
        image_dir: Root directory containing per-episode frame folders.
        output_dir: Directory where the encoded video is written.
        fps: Frame rate of the output video.
    """
    # Build the shared "episode_NNN" stem once for both input dir and output file.
    episode_name = f"episode_{episode_idx:03d}"
    encode_video_frames(
        img_dir=image_dir / episode_name,
        video_path=output_dir / f"{episode_name}.mp4",
        fps=fps,
        vcodec="h264",
    )
# Encode 10 episodes with up to 4 concurrent workers.
# NOTE(review): threads suffice only if the encoding work runs outside the
# GIL (e.g. an ffmpeg subprocess) — confirm against encode_video_frames.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(
            encode_episode,
            i,
            Path("./frames"),
            Path("./videos"),
            30,
        )
        for i in range(10)
    ]
    # Block until every episode has been encoded.
    concurrent.futures.wait(futures)
from lerobot.datasets import LeRobotDataset
import torch
dataset = LeRobotDataset("lerobot/pusht")
def custom_transform(batch):
    """Tensorize a batch, jitter actions for augmentation, normalize state.

    Relies on the module-level `dataset` for normalization statistics.
    """
    # Convert every feature to a tensor.
    batch = {k: torch.tensor(v) for k, v in batch.items()}
    # Add small Gaussian noise to actions for data augmentation.
    batch["action"] = batch["action"] + torch.randn_like(batch["action"]) * 0.01
    # Z-normalize observations with the dataset's precomputed statistics.
    state_stats = dataset.meta.stats["observation.state"]
    batch["observation.state"] = (
        batch["observation.state"] - state_stats["mean"]
    ) / state_stats["std"]
    return batch
dataset.hf_dataset.set_transform(custom_transform)
See Also