LeRobot provides gRPC-based transport utilities for distributed robot learning, enabling policy servers and robot clients to communicate over networks.
Overview
The transport utilities enable:
- Remote policy inference (policy server + robot client)
- Distributed training data collection
- Cloud-based policy deployment
- Multi-robot coordination
Core Functions
Location: src/lerobot/transport/utils.py
send_bytes_in_chunks
Stream large data payloads in chunks over gRPC.
from lerobot.transport.utils import send_bytes_in_chunks
from lerobot.transport import services_pb2
data = b"large binary data..."
for chunk in send_bytes_in_chunks(
buffer=data,
message_class=services_pb2.DataMessage,
log_prefix="Sending policy weights",
):
yield chunk
message_class: the protobuf message class used to wrap each chunk.
silent flag (name to be confirmed against the source): if True, log at debug level instead of info.
receive_bytes_in_chunks
Receive chunked data from gRPC stream.
from lerobot.transport.utils import receive_bytes_in_chunks
from multiprocessing import Event
shutdown_event = Event()
data = receive_bytes_in_chunks(
iterator=grpc_stream,
queue=None, # Or provide Queue for async
shutdown_event=shutdown_event,
log_prefix="Receiving",
)
queue: optional queue for asynchronous reception; pass None to receive synchronously.
shutdown_event: event used to signal shutdown of the receive loop.
State Serialization
state_to_bytes
Serialize PyTorch model state dict to bytes.
from lerobot.transport.utils import state_to_bytes
import torch
state_dict = model.state_dict()
data = state_to_bytes(state_dict)
bytes_to_state_dict
Deserialize bytes to PyTorch state dict.
from lerobot.transport.utils import bytes_to_state_dict
state_dict = bytes_to_state_dict(data)
model.load_state_dict(state_dict)
Transition Serialization
transitions_to_bytes
Serialize transitions for distributed training.
from lerobot.transport.utils import transitions_to_bytes
transitions = [transition1, transition2, ...]
data = transitions_to_bytes(transitions)
bytes_to_transitions
Deserialize transitions from bytes.
from lerobot.transport.utils import bytes_to_transitions
transitions = bytes_to_transitions(data)
Python Object Serialization
python_object_to_bytes
Serialize arbitrary Python objects.
from lerobot.transport.utils import python_object_to_bytes
obj = {"config": {...}, "stats": {...}}
data = python_object_to_bytes(obj)
bytes_to_python_object
Deserialize Python objects.
from lerobot.transport.utils import bytes_to_python_object
obj = bytes_to_python_object(data)
gRPC Configuration
grpc_channel_options
Get optimized gRPC channel options.
from lerobot.transport.utils import grpc_channel_options
import grpc
options = grpc_channel_options(
max_receive_message_length=4 * 1024 * 1024, # 4 MB
max_send_message_length=4 * 1024 * 1024,
enable_retries=True,
max_attempts=5,
)
channel = grpc.insecure_channel(
"localhost:50051",
options=options,
)
max_receive_message_length: maximum message size, in bytes, that the channel will accept.
max_send_message_length: maximum message size, in bytes, that the channel will send.
enable_retries: enable automatic retries on transient network failures.
max_attempts: maximum number of delivery attempts when retries are enabled.
Policy Server
Run a policy inference server.
from lerobot.async_inference.policy_server import run_policy_server
from lerobot.policies import make_policy
policy = make_policy(
cfg=None,
pretrained_path="lerobot/diffusion_pusht",
)
run_policy_server(
policy=policy,
port=50051,
num_inference_threads=4,
)
Location: src/lerobot/async_inference/policy_server.py
Robot Client
Connect to remote policy server.
from lerobot.async_inference.robot_client import RobotPolicyClient
from lerobot.robots import make_robot_from_config
robot = make_robot_from_config(robot_config)
client = RobotPolicyClient(
server_address="192.168.1.100:50051",
)
robot.connect()
client.connect()
# Get action from remote policy
observation = robot.get_observation()
action = client.get_action(observation)
robot.send_action(action)
client.disconnect()
robot.disconnect()
Location: src/lerobot/async_inference/robot_client.py
Usage Examples
Remote Policy Inference
# On server machine
from lerobot.async_inference.policy_server import run_policy_server
from lerobot.policies import make_policy
policy = make_policy(cfg=None, pretrained_path="lerobot/act_aloha")
run_policy_server(
policy=policy,
port=50051,
num_inference_threads=4,
)
# On robot machine
from lerobot.async_inference.robot_client import RobotPolicyClient
from lerobot.robots import make_robot_from_config
robot = make_robot_from_config(robot_config)
client = RobotPolicyClient(
server_address="server.example.com:50051",
)
robot.connect()
client.connect()
try:
for step in range(1000):
obs = robot.get_observation()
action = client.get_action(obs)
robot.send_action(action)
finally:
client.disconnect()
robot.disconnect()
Distributed Training Data Collection
# On robot machines
from lerobot.async_inference.robot_client import RobotDataCollector
collector = RobotDataCollector(
server_address="training-server:50051",
robot_config=robot_config,
)
collector.run(
num_episodes=100,
episode_length=200,
)
# On training server
from lerobot.async_inference.data_aggregator import DataAggregator
aggregator = DataAggregator(
port=50051,
dataset_repo_id="myuser/collected_data",
)
aggregator.start()
# Receives data from multiple robots
Custom gRPC Service
import grpc
from concurrent import futures
from lerobot.transport.utils import (
grpc_channel_options,
send_bytes_in_chunks,
receive_bytes_in_chunks,
)
from lerobot.transport import services_pb2, services_pb2_grpc
class MyServicer(services_pb2_grpc.MyServiceServicer):
def SendData(self, request_iterator, context):
data = receive_bytes_in_chunks(
request_iterator,
queue=None,
shutdown_event=shutdown_event,
)
# Process data
return services_pb2.Response(success=True)
# Start server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=grpc_channel_options(),
)
services_pb2_grpc.add_MyServiceServicer_to_server(
MyServicer(),
server,
)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
Best Practices
- Message Size: Keep messages under 4MB for best performance
- Retries: Enable retries for unreliable networks
- Timeouts: Set appropriate timeouts for operations
- Compression: Use chunking for large payloads
- Error Handling: Handle network errors gracefully
Security Considerations
- Encryption: Use TLS for production deployments
- Authentication: Implement authentication for policy servers
- Validation: Validate all received data
- Network: Use firewalls and VPNs for remote connections
See Also