LeRobot Dataset Utilities

LeRobot provides utilities for processing, transforming, and manipulating robot learning data.

Dataset Statistics

compute_stats

Compute normalization statistics for a dataset.
from lerobot.datasets.compute_stats import compute_stats
from lerobot.datasets import LeRobotDataset

dataset = LeRobotDataset("lerobot/pusht")

stats = compute_stats(
    dataset,
    batch_size=32,
    num_workers=4,
)

print(stats["action"]["mean"])  # Mean of actions
print(stats["action"]["std"])   # Std dev of actions
Location: src/lerobot/datasets/compute_stats.py
dataset
LeRobotDataset
required
Dataset to compute statistics for.
batch_size
int
default:"32"
Batch size for data loading.
num_workers
int
default:"4"
Number of dataloader workers.
stats
dict
Dictionary mapping feature names to {"mean": array, "std": array, "min": array, "max": array}.

aggregate_stats

Aggregate statistics from multiple datasets or episodes.
from lerobot.datasets.compute_stats import aggregate_stats

# Combine stats from two datasets
combined_stats = aggregate_stats([stats1, stats2])

Data Transformation

hf_transform_to_torch

Transform Hugging Face dataset outputs to PyTorch tensors.
from lerobot.datasets.utils import hf_transform_to_torch

# Applied automatically by LeRobotDataset
dataset.hf_dataset.set_transform(hf_transform_to_torch)
Location: src/lerobot/datasets/utils.py:hf_transform_to_torch

flatten_dict

Flatten nested dictionary with dot-separated keys.
from lerobot.datasets.utils import flatten_dict

nested = {
    "observation": {
        "images": {
            "top": [1, 2, 3],
        },
        "state": [4, 5, 6],
    },
}

flat = flatten_dict(nested)
# {
#     "observation.images.top": [1, 2, 3],
#     "observation.state": [4, 5, 6],
# }

Video Processing

encode_video_frames

Encode image frames to video file.
from lerobot.datasets.video_utils import encode_video_frames
from pathlib import Path

encode_video_frames(
    img_dir=Path("./frames"),  # Directory with frame*.png
    video_path=Path("./output.mp4"),
    fps=30,
    vcodec="libsvtav1",
    overwrite=True,
)
Location: src/lerobot/datasets/video_utils.py:encode_video_frames
img_dir
Path
required
Directory containing image files.
video_path
Path
required
Output video file path.
fps
int
required
Frames per second.
vcodec
str
default:"libsvtav1"
Video codec.

decode_video_frames

Decode specific frames from video file.
from lerobot.datasets.video_utils import decode_video_frames
from pathlib import Path

frames = decode_video_frames(
    video_path=Path("./video.mp4"),
    timestamps=[0.0, 1 / 30, 2 / 30],  # Seconds; at 30 fps frames fall at multiples of 1/30, which must match within tolerance_s
    tolerance_s=1e-4,
    backend="pyav",
)

print(frames.shape)  # (3, C, H, W)
video_path
Path
required
Video file path.
timestamps
list[float]
required
List of timestamps in seconds.
tolerance_s
float
default:"1e-4"
Tolerance for timestamp matching.
backend
str
default:"pyav"
Video backend: pyav, torchcodec, video_reader.
frames
torch.Tensor
Tensor of shape (N, C, H, W) with decoded frames.

Episode Processing

validate_episode_buffer

Validate episode data before saving.
from lerobot.datasets.utils import validate_episode_buffer

episode_buffer = {
    "size": 100,
    "task": ["pick cube"] * 100,
    "action": [[0.1, 0.2]] * 100,
    # ... more keys ...
}

validate_episode_buffer(episode_buffer, features)  # features: the dataset feature spec, e.g. dataset.meta.features
# Raises error if invalid

validate_frame

Validate single frame data.
from lerobot.datasets.utils import validate_frame

frame = {
    "observation.state": [0.1, 0.2, 0.3],
    "action": [0.5, 0.6],
    "task": "pick cube",
}

