LeRobot Transport Utilities
LeRobot provides gRPC-based transport utilities for distributed robot learning, enabling policy servers and robot clients to communicate over networks.

Overview

The transport utilities enable:
  • Remote policy inference (policy server + robot client)
  • Distributed training data collection
  • Cloud-based policy deployment
  • Multi-robot coordination

Core Functions

Location: src/lerobot/transport/utils.py

send_bytes_in_chunks

Stream large data payloads in chunks over gRPC.
from lerobot.transport.utils import send_bytes_in_chunks
from lerobot.transport import services_pb2

data = b"large binary data..."

for chunk in send_bytes_in_chunks(
    buffer=data,
    message_class=services_pb2.DataMessage,
    log_prefix="Sending policy weights",
):
    yield chunk
Parameters:
  • buffer (bytes, required) — Binary data to send.
  • message_class (Any, required) — Protobuf message class to use.
  • log_prefix (str, default: "") — Prefix for log messages.
  • silent (bool, default: True) — If True, log at debug level instead of info.

receive_bytes_in_chunks

Receive chunked data from gRPC stream.
from lerobot.transport.utils import receive_bytes_in_chunks
from multiprocessing import Event

shutdown_event = Event()

data = receive_bytes_in_chunks(
    iterator=grpc_stream,
    queue=None,  # Or provide Queue for async
    shutdown_event=shutdown_event,
    log_prefix="Receiving",
)
Parameters:
  • iterator (Iterator, required) — gRPC stream iterator.
  • queue (Queue | None) — Optional queue for async reception.
  • shutdown_event (Event, required) — Event to signal shutdown.
  • log_prefix (str, default: "") — Prefix for log messages.

State Serialization

state_to_bytes

Serialize PyTorch model state dict to bytes.
from lerobot.transport.utils import state_to_bytes
import torch

state_dict = model.state_dict()
data = state_to_bytes(state_dict)

bytes_to_state_dict

Deserialize bytes to PyTorch state dict.
from lerobot.transport.utils import bytes_to_state_dict

state_dict = bytes_to_state_dict(data)
model.load_state_dict(state_dict)

Transition Serialization

transitions_to_bytes

Serialize transitions for distributed training.
from lerobot.transport.utils import transitions_to_bytes

transitions = [transition1, transition2, ...]
data = transitions_to_bytes(transitions)

bytes_to_transitions

Deserialize transitions from bytes.
from lerobot.transport.utils import bytes_to_transitions

transitions = bytes_to_transitions(data)

Python Object Serialization

python_object_to_bytes

Serialize arbitrary Python objects.
from lerobot.transport.utils import python_object_to_bytes

obj = {"config": {...}, "stats": {...}}
data = python_object_to_bytes(obj)

bytes_to_python_object

Deserialize Python objects.
from lerobot.transport.utils import bytes_to_python_object

obj = bytes_to_python_object(data)

gRPC Configuration

grpc_channel_options

Get optimized gRPC channel options.
from lerobot.transport.utils import grpc_channel_options
import grpc

options = grpc_channel_options(
    max_receive_message_length=4 * 1024 * 1024,  # 4 MB
    max_send_message_length=4 * 1024 * 1024,
    enable_retries=True,
    max_attempts=5,
)

channel = grpc.insecure_channel(
    "localhost:50051",
    options=options,
)
Parameters:
  • max_receive_message_length (int, default: 4 MB) — Maximum message size to receive.
  • max_send_message_length (int, default: 4 MB) — Maximum message size to send.
  • enable_retries (bool, default: True) — Enable automatic retries on network failures.
  • max_attempts (int, default: 5) — Maximum retry attempts.
  • initial_backoff (str, default: "0.1s") — Initial backoff delay.
  • max_backoff (str, default: "2s") — Maximum backoff delay.

Policy Server

Run a policy inference server.
from lerobot.async_inference.policy_server import run_policy_server
from lerobot.policies import make_policy

policy = make_policy(
    cfg=None,
    pretrained_path="lerobot/diffusion_pusht",
)

run_policy_server(
    policy=policy,
    port=50051,
    num_inference_threads=4,
)
Location: src/lerobot/async_inference/policy_server.py

Robot Client

Connect to remote policy server.
from lerobot.async_inference.robot_client import RobotPolicyClient
from lerobot.robots import make_robot_from_config

robot = make_robot_from_config(robot_config)
client = RobotPolicyClient(
    server_address="192.168.1.100:50051",
)

robot.connect()
client.connect()

# Get action from remote policy
observation = robot.get_observation()
action = client.get_action(observation)
robot.send_action(action)

client.disconnect()
robot.disconnect()
Location: src/lerobot/async_inference/robot_client.py

Usage Examples

Remote Policy Inference

# On server machine
from lerobot.async_inference.policy_server import run_policy_server
from lerobot.policies import make_policy

policy = make_policy(cfg=None, pretrained_path="lerobot/act_aloha")

run_policy_server(
    policy=policy,
    port=50051,
    num_inference_threads=4,
)
# On robot machine
from lerobot.async_inference.robot_client import RobotPolicyClient
from lerobot.robots import make_robot_from_config

robot = make_robot_from_config(robot_config)
client = RobotPolicyClient(
    server_address="server.example.com:50051",
)

robot.connect()
client.connect()

try:
    for step in range(1000):
        obs = robot.get_observation()
        action = client.get_action(obs)
        robot.send_action(action)
finally:
    client.disconnect()
    robot.disconnect()

Distributed Training Data Collection

# On robot machines
from lerobot.async_inference.robot_client import RobotDataCollector

collector = RobotDataCollector(
    server_address="training-server:50051",
    robot_config=robot_config,
)

collector.run(
    num_episodes=100,
    episode_length=200,
)
# On training server
from lerobot.async_inference.data_aggregator import DataAggregator

aggregator = DataAggregator(
    port=50051,
    dataset_repo_id="myuser/collected_data",
)

aggregator.start()
# Receives data from multiple robots

Custom gRPC Service

import grpc
from concurrent import futures
from lerobot.transport.utils import (
    grpc_channel_options,
    send_bytes_in_chunks,
    receive_bytes_in_chunks,
)
from lerobot.transport import services_pb2, services_pb2_grpc

class MyServicer(services_pb2_grpc.MyServiceServicer):
    def SendData(self, request_iterator, context):
        data = receive_bytes_in_chunks(
            request_iterator,
            queue=None,
            shutdown_event=shutdown_event,
        )
        # Process data
        return services_pb2.Response(success=True)

# Start server
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    options=grpc_channel_options(),
)
services_pb2_grpc.add_MyServiceServicer_to_server(
    MyServicer(),
    server,
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()

Best Practices

  1. Message Size: Keep messages under 4MB for best performance
  2. Retries: Enable retries for unreliable networks
  3. Timeouts: Set appropriate timeouts for operations
  4. Compression: Use chunking for large payloads
  5. Error Handling: Handle network errors gracefully

Security Considerations

  1. Encryption: Use TLS for production deployments
  2. Authentication: Implement authentication for policy servers
  3. Validation: Validate all received data
  4. Network: Use firewalls and VPNs for remote connections

See Also

  • src/lerobot/transport/utils.py — core transport and serialization helpers
  • src/lerobot/async_inference/policy_server.py — policy inference server
  • src/lerobot/async_inference/robot_client.py — robot-side policy client