LeRobot’s async inference system enables you to run policy inference on a remote server (typically a GPU machine) while controlling your robot from a separate client (typically running on the robot’s embedded computer). By decoupling inference from control, this architecture hides inference latency and enables real-time robot control.

Architecture

The async inference system consists of two components:
  • Policy Server: Runs the neural network policy on a GPU machine
  • Robot Client: Controls the robot and communicates with the policy server
They communicate via gRPC, streaming observations from the robot to the server and receiving action chunks in return.

Starting the Policy Server

The policy server loads a pretrained model and waits for client connections:
python -m lerobot.async_inference.policy_server \
    --host=127.0.0.1 \
    --port=8080 \
    --fps=30 \
    --inference_latency=0.033 \
    --obs_queue_timeout=1

Server Configuration

  • host (str, default: "localhost"): Server host address to bind to
  • port (int, default: 8080): Server port number
  • fps (int, default: 30): Target frames per second for observation processing
  • inference_latency (float, default: 0.033): Target inference latency in seconds (controls inference throttling)
  • obs_queue_timeout (float, default: 1.0): Timeout in seconds when waiting on the observation queue

Running the Robot Client

The robot client captures observations, sends them to the server, and executes received actions:
python src/lerobot/async_inference/robot_client.py \
    --robot.type=so100_follower \
    --robot.port=/dev/tty.usbmodem58760431541 \
    --robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}" \
    --task="pick up the cup" \
    --server_address=127.0.0.1:8080 \
    --policy_type=act \
    --pretrained_name_or_path=lerobot/act_so100_pick_cup \
    --policy_device=cuda \
    --client_device=cpu \
    --actions_per_chunk=50 \
    --chunk_size_threshold=0.5 \
    --aggregate_fn_name=weighted_average

Client Configuration

  • robot (RobotConfig, required): Robot configuration, including type, port, and camera setup
  • task (str): Task instruction for the robot (e.g., "pick up the cup")
  • server_address (str, default: "localhost:8080"): Address of the policy server
  • policy_type (str, required): Type of policy to use (e.g., "act", "pi0", "diffusion")
  • pretrained_name_or_path (str, required): HuggingFace Hub model ID or local path to the pretrained model
  • policy_device (str, default: "cpu"): Device for policy inference on the server (e.g., "cuda", "cuda:0", "mps")
  • client_device (str, default: "cpu"): Device actions are moved to after they are received from the server
  • actions_per_chunk (int, required): Number of actions per chunk to request from the policy
  • chunk_size_threshold (float, default: 0.5): Threshold for triggering new observations (0.0-1.0). When the action queue size drops below chunk_size_threshold * actions_per_chunk, a new observation is sent to the server.
  • aggregate_fn_name (str, default: "weighted_average"): Function used to aggregate overlapping actions. Options:
      • weighted_average: 0.3 * old + 0.7 * new
      • latest_only: always use the new action
      • average: 0.5 * old + 0.5 * new
      • conservative: 0.7 * old + 0.3 * new
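These strategies apply elementwise, so they can be sketched as plain callables. This is an illustrative sketch only: the AGGREGATORS dict below is our name for demonstration, not LeRobot's actual registry (which the Custom Aggregation Functions section calls AGGREGATE_FUNCTIONS).

```python
# Illustrative sketch of the four documented aggregation strategies.
# The formulas match the options listed above and apply elementwise,
# so plain floats or torch tensors both work.
AGGREGATORS = {
    "weighted_average": lambda old, new: 0.3 * old + 0.7 * new,
    "latest_only": lambda old, new: new,
    "average": lambda old, new: 0.5 * old + 0.5 * new,
    "conservative": lambda old, new: 0.7 * old + 0.3 * new,
}

old, new = 10.0, 20.0
for name, fn in AGGREGATORS.items():
    print(f"{name}: {fn(old, new):.1f}")
# weighted_average: 17.0
# latest_only: 20.0
# average: 15.0
# conservative: 13.0
```

The default weighted_average leans toward the newer chunk, since it was computed from a fresher observation, while still smoothing the transition at chunk boundaries.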

Control Flow

The async inference system operates in two parallel threads on the client:

Thread 1: Control Loop

Runs at the robot’s control frequency (e.g., 30 Hz):
  1. Execute Action: If actions are available in the queue, pop and execute the next action
  2. Stream Observation: When queue size drops below threshold, capture and send observation to server
while client.running:
    # Execute action if available
    if client.actions_available():
        client.control_loop_action()
    
    # Send observation if queue is low
    if client._ready_to_send_observation():
        client.control_loop_observation(task)

Thread 2: Action Receiver

Continuously receives action chunks from the server:
while client.running:
    # Get action chunk from server
    actions_chunk = stub.GetActions(Empty())
    
    # Aggregate with existing queue
    client._aggregate_action_queues(actions_chunk, aggregate_fn)

Action Queue Management

The client maintains an action queue and aggregates overlapping actions:
def _aggregate_action_queues(
    self,
    incoming_actions: list[TimedAction],
    aggregate_fn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor],
):
    """Merge incoming actions into the queue, aggregating overlapping timesteps (simplified)."""
    current_actions = {a.timestep: a.action for a in self.action_queue.queue}
    for new_action in incoming_actions:
        if new_action.timestep in current_actions:
            # Overlapping timestep: blend the queued action with the new one
            aggregated = aggregate_fn(
                current_actions[new_action.timestep],
                new_action.action,
            )
            self.action_queue.put(
                TimedAction(new_action.timestamp, new_action.timestep, aggregated)
            )
        else:
            # Unseen timestep: enqueue the new action directly
            self.action_queue.put(new_action)
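A self-contained toy version of this merge rule, using a plain dict keyed by timestep in place of the client's real queue type (merge_chunks and the variable names are ours, for illustration only):

```python
# Toy demonstration of the overlap rule: actions at timesteps present in
# both the current queue and the incoming chunk are aggregated; actions
# at new timesteps are added directly.
def merge_chunks(queued: dict[int, float],
                 incoming: dict[int, float],
                 aggregate_fn) -> dict[int, float]:
    merged = dict(queued)
    for step, new_action in incoming.items():
        if step in merged:
            merged[step] = aggregate_fn(merged[step], new_action)  # overlap
        else:
            merged[step] = new_action                              # new timestep
    return merged

current = {10: 1.0, 11: 2.0}
incoming = {11: 4.0, 12: 6.0}   # timestep 11 overlaps
weighted = lambda old, new: 0.3 * old + 0.7 * new
result = merge_chunks(current, incoming, weighted)
# timestep 11 becomes 0.3 * 2.0 + 0.7 * 4.0 ≈ 3.4; 10 and 12 pass through
```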

Must-Go Observations

The system uses a “must-go” flag to ensure observations are processed when the action queue is empty:
observation.must_go = self.must_go.is_set() and self.action_queue.empty()
This prevents the robot from stalling when no actions are available.
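The must-go condition can be reproduced with standard-library primitives. This is a minimal sketch mirroring the expression above, not the actual LeRobot client classes:

```python
import queue
import threading

# must_go event: set when the client is allowed to force an observation
# through the server's sanity checks.
must_go = threading.Event()
action_queue: queue.Queue = queue.Queue()

must_go.set()
# Queue is empty, so the observation is flagged must-go:
print(must_go.is_set() and action_queue.empty())  # True

# Once actions arrive again, observations go through the normal path:
action_queue.put("dummy_action")
print(must_go.is_set() and action_queue.empty())  # False
```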

Server-Side Processing

The policy server maintains an observation queue (size 1) and processes observations on demand:
class PolicyServer:
    def SendObservations(self, observation):
        # Enqueue if flagged must_go or if it passes sanity checks
        if observation.must_go or self._obs_sanity_checks(observation):
            self.observation_queue.put(observation)

    def GetActions(self):
        # Block until an observation is available (or timeout)
        obs = self.observation_queue.get(timeout=obs_queue_timeout)

        # Run the inference pipeline
        observation = self.preprocessor(obs.observation)
        action_chunk = self.policy.predict_action_chunk(observation)
        action_chunk = self.postprocessor(action_chunk)

        # Return timestamped actions
        return self._time_action_chunk(obs.timestamp, action_chunk, obs.timestep)
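The body of _time_action_chunk is not shown above. One plausible sketch of its job, under the assumption that it stamps each action in the chunk with consecutive timesteps starting from the observation's timestep and spaces timestamps at the server's fps (the names below other than TimedAction are ours):

```python
from dataclasses import dataclass

@dataclass
class TimedAction:
    timestamp: float
    timestep: int
    action: list[float]

def time_action_chunk(t0: float, actions: list[list[float]], i0: int,
                      fps: int = 30) -> list[TimedAction]:
    # Action k in the chunk is scheduled at timestep i0 + k,
    # spaced 1/fps seconds apart starting from the observation's timestamp.
    return [TimedAction(t0 + k / fps, i0 + k, a) for k, a in enumerate(actions)]

chunk = time_action_chunk(0.0, [[0.1], [0.2], [0.3]], i0=5)
print([ta.timestep for ta in chunk])  # [5, 6, 7]
```

These timesteps are what the client's _aggregate_action_queues uses to detect overlap between consecutive chunks.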

Performance Optimization

Reduce Observation Sending

Adjust chunk_size_threshold to control how often observations are sent:
# Send observations sooner (queue still half-full): fresher actions and
# more responsive behavior, at the cost of more network and inference load
--chunk_size_threshold=0.7

# Send observations later (queue nearly drained): less server load, but
# actions are computed from staler observations
--chunk_size_threshold=0.3
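To make the trigger rule concrete, here is a minimal sketch of the documented condition (should_send_observation is our name, not a LeRobot API):

```python
# A new observation is sent once the action queue drains below
# chunk_size_threshold * actions_per_chunk actions.
def should_send_observation(queue_size: int, actions_per_chunk: int,
                            chunk_size_threshold: float) -> bool:
    return queue_size < chunk_size_threshold * actions_per_chunk

# With actions_per_chunk=50, the trigger point is 35 queued actions at
# threshold 0.7, but only 15 at threshold 0.3:
print(should_send_observation(20, 50, 0.7))  # True: 20 < 35, send now
print(should_send_observation(20, 50, 0.3))  # False: 20 >= 15, keep draining
```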

Custom Aggregation Functions

Define custom aggregation logic for your robot:
from lerobot.async_inference.configs import AGGREGATE_FUNCTIONS

# Add custom aggregation function
AGGREGATE_FUNCTIONS["my_custom"] = lambda old, new: 0.8 * old + 0.2 * new

Device Placement

Move actions to GPU for downstream planners:
# Keep actions on GPU after receiving from server
--client_device=cuda

Debugging

Enable debug visualization to monitor action queue size:
python src/lerobot/async_inference/robot_client.py \
    # ... other args ...
    --debug_visualize_queue_size=True
This generates a plot showing action queue size over time after the session ends.

Example: SO-100 Robot

Complete example for running async inference on an SO-100 robot:
# Terminal 1: Start policy server on GPU machine
python -m lerobot.async_inference.policy_server \
    --host=0.0.0.0 \
    --port=8080 \
    --fps=30

# Terminal 2: Run robot client
python src/lerobot/async_inference/robot_client.py \
    --robot.type=so100_follower \
    --robot.port=/dev/ttyUSB0 \
    --robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \
    --task="pick up the red block" \
    --server_address=192.168.1.100:8080 \
    --policy_type=act \
    --pretrained_name_or_path=lerobot/act_so100_real \
    --policy_device=cuda \
    --actions_per_chunk=100 \
    --chunk_size_threshold=0.5 \
    --aggregate_fn_name=weighted_average

API Reference

RobotClient

See lerobot/async_inference/robot_client.py:83
  • start() -> bool: Connect to the policy server and initialize the client
  • stop() -> None: Stop the client and disconnect from the server
  • control_loop(task: str) -> tuple[Observation, Action]: Main control loop that executes actions and streams observations

PolicyServer

See lerobot/async_inference/policy_server.py:66
  • SendPolicyInstructions(RemotePolicyConfig) -> Empty: Load the policy model based on client instructions
  • SendObservations(TimedObservation) -> Empty: Receive an observation from the robot client
  • GetActions() -> Actions: Generate and return an action chunk based on the latest observation
