The Multiprocessing backend runs environments in parallel across multiple CPU processes, providing near-linear speedup for CPU-bound environments.

Overview

PufferLib’s Multiprocessing backend uses:
  • Shared memory: Zero-copy data transfer between processes
  • Process pooling: One process per physical CPU core
  • Batched execution: Configurable batch sizes for flexible scheduling
  • Async reception: Non-blocking environment execution
From pufferlib/vector.py:226-488

Basic usage

Simple setup

import pufferlib.vector

def make_env():
    # Your environment creation
    return env

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    backend='Multiprocessing'
)

# Use like any vectorized environment
vecenv.async_reset(seed=42)
obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()

for step in range(1000):
    actions = policy(obs)
    vecenv.send(actions)
    obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()
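recv() returns flat, batched arrays. The masks array flags rows that carry live agent data, and env_ids maps each row back to its source slot in the vectorized env. A minimal sketch of consuming one batch (the bookkeeping here is illustrative, not a PufferLib API):

import numpy as np

obs, rewards, terminals, truncations, infos, env_ids, masks = vecenv.recv()

valid = masks.astype(bool)            # rows with live agent data
mean_reward = rewards[valid].mean()   # ignore padded rows

# Illustrative per-slot bookkeeping keyed by env_ids
episode_returns = np.zeros(vecenv.num_agents)
episode_returns[env_ids[valid]] += rewards[valid]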

Configuration parameters

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,      # Total environments
    num_workers=16,    # CPU processes
    backend='Multiprocessing'
)

Architecture

Process structure

Each worker process runs a Serial vectorization of environments:
num_envs = 128
num_workers = 16
envs_per_worker = 128 // 16  # = 8

# Worker 0: Runs Serial(make_env, num_envs=8)
# Worker 1: Runs Serial(make_env, num_envs=8)
# ...
# Worker 15: Runs Serial(make_env, num_envs=8)
From pufferlib/vector.py:261-265
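Because each worker owns a contiguous block of environments, mapping an environment index to its worker is plain integer division (a hypothetical helper, not a PufferLib API):

def worker_for_env(env_idx, envs_per_worker=8):
    # Worker w owns envs [w * envs_per_worker, (w + 1) * envs_per_worker)
    return env_idx // envs_per_worker

assert worker_for_env(7) == 0     # last env of worker 0
assert worker_for_env(8) == 1     # first env of worker 1
assert worker_for_env(127) == 15  # last env of worker 15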

Shared memory buffers

Data is shared via RawArray for zero-copy transfer:
from multiprocessing import RawArray

self.shm = dict(
    observations=RawArray(obs_ctype, num_agents * int(np.prod(obs_shape))),
    actions=RawArray(atn_ctype, num_agents * int(np.prod(atn_shape))),
    rewards=RawArray('f', num_agents),
    terminals=RawArray('b', num_agents),
    truncateds=RawArray('b', num_agents),
    masks=RawArray('b', num_agents),
    semaphores=RawArray('c', num_workers),
    notify=RawArray('b', num_workers),
)
From pufferlib/vector.py:300-309
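Both sides wrap these RawArrays in numpy arrays, so a write by one process is immediately visible to the other with no pickling or copying. A standalone sketch of the pattern (shapes and dtypes here are illustrative):

import numpy as np
from multiprocessing import RawArray

num_agents, obs_shape = 8, (4,)
shm_obs = RawArray('f', num_agents * int(np.prod(obs_shape)))

# A zero-copy numpy view over the shared buffer
obs_view = np.ndarray((num_agents, *obs_shape), dtype=np.float32, buffer=shm_obs)

obs_view[0] = [1.0, 2.0, 3.0, 4.0]  # visible to every process sharing shm_obs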

Worker process loop

From pufferlib/vector.py:172-224:
def _worker_process(
    env_creators, env_args, env_kwargs, 
    obs_shape, obs_dtype, atn_shape, atn_dtype,
    num_envs, num_agents, num_workers, worker_idx, 
    send_pipe, recv_pipe, shm, is_native, seed
):
    # Create environments over shared-memory views
    # (buf is built from shm; construction elided in this excerpt)
    envs = Serial(env_creators, env_args, env_kwargs, num_envs, buf=buf, seed=seed*num_envs)
    
    semaphores = np.ndarray(num_workers, dtype=np.uint8, buffer=shm['semaphores'])
    
    while True:
        sem = semaphores[worker_idx]
        if sem >= MAIN:
            # Wait for work
            continue
        
        if sem == RESET:
            seed = recv_pipe.recv()
            _, infos = envs.reset(seed=seed)
        elif sem == STEP:
            # atn_arr is a view over the shared action buffer (setup elided)
            _, _, _, _, infos = envs.step(atn_arr)
        elif sem == CLOSE:
            envs.close()
            break
        
        # Signal completion
        semaphores[worker_idx] = MAIN

Batching strategies

Single worker batches

Fastest path - receive from one worker at a time:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    batch_size=8,  # envs_per_worker = 128/16 = 8
    backend='Multiprocessing'
)
From pufferlib/vector.py:380-386:
if self.workers_per_batch == 1:
    # Zero-copy optimized for batch size 1
    w_slice = self.ready_workers[0]
    s_range = [w_slice]
    self.waiting_workers.append(w_slice)
    self.ready_workers.pop(0)

Full synchronization

Wait for all workers - maximum batch size:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    batch_size=128,  # All environments
    backend='Multiprocessing'
)
From pufferlib/vector.py:387-396:
elif self.workers_per_batch == self.num_workers:
    # Zero-copy synchronized for all workers
    if len(self.ready_workers) < self.num_workers:
        continue
    
    w_slice = slice(0, self.num_workers)
    s_range = range(0, self.num_workers)
    self.waiting_workers.extend(s_range)
    self.ready_workers = []

Zero-copy batching

Wait for contiguous worker blocks:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,     # 4 workers at a time
    zero_copy=True,    # Must be contiguous
    backend='Multiprocessing'
)
With zero_copy=True, num_envs must be divisible by batch_size.
From pufferlib/vector.py:397-416:
elif self.zero_copy:
    # Zero-copy for batch size > 1
    completed = np.zeros(self.num_workers, dtype=bool)
    completed[self.ready_workers] = True
    buffers = completed.reshape(-1, self.workers_per_batch).all(axis=1)
    start = buffers.argmax()
    if not buffers[start]:
        continue
    
    start *= self.workers_per_batch
    end = start + self.workers_per_batch
    w_slice = slice(start, end)
    s_range = range(start, end)
    self.waiting_workers.extend(s_range)
    self.ready_workers = [e for e in self.ready_workers if e not in s_range]
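The reshape trick is the heart of this path: workers are grouped into contiguous blocks of workers_per_batch, and a block is dispatched only when every member is ready. A standalone numpy illustration:

import numpy as np

num_workers, workers_per_batch = 16, 4
ready_workers = [0, 1, 2, 3, 5, 6, 9]  # example ready set

completed = np.zeros(num_workers, dtype=bool)
completed[ready_workers] = True

# One flag per contiguous block of four workers
buffers = completed.reshape(-1, workers_per_batch).all(axis=1)
# buffers == [True, False, False, False]: only workers 0-3 form a full block

start = buffers.argmax()  # first fully-ready block
assert buffers[start]     # workers 0-3 would be served as one zero-copy slice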

Async batching

Take first N ready workers (copies data):
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,
    zero_copy=False,  # Allow non-contiguous
    backend='Multiprocessing'
)
From pufferlib/vector.py:417-425:
elif len(self.ready_workers) >= self.workers_per_batch:
    # Full async path - always copies
    w_slice = self.ready_workers[:self.workers_per_batch]
    s_range = w_slice
    self.waiting_workers.extend(s_range)
    self.ready_workers = self.ready_workers[self.workers_per_batch:]
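The trade-off versus zero-copy is that the first N ready workers are rarely adjacent in shared memory, so their slices must be gathered into a contiguous batch. A small numpy illustration of why that gather is a copy:

import numpy as np

obs_per_worker = 8
all_obs = np.arange(16 * obs_per_worker).reshape(16, obs_per_worker)

ready = [3, 7, 12, 14]   # first four ready workers: not contiguous
batch = all_obs[ready]   # fancy indexing allocates a new array

assert not np.shares_memory(batch, all_obs)  # a copy, not a view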

Performance comparison

Serial vs Multiprocessing

From pufferlib/vector.py:52-170 (Serial) vs pufferlib/vector.py:226-488 (Multiprocessing)
# Serial: Single process
vecenv_serial = pufferlib.vector.make(
    make_env,
    num_envs=128,
    backend='Serial'
)
# Illustrative SPS: ~5,000 (single core)

# Multiprocessing: 16 workers
vecenv_mp = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    backend='Multiprocessing'
)
# Illustrative SPS: ~75,000 (~15x speedup)
Multiprocessing provides near-linear speedup for CPU-bound environments: on 16 physical cores, expect roughly 14-15x, with the exact figure depending on per-step environment cost.
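To ground these numbers on your own hardware, a quick SPS measurement loop (a minimal sketch; assumes vecenv.action_space.sample() yields a valid batched action):

import time

def measure_sps(vecenv, num_iters=1000):
    vecenv.async_reset(seed=0)
    obs, *_ = vecenv.recv()

    start = time.time()
    steps = 0
    for _ in range(num_iters):
        vecenv.send(vecenv.action_space.sample())
        obs, *_ = vecenv.recv()
        steps += len(obs)  # agent steps received this iteration

    return steps / (time.time() - start)

print(f"SPS: {measure_sps(vecenv):,.0f}")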

Batch size impact

# Small batches: Lower latency, higher variance
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=16,  # Receive frequently
    backend='Multiprocessing'
)

# Large batches: Higher throughput, more synchronization
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=256,  # Wait for all
    backend='Multiprocessing'
)

Advanced features

Trajectory synchronization

Force workers to complete in order:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=128,
    num_workers=16,
    sync_traj=True,  # Wait for workers in order
    backend='Multiprocessing'
)
From pufferlib/vector.py:358-372:
if self.sync_traj:
    worker = self.waiting_workers[0]
    sem = self.buf['semaphores'][worker]
    if sem >= MAIN:
        self.waiting_workers.pop(0)
        self.ready_workers.append(worker)
else:
    worker = self.waiting_workers.pop(0)
    sem = self.buf['semaphores'][worker]
    if sem >= MAIN:
        self.ready_workers.append(worker)
    else:
        self.waiting_workers.append(worker)
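The difference is easiest to see in isolation: with sync_traj, the head of the queue gates everyone behind it; without it, any finished worker is promoted. A standalone toy simulation (no PufferLib APIs):

from collections import deque

waiting = deque([0, 1, 2, 3])
done = {2, 3}  # workers 2 and 3 finished first

# sync_traj=True: only the head can be promoted, so nothing is
# ready until worker 0 finishes
head_ready = waiting[0] in done  # False

# sync_traj=False: rotate the queue, promoting any finished worker
ready = []
for _ in range(len(waiting)):
    w = waiting.popleft()
    (ready if w in done else waiting).append(w)
# ready == [2, 3]; waiting == deque([0, 1])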

Core limits

Prevents oversubscription by default:
import psutil

cpu_cores = psutil.cpu_count(logical=False)
if num_workers > cpu_cores and not overwork:
    raise pufferlib.APIUsageError(
        f'num_workers ({num_workers}) > hardware cores ({cpu_cores}) is disallowed by default. '
        'PufferLib multiprocessing is heavily optimized for 1 process per hardware core. '
        'If you really want to do this, set overwork=True.'
    )
From pufferlib/vector.py:246-253
Use only physical cores (not hyperthreads). Oversubscription usually decreases performance.
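If you have measured a real benefit from oversubscription (for example, environments that block on I/O), the check can be bypassed with the overwork flag named in the error message:

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=32,   # more workers than physical cores
    overwork=True,    # explicitly opt in to oversubscription
    backend='Multiprocessing'
)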

Custom environment arguments

Pass different arguments to each environment:
def make_env_0():
    return MyEnv(level=0)

def make_env_1():
    return MyEnv(level=1)

# List of creators
vecenv = pufferlib.vector.make(
    [make_env_0, make_env_1, ...],  # One per env
    num_envs=128,
    backend='Multiprocessing'
)

# Or with args/kwargs
vecenv = pufferlib.vector.make(
    make_env,
    env_args=[(0,), (1,), ...],           # Args per env
    env_kwargs=[{'level': 0}, {'level': 1}, ...],  # Kwargs per env
    num_envs=128,
    backend='Multiprocessing'
)
From pufferlib/vector.py:672-684

Debugging

Use Serial for debugging

Serial backend runs in the main process for easier debugging:
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=8,
    backend='Serial'  # Easy to debug with breakpoints
)

Profiling workers

Worker processes can be profiled individually:
def _worker_process_profiled(*args, worker_idx=0, **kwargs):
    # Wraps the normal worker loop in cProfile, dumping one .prof per worker
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    profiler.enable()

    # Normal worker loop
    _worker_process(*args, worker_idx=worker_idx, **kwargs)

    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.dump_stats(f'worker_{worker_idx}.prof')

Memory monitoring

Check per-worker memory usage:
import psutil
import os

def _worker_process(*args, **kwargs):
    process = psutil.Process(os.getpid())

    while True:
        # ... worker loop ...

        mem_mb = process.memory_info().rss / 1024 / 1024
        if mem_mb > 1000:  # 1GB threshold
            print(f"Worker {os.getpid()} using {mem_mb:.1f}MB")

Best practices

1. Match workers to physical cores

import psutil

num_workers = psutil.cpu_count(logical=False)
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=num_workers * 8,  # 8 envs per worker
    num_workers=num_workers,
    backend='Multiprocessing'
)
2. Choose appropriate batch size

# For training: size the env batch so recv() batches divide the training batch
training_batch = 4096  # samples per learner update
vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=256,  # 4096 / 256 = 16 recv/send rounds per update
    backend='Multiprocessing'
)
3. Enable zero-copy when possible

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=256,
    num_workers=16,
    batch_size=64,
    zero_copy=True,  # Requires 256 % 64 == 0
    backend='Multiprocessing'
)
4. Profile before scaling

import pufferlib.vector

pufferlib.vector.autotune(
    make_env,
    batch_size=4096,
    max_envs=256,
    time_per_test=5
)

Example: Optimized setup

import pufferlib.vector
import psutil

def make_env():
    import gymnasium
    # Depending on your PufferLib version, raw Gymnasium envs may need
    # the pufferlib.emulation wrapper; native PufferEnvs need no wrapping
    return gymnasium.make('CartPole-v1')

# Get physical core count
num_cores = psutil.cpu_count(logical=False)
print(f"Physical cores: {num_cores}")

# Configure vectorization
envs_per_worker = 8
num_envs = num_cores * envs_per_worker
batch_size = num_envs // 2  # Process half at a time

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=num_envs,
    num_workers=num_cores,
    batch_size=batch_size,
    zero_copy=True,
    sync_traj=False,  # Async for max throughput
    backend='Multiprocessing'
)

print(f"\nConfiguration:")
print(f"  Environments: {num_envs}")
print(f"  Workers: {num_cores}")
print(f"  Envs per worker: {envs_per_worker}")
print(f"  Batch size: {batch_size}")
print(f"  Total agents: {vecenv.num_agents}")

# Training loop
vecenv.async_reset(seed=42)

for epoch in range(100):
    obs, *_ = vecenv.recv()
    
    # Your policy
    actions = vecenv.action_space.sample()
    
    vecenv.send(actions)

vecenv.close()
Start with autotune to find optimal parameters, then fine-tune based on your specific workload.
