PufferLib’s emulation layer allows you to use any Gymnasium or PettingZoo environment with PufferLib’s vectorization and training systems. The emulation wrappers convert standard environment APIs to the PufferEnv interface while handling complex observation and action spaces.

Why emulation?

Native PufferEnvs require:
  • Box observation spaces
  • Simple action spaces (Discrete, MultiDiscrete, or Box)
  • Pre-allocated shared memory buffers
Many existing environments use:
  • Dict or Tuple observation spaces
  • Complex nested action spaces
  • Dynamic allocation patterns
Emulation bridges this gap by:
  1. Converting complex spaces to flat arrays
  2. Managing serialization and deserialization
  3. Providing a compatible interface for vectorization
Emulation has some overhead compared to native PufferEnvs, but it’s often negligible compared to environment step time. Use native PufferEnvs when you need maximum performance.

GymnasiumPufferEnv

GymnasiumPufferEnv wraps single-agent Gymnasium environments:
examples/gymnasium_env.py
import gymnasium
import pufferlib.emulation

class SampleGymnasiumEnv(gymnasium.Env):
    def __init__(self):
        self.observation_space = gymnasium.spaces.Box(low=-1, high=1, shape=(1,))
        self.action_space = gymnasium.spaces.Discrete(2)

    def reset(self, seed=None, options=None):
        return self.observation_space.sample(), {}

    def step(self, action):
        return self.observation_space.sample(), 0.0, False, False, {}

# Wrap the Gymnasium environment
gymnasium_env = SampleGymnasiumEnv()
puffer_env = pufferlib.emulation.GymnasiumPufferEnv(gymnasium_env)

# Use like any PufferEnv
observation, info = puffer_env.reset()
action = puffer_env.action_space.sample()
observation, reward, terminal, truncation, info = puffer_env.step(action)

Constructor options

pufferlib/emulation.py
GymnasiumPufferEnv(
    env=None,              # Existing environment instance
    env_creator=None,      # Or callable that creates environment
    env_args=[],           # Positional args for env_creator
    env_kwargs={},         # Keyword args for env_creator
    buf=None,              # Shared buffer (for vectorization)
    seed=0                 # Random seed
)
You can provide either an environment instance or a creator function:
env = gymnasium.make('CartPole-v1')
puffer_env = pufferlib.emulation.GymnasiumPufferEnv(env=env)

Space emulation

GymnasiumPufferEnv automatically converts observation and action spaces:

Box observations (no conversion)

# Input: Box(shape=(4,))
# Output: Box(shape=(4,)) - unchanged

Dict observations (flattened to Box)

# Input: Dict({
#   'position': Box(shape=(2,), dtype=float32),
#   'velocity': Box(shape=(2,), dtype=float32)
# })
# Output: Box(shape=(16,), dtype=uint8) - flattened bytes

Tuple observations (flattened to Box)

# Input: Tuple((
#   Box(shape=(4,), dtype=float32),
#   Discrete(3)
# ))
# Output: Box(shape=(20,), dtype=uint8) - flattened bytes
Complex observation spaces are flattened to byte arrays. The emulation layer handles packing and unpacking automatically.
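The byte counts in the comments above can be reproduced with plain numpy structured dtypes. A sketch of the size arithmetic only; the field names mirror the examples, not PufferLib internals:

```python
import numpy as np

# Dict example: two float32 (2,) fields -> 2*4 + 2*4 = 16 bytes
dict_dtype = np.dtype([
    ('position', np.float32, (2,)),
    ('velocity', np.float32, (2,)),
], align=True)
assert dict_dtype.itemsize == 16

# Tuple example: float32 (4,) plus a Discrete stored as int32
# -> 4*4 + 4 = 20 bytes
tuple_dtype = np.dtype([
    ('f0', np.float32, (4,)),
    ('f1', np.int32),
], align=True)
assert tuple_dtype.itemsize == 20
```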

Properties

pufferlib/emulation.py
puffer_env.num_agents           # Always 1 for Gymnasium
puffer_env.single_observation_space
puffer_env.single_action_space
puffer_env.observation_space    # Same as single_observation_space
puffer_env.action_space         # Same as single_action_space
puffer_env.emulated             # Dict with emulation metadata
puffer_env.done                 # True if environment is done

PettingZooPufferEnv

PettingZooPufferEnv wraps multi-agent PettingZoo environments:
examples/pettingzoo_env.py
import gymnasium
import pettingzoo
import pufferlib.emulation

class SamplePettingzooEnv(pettingzoo.ParallelEnv):
    def __init__(self):
        self.possible_agents = ['agent_0', 'agent_1']
        self.agents = ['agent_0', 'agent_1']

    def observation_space(self, agent):
        return gymnasium.spaces.Box(low=-1, high=1, shape=(1,))

    def action_space(self, agent):
        return gymnasium.spaces.Discrete(2)

    def reset(self, seed=None, options=None):
        observations = {agent: self.observation_space(agent).sample() 
                       for agent in self.agents}
        return observations, {}

    def step(self, actions):
        observations = {agent: self.observation_space(agent).sample() 
                       for agent in self.agents}
        rewards = {agent: 0.0 for agent in self.agents}
        terminals = {agent: False for agent in self.agents}
        truncations = {agent: False for agent in self.agents}
        infos = {agent: {} for agent in self.agents}
        return observations, rewards, terminals, truncations, infos

# Wrap the PettingZoo environment
env = SamplePettingzooEnv()
puffer_env = pufferlib.emulation.PettingZooPufferEnv(env)

# Use like any PufferEnv
observations, infos = puffer_env.reset()
actions = {agent: puffer_env.action_space(agent).sample() 
          for agent in puffer_env.agents}
observations, rewards, terminals, truncations, infos = puffer_env.step(actions)

Agent handling

PettingZooPufferEnv manages the mapping between agent names and array indices:
pufferlib/emulation.py
class PettingZooPufferEnv:
    def __init__(self, ...):
        # Compute spaces from first agent
        single_agent = self.possible_agents[0]
        self.env_single_observation_space = self.env.observation_space(single_agent)
        self.env_single_action_space = self.env.action_space(single_agent)
        
        # Number of agents
        self.num_agents = len(self.possible_agents)
        
    def reset(self, seed=None):
        obs, info = self.env.reset(seed=seed)
        
        # Map agent observations to array indices
        for i, agent in enumerate(self.possible_agents):
            if agent in obs:
                self.observations[i] = obs[agent]

Variable agent counts

PettingZoo environments can have agents that die or join during an episode. PufferLib handles this with masks:
pufferlib/emulation.py
def step(self, actions):
    obs, rewards, dones, truncateds, infos = self.env.step(unpacked_actions)
    
    for i, agent in enumerate(self.possible_agents):
        if agent not in obs:
            # Agent is dead/inactive
            self.observations[i] = 0
            self.rewards[i] = 0
            self.terminals[i] = True
            self.truncations[i] = False
            self.masks[i] = False  # Mark as inactive
            continue
            
        # Agent is active
        self.observations[i] = obs[agent]
        self.rewards[i] = rewards[agent]
        self.terminals[i] = dones[agent]
        self.truncations[i] = truncateds[agent]
        self.masks[i] = True  # Mark as active
The mask array indicates which agents are currently active:
# After stepping
active_agents = puffer_env.masks.sum()  # Count active agents
active_rewards = puffer_env.rewards[puffer_env.masks]  # Get rewards for active agents only

Properties

puffer_env.num_agents           # Number of possible agents
puffer_env.possible_agents      # List of all possible agent names
puffer_env.agents               # Currently active agents
puffer_env.done                 # True if all agents are done
puffer_env.masks                # Boolean array of active agents

Space conversion details

The emulation layer uses structured dtypes to pack complex spaces:
pufferlib/emulation.py
def dtype_from_space(space):
    if isinstance(space, pufferlib.spaces.Tuple):
        dtype = []
        for i, elem in enumerate(space):
            dtype.append((f'f{i}', dtype_from_space(elem)))
    elif isinstance(space, pufferlib.spaces.Dict):
        dtype = []
        for k, value in space.items():
            dtype.append((k, dtype_from_space(value)))
    elif isinstance(space, pufferlib.spaces.Discrete):
        dtype = (np.int32, ())
    elif isinstance(space, pufferlib.spaces.MultiDiscrete):
        dtype = (np.int32, (len(space.nvec),))
    else:
        dtype = (space.dtype, space.shape)
    
    return np.dtype(dtype, align=True)
This creates a structured numpy dtype that can be viewed as a flat byte array:
# Complex observation space
space = Dict({
    'image': Box(shape=(84, 84, 3), dtype=uint8),
    'vector': Box(shape=(4,), dtype=float32)
})

# Becomes structured dtype
dtype = np.dtype([
    ('image', np.uint8, (84, 84, 3)),
    ('vector', np.float32, (4,))
], align=True)

# Can be viewed as flat array
flat_size = dtype.itemsize  # Total bytes
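A minimal sketch of the pack/unpack round trip using plain numpy (a smaller image so the demo stays tiny; this illustrates the idea, not PufferLib's actual code path):

```python
import numpy as np

# Structured dtype with a small image field for the demo
dtype = np.dtype([
    ('image', np.uint8, (2, 2, 3)),
    ('vector', np.float32, (4,)),
], align=True)

# One observation as a structured record
obs = np.zeros(1, dtype=dtype)
obs['image'][:] = 255
obs['vector'][:] = [0.1, 0.2, 0.3, 0.4]

# Pack: the record's memory as a flat byte array
flat = np.frombuffer(obs.tobytes(), dtype=np.uint8)
assert flat.size == dtype.itemsize

# Unpack: view the same bytes as the structured dtype again
restored = flat.view(dtype)
assert np.array_equal(restored['vector'], obs['vector'])
```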

Vectorization with emulated environments

Emulated environments work seamlessly with vectorization:
import gymnasium
import pufferlib.emulation
import pufferlib.vector

# Create vectorized emulated environments
vecenv = pufferlib.vector.make(
    lambda: pufferlib.emulation.GymnasiumPufferEnv(
        env_creator=lambda: gymnasium.make('CartPole-v1')
    ),
    num_envs=8,
    backend=pufferlib.vector.Serial
)

observations, infos = vecenv.reset()
Or more concisely:
def make_env():
    env = gymnasium.make('CartPole-v1')
    return pufferlib.emulation.GymnasiumPufferEnv(env)

vecenv = pufferlib.vector.make(
    make_env,
    num_envs=8,
    backend=pufferlib.vector.Multiprocessing,
    num_workers=4
)

Performance considerations

Emulation adds overhead:
  1. Space conversion: Packing/unpacking complex spaces (small cost)
  2. Attribute access: Additional wrapper layer (negligible)
  3. Done checking: Extra logic for environment termination (negligible)
For most environments, emulation overhead is < 5% of total step time. The main cost is in the actual environment logic, not the wrapper.
If profiling shows emulation overhead is significant, consider implementing a native PufferEnv. For most use cases, emulation is perfectly fine.

Common patterns

Wrapping Gymnasium environments

import gymnasium
import pufferlib.emulation

# Simple wrapping
env = gymnasium.make('CartPole-v1')
puffer_env = pufferlib.emulation.GymnasiumPufferEnv(env)

Wrapping PettingZoo environments

from pettingzoo.butterfly import pistonball_v6
import pufferlib.emulation

env = pistonball_v6.parallel_env()
puffer_env = pufferlib.emulation.PettingZooPufferEnv(env)

Adding wrappers before emulation

import gymnasium
from gymnasium.wrappers import FrameStack
import pufferlib.emulation

# Apply Gymnasium wrappers first
env = gymnasium.make('ALE/Pong-v5')
env = FrameStack(env, num_stack=4)

# Then wrap with PufferLib
puffer_env = pufferlib.emulation.GymnasiumPufferEnv(env)

Custom environment with emulation

import gymnasium
import numpy as np

class MyComplexEnv(gymnasium.Env):
    def __init__(self):
        self.observation_space = gymnasium.spaces.Dict({
            'image': gymnasium.spaces.Box(0, 255, shape=(64, 64, 3), dtype=np.uint8),
            'state': gymnasium.spaces.Box(-1, 1, shape=(10,), dtype=np.float32)
        })
        self.action_space = gymnasium.spaces.MultiDiscrete([3, 2])
    
    # ... implement reset() and step()

# Emulation handles complex spaces automatically
env = MyComplexEnv()
puffer_env = pufferlib.emulation.GymnasiumPufferEnv(env)
