The Multiprocessing backend runs environments in parallel across multiple CPU processes, providing near-linear speedup for CPU-bound environments.
Overview
PufferLib’s Multiprocessing backend uses:
- Shared memory: Zero-copy data transfer between processes
- Process pooling: One process per physical CPU core
- Batched execution: Configurable batch sizes for flexible scheduling
- Async reception: Non-blocking environment execution
From pufferlib/vector.py:226-488
Basic usage
Simple setup
import pufferlib.vector

def make_env():
    env = ...  # create your environment here
    return env

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    backend='Multiprocessing'
)
# Use like any vectorized environment
vecenv.async_reset(seed=42)
obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()

for step in range(1000):
    actions = policy(obs)
    vecenv.send(actions)
    obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()
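The extra env_ids and masks outputs matter once batches are partial or environments are multi-agent. A minimal sketch of a common usage pattern (names here are illustrative, not required API):

import numpy as np

obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()
valid = np.asarray(masks, dtype=bool)  # which agents produced live data this step
mean_reward = rewards[valid].mean()    # ignore inactive agents in statistics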
Configuration parameters
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,     # Total environments
    num_workers=16,   # CPU processes
    backend='Multiprocessing'
)
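The remaining knobs are covered in detail later on this page; a fuller call showing them together (values are illustrative):

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    batch_size=64,     # Environments returned per recv() (see Batching strategies)
    zero_copy=True,    # Require contiguous worker blocks per batch
    sync_traj=False,   # Pop workers in fixed order (see Advanced features)
    overwork=False,    # Allow more workers than physical cores (see Core limits)
    backend='Multiprocessing'
)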
Architecture
Process structure
Each worker process runs a Serial vectorization of environments:
num_envs = 128
num_workers = 16
envs_per_worker = 128 // 16  # = 8

# Worker 0:  Runs Serial(make_env, num_envs=8)
# Worker 1:  Runs Serial(make_env, num_envs=8)
# ...
# Worker 15: Runs Serial(make_env, num_envs=8)
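num_envs must split evenly across num_workers. A quick sanity check for your own setup code (a sketch, not library code):

assert num_envs % num_workers == 0, \
    f'num_envs ({num_envs}) must be divisible by num_workers ({num_workers})'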
From pufferlib/vector.py:261-265
Shared memory buffers
Data is shared via RawArray for zero-copy transfer:
from multiprocessing import RawArray

self.shm = dict(
    observations=RawArray(obs_ctype, num_agents * int(np.prod(obs_shape))),
    actions=RawArray(atn_ctype, num_agents * int(np.prod(atn_shape))),
    rewards=RawArray('f', num_agents),
    terminals=RawArray('b', num_agents),
    truncateds=RawArray('b', num_agents),
    masks=RawArray('b', num_agents),
    semaphores=RawArray('c', num_workers),
    notify=RawArray('b', num_workers),
)
From pufferlib/vector.py:300-309
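To see why this is zero-copy, note that any process holding a RawArray can wrap it in a NumPy view; writes through the view are immediately visible to every other process, with no pickling. A standalone sketch of the mechanism (not PufferLib code):

import numpy as np
from multiprocessing import RawArray

num_agents, obs_size = 8, 4
raw = RawArray('f', num_agents * obs_size)

# Parent and workers each build views like this over the same buffer
obs_view = np.ndarray((num_agents, obs_size), dtype=np.float32, buffer=raw)
obs_view[0] = 1.0  # no copy, no serialization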
Worker process loop
From pufferlib/vector.py:172-224:
def _worker_process(
    env_creators, env_args, env_kwargs,
    obs_shape, obs_dtype, atn_shape, atn_dtype,
    num_envs, num_agents, num_workers, worker_idx,
    send_pipe, recv_pipe, shm, is_native, seed
):
    # Create environments backed by this worker's slice of shared memory
    # (construction of buf from shm is elided in this excerpt)
    envs = Serial(env_creators, env_args, env_kwargs, num_envs,
        buf=buf, seed=seed*num_envs)
    semaphores = np.ndarray(num_workers, dtype=np.uint8, buffer=shm['semaphores'])

    while True:
        sem = semaphores[worker_idx]
        if sem >= MAIN:
            # No work assigned yet: keep spinning
            continue

        if sem == RESET:
            seed = recv_pipe.recv()
            _, infos = envs.reset(seed=seed)
        elif sem == STEP:
            # atn_arr is this worker's view into the shared action buffer (elided)
            _, _, _, _, infos = envs.step(atn_arr)
        elif sem == CLOSE:
            envs.close()
            break

        # Signal completion back to the main process
        semaphores[worker_idx] = MAIN
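The main process drives the other side of this handshake: it writes actions into shared memory, sets a worker's semaphore to a command value, and treats any value >= MAIN as "worker done". A simplified sketch of that send path (names mirror the loop above; not the exact library code):

def send_step(action_views, semaphores, worker_idx, actions):
    action_views[worker_idx][:] = actions  # write into the shared action buffer
    semaphores[worker_idx] = STEP          # hand control to the worker

def worker_done(semaphores, worker_idx):
    return semaphores[worker_idx] >= MAIN  # worker flips it back on completion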
Batching strategies
Single worker batches
Fastest path - receive from one worker at a time:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    batch_size=8,  # envs_per_worker = 128/16 = 8
    backend='Multiprocessing'
)
From pufferlib/vector.py:380-386:
if self.workers_per_batch == 1:
    # Zero-copy optimized for batch size 1
    w_slice = self.ready_workers[0]
    s_range = [w_slice]
    self.waiting_workers.append(w_slice)
    self.ready_workers.pop(0)
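The dispatch path is selected by workers_per_batch, which follows directly from the configuration. A worked sketch of the arithmetic:

num_envs, num_workers, batch_size = 128, 16, 8
envs_per_worker = num_envs // num_workers          # 8
workers_per_batch = batch_size // envs_per_worker  # 1 -> single-worker fast path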
Full synchronization
Wait for all workers - maximum batch size:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    batch_size=128,  # All environments
    backend='Multiprocessing'
)
From pufferlib/vector.py:387-396:
elif self.workers_per_batch == self.num_workers:
    # Zero-copy synchronized for all workers
    if len(self.ready_workers) < self.num_workers:
        continue
    w_slice = slice(0, self.num_workers)
    s_range = range(0, self.num_workers)
    self.waiting_workers.extend(s_range)
    self.ready_workers = []
Zero-copy batching
Wait for contiguous worker blocks:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,   # 4 workers at a time
    zero_copy=True,  # Must be contiguous
    backend='Multiprocessing'
)
With zero_copy=True, num_envs must be divisible by batch_size.
From pufferlib/vector.py:397-416:
elif self.zero_copy:
    # Zero-copy for batch size > 1
    completed = np.zeros(self.num_workers, dtype=bool)
    completed[self.ready_workers] = True
    buffers = completed.reshape(-1, self.workers_per_batch).all(axis=1)
    start = buffers.argmax()
    if not buffers[start]:
        continue

    start *= self.workers_per_batch
    end = start + self.workers_per_batch
    w_slice = slice(start, end)
    s_range = range(start, end)
    self.waiting_workers.extend(s_range)
    self.ready_workers = [e for e in self.ready_workers if e not in s_range]
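As a worked example: with 16 workers and workers_per_batch=4, ready_workers=[1, 2, 3, 4, 5, 6, 7] marks block 1 (workers 4-7) fully complete while block 0 still waits on worker 0. Workers 4-7 are dispatched zero-copy as one contiguous slice, and workers 1-3 stay in the ready list until worker 0 completes their block.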
Async batching
Take first N ready workers (copies data):
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,
    zero_copy=False,  # Allow non-contiguous
    backend='Multiprocessing'
)
From pufferlib/vector.py:417-425:
elif len(self.ready_workers) >= self.workers_per_batch:
    # Full async path - always copies
    w_slice = self.ready_workers[:self.workers_per_batch]
    s_range = w_slice
    self.waiting_workers.extend(s_range)
    self.ready_workers = self.ready_workers[self.workers_per_batch:]
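Because the batch can come from any subset of workers, env_ids identifies which environments produced the data, and actions sent back are routed to those same workers. A minimal usage sketch:

vecenv.async_reset(seed=0)
obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()
actions = policy(obs)  # one action per returned agent, in env_ids order
vecenv.send(actions)   # routed back to the workers that produced this batch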
Serial vs Multiprocessing
From pufferlib/vector.py:52-170 (Serial) vs pufferlib/vector.py:226-488 (Multiprocessing)
# Serial: Single process
vecenv_serial = pufferlib.vector.make(
    make_env,
    num_envs=128,
    backend='Serial'
)
# Typical SPS: 5,000 (single core)

# Multiprocessing: 16 workers
vecenv_mp = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    backend='Multiprocessing'
)
# Typical SPS: 75,000 (15x speedup)
Multiprocessing provides near-linear speedup for CPU-bound environments. Expect 14-15x speedup on 16 cores.
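To verify the speedup on your own hardware, a rough steps-per-second probe along these lines works with either backend (a sketch, using the recv/send API shown above):

import time

def measure_sps(vecenv, steps=1000):
    vecenv.async_reset(seed=0)
    obs, *_ = vecenv.recv()
    start = time.time()
    for _ in range(steps):
        vecenv.send(vecenv.action_space.sample())
        obs, *_ = vecenv.recv()
    return steps * len(obs) / (time.time() - start)

print(f'Serial SPS: {measure_sps(vecenv_serial):,.0f}')
print(f'Multiprocessing SPS: {measure_sps(vecenv_mp):,.0f}')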
Batch size impact
# Small batches: Lower latency, higher variance
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=16,   # Receive frequently
    backend='Multiprocessing'
)

# Large batches: Higher throughput, more synchronization
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=256,  # Wait for all
    backend='Multiprocessing'
)
Advanced features
Trajectory synchronization
Force workers to complete in order:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    sync_traj=True,  # Wait for workers in order
    backend='Multiprocessing'
)
From pufferlib/vector.py:358-372:
if self.sync_traj:
    worker = self.waiting_workers[0]
    sem = self.buf['semaphores'][worker]
    if sem >= MAIN:
        self.waiting_workers.pop(0)
        self.ready_workers.append(worker)
else:
    worker = self.waiting_workers.pop(0)
    sem = self.buf['semaphores'][worker]
    if sem >= MAIN:
        self.ready_workers.append(worker)
    else:
        self.waiting_workers.append(worker)
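Keeping workers in a fixed completion order means environments arrive at stable positions across recv() calls, which can help when downstream code indexes per-environment state (such as recurrent hidden states) by position; the async branch trades that ordering for throughput.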
Core limits
The backend prevents core oversubscription by default:
import psutil

cpu_cores = psutil.cpu_count(logical=False)
if num_workers > cpu_cores and not overwork:
    raise pufferlib.APIUsageError(
        f'num_workers ({num_workers}) > hardware cores ({cpu_cores}) is disallowed by default. '
        'PufferLib multiprocessing is heavily optimized for 1 process per hardware core. '
        'If you really want to do this, set overwork=True.'
    )
From pufferlib/vector.py:246-253
Use only physical cores (not hyperthreads). Oversubscription usually decreases performance.
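If you have profiled and genuinely want more workers than cores (for example, for I/O-bound environments), opt in explicitly:

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=32,  # more workers than physical cores
    overwork=True,   # explicitly allow oversubscription
    backend='Multiprocessing'
)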
Custom environment arguments
Pass different arguments to each environment:
def make_env_0():
    return MyEnv(level=0)

def make_env_1():
    return MyEnv(level=1)

# List of creators
vecenv = pufferlib.vector.make(
    [make_env_0, make_env_1, ...],  # One creator per env
    num_envs=128,
    backend='Multiprocessing'
)

# Or with args/kwargs
vecenv = pufferlib.vector.make(
    make_env,
    env_args=[(0,), (1,), ...],                    # Positional args per env
    env_kwargs=[{'level': 0}, {'level': 1}, ...],  # Keyword args per env
    num_envs=128,
    backend='Multiprocessing'
)
From pufferlib/vector.py:672-684
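For example, to cycle a few difficulty levels across the pool (assuming your creator accepts a level kwarg):

num_envs = 128
env_kwargs = [{'level': i % 4} for i in range(num_envs)]  # 4 levels, round-robin

vecenv = pufferlib.vector.make(
    make_env,
    env_kwargs=env_kwargs,
    num_envs=num_envs,
    backend='Multiprocessing'
)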
Debugging
Use Serial for debugging
Serial backend runs in the main process for easier debugging:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=8,
    backend='Serial'  # Easy to debug with breakpoints
)
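A convenient pattern (a sketch, not a PufferLib feature) is to switch backends with an environment variable so debug runs need no code changes:

import os

backend = 'Serial' if os.environ.get('PUFFER_DEBUG') else 'Multiprocessing'
vecenv = pufferlib.vector.make(make_env, num_envs=8, backend=backend)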
Profiling workers
Worker processes can be profiled individually:
def _worker_process_profiled(*args, **kwargs):
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    profiler.enable()
    _worker_process(*args, **kwargs)  # the normal worker loop
    profiler.disable()

    worker_idx = kwargs.get('worker_idx', 0)
    stats = pstats.Stats(profiler)
    stats.dump_stats(f'worker_{worker_idx}.prof')
Memory monitoring
Check per-worker memory usage:
import psutil
import os

def _worker_process(*args, worker_idx=0, **kwargs):
    process = psutil.Process(os.getpid())
    while True:
        # ... normal worker loop ...
        mem_mb = process.memory_info().rss / 1024 / 1024
        if mem_mb > 1000:  # 1 GB threshold
            print(f'Worker {worker_idx} using {mem_mb:.1f}MB')
Best practices
Match workers to physical cores
import psutil

num_workers = psutil.cpu_count(logical=False)
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=num_workers * 8,  # 8 envs per worker
    num_workers=num_workers,
    backend='Multiprocessing'
)
Choose appropriate batch size
# For training: size env batches so they divide your training batch evenly
training_batch = 4096  # filled by 16 recv() calls of 256 agents each
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=256,  # Collect the full environment batch per recv()
    backend='Multiprocessing'
)
Enable zero-copy when possible
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,
    zero_copy=True,  # Requires 256 % 64 == 0
    backend='Multiprocessing'
)
Profile before scaling
import pufferlib.vector

pufferlib.vector.autotune(
    make_env,
    batch_size=4096,
    max_envs=256,
    time_per_test=5
)
Example: Optimized setup
import pufferlib.vector
import psutil

def make_env():
    import gym
    return gym.make('CartPole-v1')

# Get physical core count
num_cores = psutil.cpu_count(logical=False)
print(f'Physical cores: {num_cores}')

# Configure vectorization (assumes an even core count so that
# batch_size splits into whole workers for zero_copy)
envs_per_worker = 8
num_envs = num_cores * envs_per_worker
batch_size = num_envs // 2  # Process half the envs at a time

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=num_envs,
    num_workers=num_cores,
    batch_size=batch_size,
    zero_copy=True,
    sync_traj=False,  # Async for max throughput
    backend='Multiprocessing'
)

print('\nConfiguration:')
print(f'  Environments: {num_envs}')
print(f'  Workers: {num_cores}')
print(f'  Envs per worker: {envs_per_worker}')
print(f'  Batch size: {batch_size}')
print(f'  Total agents: {vecenv.num_agents}')

# Training loop
vecenv.async_reset(seed=42)
for epoch in range(100):
    obs, *_ = vecenv.recv()
    actions = vecenv.action_space.sample()  # Your policy goes here
    vecenv.send(actions)

vecenv.close()
Start with autotune to find optimal parameters, then fine-tune based on your specific workload.