validate_frame(frame, features)  # features: the dataset feature spec, e.g. dataset.meta.features
# Raises error if invalid

Delta Timestamps

get_delta_indices

Convert delta timestamps to frame indices.
from lerobot.datasets.utils import get_delta_indices

delta_timestamps = {
    "observation.images.top": [-1 / 30, 0.0],
    "action": [0.0, 1 / 30, 2 / 30],
}

fps = 30

delta_indices = get_delta_indices(delta_timestamps, fps)
# {
#     "observation.images.top": [-1, 0],
#     "action": [0, 1, 2],
# }

check_delta_timestamps

Validate delta timestamps against FPS.
from lerobot.datasets.utils import check_delta_timestamps

check_delta_timestamps(
    delta_timestamps,
    fps=30,
    tolerance_s=1e-4,
)
# Raises error if timestamps aren't multiples of 1/fps

Image Processing

write_image

Write image array to file.
from lerobot.datasets.image_writer import write_image
from pathlib import Path
import numpy as np

image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)

write_image(
    image,
    Path("./output.png"),
    compress_level=1,
)

AsyncImageWriter

Write images asynchronously for better performance.
from lerobot.datasets.image_writer import AsyncImageWriter
from pathlib import Path
import numpy as np

writer = AsyncImageWriter(
    num_processes=0,
    num_threads=4,
)

for i in range(100):
    image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    writer.save_image(
        image,
        Path(f"./frame_{i:04d}.png"),
        compress_level=1,
    )

# Wait for all writes to complete
writer.close()

Hub Utilities

push_dataset_to_hub

Upload dataset to Hugging Face Hub.
from lerobot.datasets import LeRobotDataset

dataset = LeRobotDataset(
    "myuser/my_dataset",
    root="./local_dataset",
)

dataset.push_to_hub(
    tags=["robotics", "manipulation"],
    license="apache-2.0",
    push_videos=True,
    private=False,
)

Example Workflows

Compute and Save Statistics

from lerobot.datasets import LeRobotDataset
from lerobot.datasets.compute_stats import compute_stats
from lerobot.datasets.utils import write_stats

dataset = LeRobotDataset("myuser/my_dataset")

# Compute stats
stats = compute_stats(
    dataset,
    batch_size=64,
    num_workers=8,
)

# Save to dataset
write_stats(stats, dataset.root)

# Reload dataset to use stats
dataset = LeRobotDataset("myuser/my_dataset")
print(dataset.meta.stats)

Process Videos in Batch

from lerobot.datasets.video_utils import encode_video_frames
from pathlib import Path
import concurrent.futures

def encode_episode(episode_idx, image_dir, output_dir, fps):
    encode_video_frames(
        img_dir=image_dir / f"episode_{episode_idx:03d}",
        video_path=output_dir / f"episode_{episode_idx:03d}.mp4",
        fps=fps,
        vcodec="h264",
    )

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(
            encode_episode,
            i,
            Path("./frames"),
            Path("./videos"),
            30,
        )
        for i in range(10)
    ]
    concurrent.futures.wait(futures)

Custom Data Transformation

from lerobot.datasets import LeRobotDataset
import torch

dataset = LeRobotDataset("lerobot/pusht")

def custom_transform(batch):
    # Convert to tensors
    # NOTE(review): torch.tensor will fail on non-numeric fields (e.g. the "task"
    # strings shown earlier) — such keys would need to be excluded first; verify
    # against the actual batch schema.
    batch = {k: torch.tensor(v) for k, v in batch.items()}
    
    # Add noise to actions for data augmentation
    batch["action"] = batch["action"] + torch.randn_like(batch["action"]) * 0.01
    
    # Normalize observations
    batch["observation.state"] = (
        batch["observation.state"] - dataset.meta.stats["observation.state"]["mean"]
    ) / dataset.meta.stats["observation.state"]["std"]
    
    return batch

dataset.hf_dataset.set_transform(custom_transform)

See Also

Build docs developers (and LLMs) love