LeRobot’s async inference system lets you run policy inference on a remote server (typically a GPU machine) while controlling your robot from a separate client (typically the robot’s embedded computer). By decoupling inference from control, the robot keeps executing actions while the next chunk is computed, enabling real-time robot control.
## Architecture

The async inference system consists of two components:

- **Policy Server**: runs the neural network policy on a GPU machine
- **Robot Client**: controls the robot and communicates with the policy server

The two communicate via gRPC: the client streams observations from the robot to the server and receives action chunks in return.
## Starting the Policy Server

The policy server loads a pretrained model and waits for client connections:

```shell
python -m lerobot.async_inference.policy_server \
    --host=127.0.0.1 \
    --port=8080 \
    --fps=30 \
    --inference_latency=0.033 \
    --obs_queue_timeout=1
```
### Server Configuration

- `host`: Server host address to bind to
- `port`: Server port to listen on
- `fps`: Target frames per second for observation processing
- `inference_latency`: Target inference latency in seconds (controls inference throttling)
- `obs_queue_timeout`: Timeout for the observation queue, in seconds
## Running the Robot Client

The robot client captures observations, sends them to the server, and executes the actions it receives:

```shell
python src/lerobot/async_inference/robot_client.py \
    --robot.type=so100_follower \
    --robot.port=/dev/tty.usbmodem58760431541 \
    --robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}" \
    --task="pick up the cup" \
    --server_address=127.0.0.1:8080 \
    --policy_type=act \
    --pretrained_name_or_path=lerobot/act_so100_pick_cup \
    --policy_device=cuda \
    --client_device=cpu \
    --actions_per_chunk=50 \
    --chunk_size_threshold=0.5 \
    --aggregate_fn_name=weighted_average
```
### Client Configuration

- `robot.*`: Robot configuration, including type, port, and camera setup
- `task`: Task instruction for the robot (e.g., "pick up the cup")
- `server_address` (str, default `localhost:8080`): Address of the policy server
- `policy_type`: Type of policy to use (e.g., `act`, `pi0`, `diffusion`)
- `pretrained_name_or_path`: Hugging Face Hub model ID or local path of the pretrained model
- `policy_device`: Device for policy inference on the server (e.g., `cuda`, `cuda:0`, `mps`)
- `client_device`: Device to move actions to after receiving them from the server
- `actions_per_chunk`: Number of actions per chunk to request from the policy
- `chunk_size_threshold`: Threshold for triggering new observations (0.0-1.0). When the action queue size drops below `chunk_size_threshold * actions_per_chunk`, a new observation is sent to the server.
- `aggregate_fn_name` (str, default `weighted_average`): Function used to aggregate overlapping actions. Options:
  - `weighted_average`: `0.3 * old + 0.7 * new`
  - `latest_only`: always use the new actions
  - `average`: `0.5 * old + 0.5 * new`
  - `conservative`: `0.7 * old + 0.3 * new`
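The built-in options above can be sketched as simple two-argument functions. The snippet below uses scalar stand-ins for clarity; the real implementations operate on `torch.Tensor`s, and the dict here is only illustrative:

```python
# Sketch of the built-in aggregation options, using scalars in place of
# torch.Tensors; each takes (old, new) and returns the merged action value.
AGGREGATE_FUNCTIONS = {
    "weighted_average": lambda old, new: 0.3 * old + 0.7 * new,  # favor new
    "latest_only": lambda old, new: new,                         # discard old
    "average": lambda old, new: 0.5 * old + 0.5 * new,           # equal weight
    "conservative": lambda old, new: 0.7 * old + 0.3 * new,      # favor old
}

old, new = 1.0, 2.0
for name, fn in sorted(AGGREGATE_FUNCTIONS.items()):
    print(f"{name}: {fn(old, new):.2f}")
```

With `old=1.0` and `new=2.0`, `weighted_average` yields 1.70 and `conservative` 1.30, showing how the weights trade smoothness against responsiveness.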
## Control Flow

The async inference system runs two parallel threads on the client.

### Thread 1: Control Loop

Runs at the robot’s control frequency (e.g., 30 Hz):

- **Execute action**: if actions are available in the queue, pop and execute the next one
- **Stream observation**: when the queue size drops below the threshold, capture an observation and send it to the server

```python
while client.running:
    # Execute an action if one is available
    if client.actions_available():
        client.control_loop_action()

    # Send an observation if the queue is running low
    if client._ready_to_send_observation():
        client.control_loop_observation(task)
```
### Thread 2: Action Receiver

Continuously receives action chunks from the server:

```python
while client.running:
    # Get the next action chunk from the server
    actions_chunk = stub.GetActions(Empty())

    # Merge it into the existing action queue
    client._aggregate_action_queues(actions_chunk, aggregate_fn)
```
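The two-thread pattern can be reproduced in a minimal, self-contained sketch. All names below are illustrative stand-ins, not the actual LeRobot API: a receiver thread refills a shared action queue while the control loop drains it:

```python
import queue
import threading
import time

# Illustrative sketch of the client's two threads: a receiver refills a shared
# action queue in chunks while the control loop drains it one action at a time.
action_queue: queue.Queue = queue.Queue()
executed: list[int] = []
running = threading.Event()
running.set()

def receiver() -> None:
    # Stand-in for stub.GetActions(): periodically push a chunk of 3 actions.
    for chunk_start in range(0, 9, 3):
        for a in range(chunk_start, chunk_start + 3):
            action_queue.put(a)
        time.sleep(0.01)
    running.clear()  # stop the control loop once all chunks are delivered

def control_loop() -> None:
    # Drain the queue; keep going until the receiver stops and the queue is empty.
    while running.is_set() or not action_queue.empty():
        try:
            executed.append(action_queue.get(timeout=0.05))
        except queue.Empty:
            pass  # no action this tick; a real client would still stream observations

t = threading.Thread(target=receiver)
t.start()
control_loop()
t.join()
print(executed)  # → [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

Because the receiver and control loop share only a thread-safe queue, neither blocks the other: inference latency on the server never stalls action execution as long as the queue is non-empty.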
## Action Queue Management

The client maintains an action queue and aggregates actions whose timesteps overlap (simplified):

```python
def _aggregate_action_queues(
    self,
    incoming_actions: list[TimedAction],
    aggregate_fn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
):
    """Aggregate actions for overlapping timesteps using aggregate_fn."""
    for new_action in incoming_actions:
        if new_action.timestep in current_queue:
            # Overlap: aggregate with the existing action for this timestep
            aggregated = aggregate_fn(
                current_queue[new_action.timestep],
                new_action.action,
            )
            queue.put(TimedAction(new_action.timestamp, new_action.timestep, aggregated))
        else:
            # No overlap: add the new action directly
            queue.put(new_action)
```
## Must-Go Observations

The client sets a "must-go" flag to ensure an observation is processed when the action queue is empty:

```python
observation.must_go = self.must_go.is_set() and self.action_queue.empty()
```

This prevents the robot from stalling when no actions are available.
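The flag’s behavior can be checked in isolation. The helper function below is a hypothetical stand-in that mirrors the one-line condition above:

```python
import queue
import threading

def observation_must_go(must_go_event: threading.Event, action_queue: queue.Queue) -> bool:
    # Hypothetical helper mirroring the client's condition: an observation is
    # flagged must-go only when the flag is set AND no actions remain, so the
    # server is forced to process it rather than drop it.
    return must_go_event.is_set() and action_queue.empty()

must_go = threading.Event()
must_go.set()
q: queue.Queue = queue.Queue()

print(observation_must_go(must_go, q))  # empty queue → True
q.put("action")
print(observation_must_go(must_go, q))  # actions still available → False
```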
## Server-Side Processing

The policy server maintains an observation queue of size 1 (only the latest observation matters) and processes observations on demand:

```python
class PolicyServer:
    def SendObservations(self, observation):
        # Enqueue if flagged must-go or if it passes sanity checks
        if observation.must_go or self._obs_sanity_checks(observation):
            self.observation_queue.put(observation)

    def GetActions(self):
        # Wait for the next observation
        obs = self.observation_queue.get(timeout=obs_queue_timeout)

        # Run the inference pipeline
        observation = self.preprocessor(obs.observation)
        action_chunk = self.policy.predict_action_chunk(observation)
        action_chunk = self.postprocessor(action_chunk)

        # Return timestamped actions
        return self._time_action_chunk(obs.timestamp, action_chunk, obs.timestep)
```
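The timestamping step can be sketched as follows. This is a plausible reconstruction, not the actual `_time_action_chunk` implementation: it assumes each action in the chunk gets a consecutive timestep and timestamps spaced `1/fps` apart from the observation’s timestamp:

```python
from dataclasses import dataclass

@dataclass
class TimedAction:
    timestamp: float
    timestep: int
    action: list

def time_action_chunk(t0: float, actions: list, i0: int, fps: float = 30.0) -> list:
    # Sketch: tag the k-th action of the chunk with timestep i0 + k and a
    # timestamp k/fps after the source observation's timestamp, so the client
    # can line incoming actions up against its own queue.
    return [
        TimedAction(timestamp=t0 + k / fps, timestep=i0 + k, action=a)
        for k, a in enumerate(actions)
    ]

chunk = time_action_chunk(10.0, [[0.1], [0.2], [0.3]], i0=100)
print([(round(a.timestamp, 3), a.timestep) for a in chunk])
# → [(10.0, 100), (10.033, 101), (10.067, 102)]
```

Aligning timesteps this way is what makes the client-side aggregation possible: two chunks produced from observations a few timesteps apart will share timestep indices over their overlap.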
## Reduce Observation Sending

Adjust `chunk_size_threshold` to control how often observations are sent:

```shell
# Send observations more frequently (more responsive, but more network and inference load)
--chunk_size_threshold=0.7

# Send observations less frequently (less load, but slower to react to changes)
--chunk_size_threshold=0.3
```
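Back-of-the-envelope arithmetic shows the effect. Assuming the queue is refilled to roughly `actions_per_chunk` and drained at the control rate (a simplification; the hypothetical helper name below is not part of LeRobot), the interval between observations is:

```python
def observation_send_period(actions_per_chunk: int, threshold: float, control_hz: float) -> float:
    # A new observation is triggered once the queue drops below
    # threshold * actions_per_chunk, i.e. after consuming
    # (1 - threshold) * actions_per_chunk actions at control_hz.
    actions_consumed_before_send = actions_per_chunk * (1 - threshold)
    return actions_consumed_before_send / control_hz

# With the defaults above (50 actions/chunk, threshold 0.5, 30 Hz control):
print(observation_send_period(50, 0.5, 30.0))  # ≈ 0.83 s between observations
```

Raising the threshold to 0.7 shortens the period to about 0.5 s (more frequent sends); lowering it to 0.3 stretches it to about 1.17 s.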
## Custom Aggregation Functions

Define custom aggregation logic for your robot:

```python
from lerobot.async_inference.configs import AGGREGATE_FUNCTIONS

# Register a custom aggregation function
AGGREGATE_FUNCTIONS["my_custom"] = lambda old, new: 0.8 * old + 0.2 * new
```
## Device Placement

Move actions to the GPU for downstream planners:

```shell
# Keep actions on the GPU after receiving them from the server
--client_device=cuda
```
## Debugging

Enable debug visualization to monitor the action queue size:

```shell
python src/lerobot/async_inference/robot_client.py \
    # ... other args ...
    --debug_visualize_queue_size=True
```

This generates a plot of the action queue size over time once the session ends.
## Example: SO-100 Robot

Complete example of running async inference on an SO-100 robot:

```shell
# Terminal 1: start the policy server on the GPU machine
python -m lerobot.async_inference.policy_server \
    --host=0.0.0.0 \
    --port=8080 \
    --fps=30
```

```shell
# Terminal 2: run the robot client
python src/lerobot/async_inference/robot_client.py \
    --robot.type=so100_follower \
    --robot.port=/dev/ttyUSB0 \
    --robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \
    --task="pick up the red block" \
    --server_address=192.168.1.100:8080 \
    --policy_type=act \
    --pretrained_name_or_path=lerobot/act_so100_real \
    --policy_device=cuda \
    --actions_per_chunk=100 \
    --chunk_size_threshold=0.5 \
    --aggregate_fn_name=weighted_average
```
## API Reference

### RobotClient

See `lerobot/async_inference/robot_client.py:83`.

- Connect to the policy server and initialize the client
- Stop the client and disconnect from the server
- `control_loop(task: str) -> tuple[Observation, Action]`: Main control loop that executes actions and streams observations

### PolicyServer

See `lerobot/async_inference/policy_server.py:66`.

- `SendPolicyInstructions(RemotePolicyConfig) -> Empty`: Load the policy model based on client instructions
- `SendObservations(TimedObservation) -> Empty`: Receive an observation from the robot client
- `GetActions`: Generate and return an action chunk based on the latest observation