Real-Time Chunking (RTC) is a technique developed by Physical Intelligence that significantly improves the real-time performance of action chunking policies. It treats chunk generation as an inpainting problem, using prefix attention to strategically blend overlapping timesteps between action chunks.
Overview
Action chunking policies (like ACT, Diffusion Policy, etc.) predict sequences of future actions. During deployment, however, a new chunk is typically generated before the previous one has finished executing, and the two chunks can disagree on their overlapping timesteps, producing discontinuous motion. RTC solves this by:
- Velocity-based guidance: Using the previous chunk’s unexecuted actions as a “prefix” to guide the current prediction
- Adaptive weighting: Applying time-varying weights to smoothly blend old and new predictions
- Autograd-based correction: Computing guidance corrections via automatic differentiation
Configuration
RTC is configured via the RTCConfig class:
```python
from lerobot.policies.rtc.configuration_rtc import RTCConfig, RTCAttentionSchedule

rtc_config = RTCConfig(
    enabled=True,
    prefix_attention_schedule=RTCAttentionSchedule.LINEAR,
    max_guidance_weight=10.0,
    execution_horizon=10,
    debug=False,
)
```
Configuration Parameters
- enabled (bool): Enable or disable RTC guidance.
- prefix_attention_schedule (RTCAttentionSchedule, default LINEAR): Schedule for prefix attention weights. Options:
  - LINEAR: Linear decay from 1.0 to 0.0
  - EXPONENTIAL: Exponential decay
- max_guidance_weight (float): Maximum guidance weight used to clamp corrections. Higher values mean stronger guidance from the previous chunk.
- execution_horizon (int): Number of timesteps from the prefix to use for guidance. This controls how many future actions from the previous chunk influence the current prediction.
- debug (bool): Enable debug tracking to record RTC internal states.
- debug_maxlen (int): Maximum number of debug steps to track.
How RTC Works
1. Prefix Weight Computation
RTC computes time-varying weights for blending previous and current chunks:
```python
import math

import torch

def get_prefix_weights(
    inference_delay: int,
    execution_horizon: int,
    chunk_size: int,
    schedule: RTCAttentionSchedule = RTCAttentionSchedule.LINEAR,
) -> torch.Tensor:
    """Compute prefix attention weights.

    Args:
        inference_delay: Number of timesteps already executed from the prefix
        execution_horizon: Total horizon for guidance
        chunk_size: Size of the current action chunk
        schedule: Weight decay schedule

    Returns:
        Weights tensor of shape (chunk_size,)
    """
    weights = torch.zeros(chunk_size)
    # Only the first execution_horizon timesteps receive nonzero weight
    for t in range(min(execution_horizon, chunk_size)):
        if schedule == RTCAttentionSchedule.LINEAR:
            # Linear decay: w(t) = 1 - (inference_delay + t) / execution_horizon
            weights[t] = 1.0 - (inference_delay + t) / execution_horizon
        elif schedule == RTCAttentionSchedule.EXPONENTIAL:
            # Exponential decay: w(t) = exp(-(inference_delay + t) / execution_horizon)
            weights[t] = math.exp(-(inference_delay + t) / execution_horizon)
    return weights.clamp(0.0, 1.0)
```
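As a sanity check on the schedule above, the linear branch can be reproduced in plain Python (an illustrative re-implementation, not the library function):

```python
# Pure-Python sketch of the LINEAR branch of the prefix-weight schedule
# (illustrative only; the actual implementation returns a torch tensor).
def linear_prefix_weights(inference_delay: int, execution_horizon: int,
                          chunk_size: int) -> list[float]:
    weights = [0.0] * chunk_size
    for t in range(min(execution_horizon, chunk_size)):
        w = 1.0 - (inference_delay + t) / execution_horizon
        weights[t] = min(max(w, 0.0), 1.0)  # clamp to [0, 1]
    return weights

# One step already executed, horizon of 5, chunk of 8 actions:
print([round(w, 3) for w in linear_prefix_weights(1, 5, 8)])
# [0.8, 0.6, 0.4, 0.2, 0.0, 0.0, 0.0, 0.0]
```

Timesteps beyond the execution horizon get zero weight, so only the front of the chunk is pulled toward the previous prediction.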
2. Velocity Guidance
RTC guides the denoising process using the velocity from the previous chunk:
```python
def denoise_step(
    self,
    x_t: Tensor,                   # Current noisy actions (B, T, A)
    prev_chunk_left_over: Tensor,  # Unexecuted actions from previous chunk (B, T_prev, A)
    inference_delay: int,          # How many actions were already executed
    time: float,                   # Normalized time in (0, 1]
    original_denoise_step_partial: Callable,  # Original denoising function
    execution_horizon: int | None = None,
) -> Tensor:
    """RTC guidance wrapper around a denoising step."""
    # Get the base velocity from the original denoiser
    v_t = original_denoise_step_partial(x_t)
    if prev_chunk_left_over is None:
        return v_t  # First chunk: no prefix, no guidance

    # Compute prefix weights over the current chunk length
    chunk_size = x_t.shape[1]
    weights = self.get_prefix_weights(inference_delay, execution_horizon, chunk_size)
    weights = weights.view(1, -1, 1)  # (1, T, 1) for broadcasting

    # Predicted clean trajectory: x1 = x_t + time * v_t
    x1_t = x_t + time * v_t

    # Error between the prefix and the prediction
    error = prev_chunk_left_over - x1_t
    weighted_error = error * weights

    # Convert the weighted error back into a velocity correction
    correction = weighted_error / time

    # Clamp the guidance strength relative to the base velocity
    guidance_weight = torch.norm(correction) / (torch.norm(v_t) + self.eps)
    guidance_weight = torch.clamp(guidance_weight, max=self.max_guidance_weight)

    # Apply guidance
    return v_t + guidance_weight * correction
```
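Plugging small scalar numbers through the guidance rule makes the arithmetic concrete. The values below are made up for illustration; the real computation runs on (B, T, A) tensors:

```python
# Scalar walk-through of the RTC guidance rule (illustrative numbers only).
time = 0.5      # normalized denoising time
v_t = 2.0       # base velocity from the denoiser
x_t = 1.0       # current noisy action
prefix = 2.5    # matching action from the previous chunk
weight = 0.8    # prefix attention weight at this timestep
max_guidance_weight = 10.0
eps = 1e-6

x1_t = x_t + time * v_t               # predicted clean action -> 2.0
error = prefix - x1_t                 # distance from the prefix -> 0.5
correction = (error * weight) / time  # weighted error as a velocity -> 0.8
guidance_weight = min(abs(correction) / (abs(v_t) + eps), max_guidance_weight)
v_guided = v_t + guidance_weight * correction
print(round(v_guided, 3))             # 2.32: velocity nudged toward the prefix
```

Because the correction is small relative to v_t, the clamp never triggers here; it only matters when the prefix disagrees sharply with the new prediction.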
3. Integration with Diffusion Policies
For diffusion-based policies, RTC modifies each denoising step:
```python
class DiffusionPolicyWithRTC:
    def __init__(self, config):
        self.config = config
        self.rtc = RTCProcessor(config.rtc_config)
        self.prev_chunk_left_over = None

    def predict_action_chunk(self, observation, **kwargs):
        inference_delay = kwargs.get("inference_delay", 0)
        execution_horizon = kwargs.get(
            "execution_horizon", self.config.rtc_config.execution_horizon
        )

        # Initialize from noise
        # (batch_size, chunk_size, action_dim come from the policy config)
        x_t = torch.randn(batch_size, chunk_size, action_dim)
        dt = 1.0 / self.num_diffusion_steps

        # Denoising loop
        for t in reversed(range(self.num_diffusion_steps)):
            time = t / self.num_diffusion_steps

            # Wrap the original denoiser so RTC can call it
            def original_denoise(x):
                return self.unet(x, timestep=t, condition=observation)

            # Apply RTC-guided denoising
            v_t = self.rtc.denoise_step(
                x_t=x_t,
                prev_chunk_left_over=self.prev_chunk_left_over,
                inference_delay=inference_delay,
                time=time,
                original_denoise_step_partial=original_denoise,
                execution_horizon=execution_horizon,
            )

            # Euler update
            x_t = x_t + v_t * dt

        # Store the unexecuted portion for the next chunk
        self.prev_chunk_left_over = x_t[:, inference_delay:]
        return x_t
```
Using RTC in Practice
Enable RTC for a Policy
Add RTC configuration to your policy config:
```python
from lerobot.policies.diffusion import DiffusionConfig
from lerobot.policies.rtc.configuration_rtc import RTCConfig, RTCAttentionSchedule

policy_config = DiffusionConfig(
    # ... other config ...
    rtc_config=RTCConfig(
        enabled=True,
        prefix_attention_schedule=RTCAttentionSchedule.LINEAR,
        max_guidance_weight=10.0,
        execution_horizon=10,
    ),
)
```
Using with Async Inference
RTC works seamlessly with the async inference system:
```python
# Robot client automatically passes inference_delay
action_chunk = policy.predict_action_chunk(
    observation,
    inference_delay=actions_already_executed,
    prev_chunk_left_over=previous_unexecuted_actions,
    execution_horizon=10,
)
```
See lerobot/async_inference/policy_server.py:324 for the full implementation.
Attention Schedules
RTC supports different weight decay schedules:
Linear Schedule
Weights decay linearly from 1.0 to 0.0 over the execution horizon:
w(t) = max(0, 1 - (inference_delay + t) / execution_horizon)
The weights taper at a constant rate: the pull toward the previous chunk is
strongest at the start of the prefix and vanishes by the end of the execution horizon.
Exponential Schedule
Weights decay exponentially:
w(t) = exp(-(inference_delay + t) / execution_horizon)
With this normalization the exponential curve decays more gradually than the
linear ramp and never reaches zero within the horizon, so later prefix
timesteps retain comparatively more weight.
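To compare the two formulas directly, here is a small sketch evaluating both over a horizon of 5 with no inference delay (plain Python, not the library code):

```python
import math

H = 5  # execution_horizon, with inference_delay = 0
linear = [max(0.0, 1 - t / H) for t in range(H)]
expo = [math.exp(-t / H) for t in range(H)]

print([round(w, 2) for w in linear])  # [1.0, 0.8, 0.6, 0.4, 0.2]
print([round(w, 2) for w in expo])    # [1.0, 0.82, 0.67, 0.55, 0.45]
```

Since exp(-x) >= 1 - x, the exponential curve sits above the linear ramp at every step of the horizon.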
Debugging RTC
Enable debug mode to track RTC internal states:
```python
rtc_config = RTCConfig(
    enabled=True,
    debug=True,
    debug_maxlen=100,
)
rtc = RTCProcessor(rtc_config)

# Run inference...

# Get debug information
debug_steps = rtc.get_all_debug_steps()
for step in debug_steps:
    print(f"Time: {step['time']}")
    print(f"Guidance weight: {step['guidance_weight']}")
    print(f"Error norm: {step['err'].norm()}")
    print(f"Correction norm: {step['correction'].norm()}")
```
See lerobot/policies/rtc/debug_tracker.py for the full Tracker implementation.
Execution Horizon
Larger execution horizons provide more guidance but increase computation:
- Small (5-10): Faster, less smooth blending
- Medium (10-20): Good balance for most applications
- Large (20-50): Smoother blending, slower inference
Max Guidance Weight
Controls how strongly the previous chunk influences the current prediction:
- Low (1.0-5.0): Subtle guidance, more exploration
- Medium (5.0-15.0): Balanced guidance (recommended)
- High (15.0+): Strong guidance, may over-constrain
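The clamp itself can be sketched in isolation (clamp_guidance is a hypothetical helper mirroring the norm-ratio computation inside denoise_step):

```python
# Hypothetical helper mirroring the guidance-weight clamp in denoise_step.
def clamp_guidance(correction_norm: float, velocity_norm: float,
                   max_guidance_weight: float, eps: float = 1e-6) -> float:
    return min(correction_norm / (velocity_norm + eps), max_guidance_weight)

# A correction much larger than the velocity gets capped:
print(clamp_guidance(50.0, 1.0, 5.0))  # 5.0
# A modest correction passes through (almost) unchanged:
print(round(clamp_guidance(2.0, 1.0, 5.0), 3))  # 2.0
```

Lowering max_guidance_weight therefore bounds how hard the previous chunk can pull on the new prediction, regardless of how large the raw error is.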
Common Issues
RTC Not Activating
Ensure prev_chunk_left_over is provided:
```python
# First inference - no guidance
action_chunk = policy.predict_action_chunk(obs)

# Subsequent inferences - with guidance
action_chunk = policy.predict_action_chunk(
    obs,
    prev_chunk_left_over=previous_chunk[executed:],
)
```
Unstable Actions
Reduce max_guidance_weight if actions become unstable:
```python
rtc_config.max_guidance_weight = 5.0  # Lower value = less aggressive guidance
```
High Latency
Reduce execution_horizon to speed up inference:
```python
rtc_config.execution_horizon = 5  # Smaller horizon = faster
```
API Reference
RTCProcessor
See lerobot/policies/rtc/modeling_rtc.py:37
- denoise_step(x_t, prev_chunk_left_over, inference_delay, time, original_denoise_step_partial, execution_horizon) -> Tensor: Apply RTC guidance to a denoising step.
- get_prefix_weights(inference_delay, execution_horizon, chunk_size) -> Tensor: Compute prefix attention weights based on the schedule.
- Debug tracking: records RTC internal states for visualization (enabled via the debug options in RTCConfig).
RTCConfig
See lerobot/policies/rtc/configuration_rtc.py:30
- prefix_attention_schedule: Weight decay schedule (LINEAR or EXPONENTIAL).
- max_guidance_weight: Maximum guidance correction weight.
- execution_horizon: Number of timesteps to use for guidance.