Overview

Environments in rLLM define the interaction loop between agents and tasks. They handle observations, actions, and reward computation. This guide covers creating both single-turn and multi-turn environments.

Environment Types

rLLM provides two base classes:
  • SingleTurnEnvironment: For one-shot question-answering tasks
  • MultiTurnEnvironment: For multi-step interactive tasks
Both inherit from BaseEnv, which defines the standard Gym-style interface.

BaseEnv Interface

All environments must implement:
from rllm.environments.base.base_env import BaseEnv
from typing import Any

class MyEnvironment(BaseEnv):
    def reset(self) -> tuple[dict, dict]:
        """Reset environment to initial state.
        
        Returns:
            tuple: (observation, info)
        """
        pass
    
    def step(self, action: Any) -> tuple[Any, float, bool, dict]:
        """Execute one step.
        
        Args:
            action: Agent's action
            
        Returns:
            tuple: (observation, reward, done, info)
        """
        pass
    
    @staticmethod
    def from_dict(info: dict) -> "MyEnvironment":
        """Create environment from dictionary.
        
        Args:
            info: Configuration dictionary
            
        Returns:
            MyEnvironment instance
        """
        pass
See rllm/environments/base/base_env.py:5.
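
To make the control flow concrete, here is a minimal sketch of driving an environment by hand, assuming a concrete implementation of the MyEnvironment stub above (during training, the trainer and agent run this loop for you):
# Build the environment from a task dictionary, as the trainer would
env = MyEnvironment.from_dict({"question": "What is 2 + 2?", "ground_truth": "4"})

observation, info = env.reset()
done = False
while not done:
    action = "4"  # in practice, the agent generates this from the observation
    observation, reward, done, info = env.step(action)
The task keys shown here (question, ground_truth) are illustrative; use whatever fields your task dictionaries actually contain.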

Single-Turn Environments

Overview

Use SingleTurnEnvironment for tasks where the agent responds once to a prompt and receives a reward.

Implementation

Step 1: Subclass SingleTurnEnvironment

from rllm.environments.base.single_turn_env import SingleTurnEnvironment
from rllm.rewards.reward_fn import RewardFunction

class MyTaskEnvironment(SingleTurnEnvironment):
    def __init__(self, task: dict = None, reward_fn: RewardFunction = None, **kwargs):
        super().__init__(task=task, max_turns=1, **kwargs)
        self.reward_fn = reward_fn or self.default_reward_fn
Step 2: Implement get_reward_and_next_obs

This method evaluates the action and returns a reward:
def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
    """Compute reward based on task and action.
    
    Args:
        task: Task dictionary with question, ground_truth, etc.
        action: Agent's response
        
    Returns:
        tuple: (reward, next_observation)
    """
    reward_output = self.reward_fn(task_info=task, action=action)
    return reward_output.reward, {}
From rllm/environments/base/single_turn_env.py:27
Step 3: Implement from_dict

Factory method for deserialization:
@staticmethod
def from_dict(env_args: dict) -> "MyTaskEnvironment":
    reward_fn = env_args.pop("reward_fn", None)
    task = env_args.get("task", env_args)
    return MyTaskEnvironment(task=task, reward_fn=reward_fn)
From rllm/environments/base/single_turn_env.py:42

Complete Example: Math Environment

from typing import Any

from rllm.environments.base.single_turn_env import SingleTurnEnvironment
from rllm.rewards.reward_fn import math_reward_fn, RewardFunction

class MathEnvironment(SingleTurnEnvironment):
    """Environment for mathematical problem solving."""
    
    def __init__(self, task: dict = None, reward_fn: RewardFunction = None, **kwargs):
        super().__init__(task=task, reward_fn=reward_fn or math_reward_fn, **kwargs)
    
    def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
        # Use the reward function to evaluate correctness
        reward_output = self.reward_fn(task_info=task, action=action)
        return reward_output.reward, {}
    
    @staticmethod
    def from_dict(env_args: dict) -> "MathEnvironment":
        reward_fn = env_args.pop("reward_fn", None)
        task = env_args.get("task", env_args)
        return MathEnvironment(task=task, reward_fn=reward_fn)
Usage:
from rllm.trainer.agent_trainer import AgentTrainer
from rllm.agents.math_agent import MathAgent
from rllm.data.dataset import DatasetRegistry
from rllm.rewards.reward_fn import math_reward_fn

train_dataset = DatasetRegistry.load_dataset("gsm8k", "train")

env_args = {"reward_fn": math_reward_fn}

trainer = AgentTrainer(
    agent_class=MathAgent,
    env_class=MathEnvironment,
    env_args=env_args,
    config=config,  # training configuration object (defined elsewhere)
    train_dataset=train_dataset
)
trainer.train()
From examples/gsm8k_lora/train_gsm8k_with_lora.py:10.

Multi-Turn Environments

Overview

Use MultiTurnEnvironment for interactive tasks that require multiple exchanges between agent and environment.

Implementation

Step 1: Subclass MultiTurnEnvironment

from rllm.environments.base.multi_turn_env import MultiTurnEnvironment

class MyInteractiveEnvironment(MultiTurnEnvironment):
    def __init__(self, task: dict = None, max_turns: int = 10, **kwargs):
        super().__init__(task=task, max_turns=max_turns, **kwargs)
        # Add custom state
        self.current_state = None
Step 2: Implement get_reward_and_next_obs

Return both reward and next observation:
def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
    # Update internal state based on the action
    # (transition() is a user-defined helper, not part of the base class)
    self.current_state = self.transition(self.current_state, action)
    
    # Compute the reward (compute_reward() is also user-defined)
    reward = self.compute_reward(task, action, self.current_state)
    
    # Generate the next observation returned to the agent
    next_obs = {"state": self.current_state, "message": "Next step..."}
    
    return reward, next_obs
Step 3: Override reset if needed

Initialize environment state:
def reset(self, task: dict = None):
    super().reset(task)
    # get_initial_state() is a user-defined helper that builds the starting state
    self.current_state = self.get_initial_state(task)
    return self.task, {}

Complete Example: FrozenLake Environment

Here’s a complete multi-turn environment with custom state management, implemented directly on BaseEnv rather than MultiTurnEnvironment:
import gymnasium as gym
import numpy as np
from rllm.environments.base.base_env import BaseEnv

class FrozenLakeEnv(BaseEnv):
    """Grid navigation environment."""
    
    GRID_LOOKUP = {
        0: " P \t",  # player
        1: " _ \t",  # frozen
        2: " O \t",  # hole
        3: " G \t",  # goal
    }
    
    INVALID_ACTION = 0
    
    def __init__(self, size=8, p=0.8, seed=42, is_slippery=False, **kwargs):
        self.size = size
        self.p = p
        self.seed = seed
        self.is_slippery = is_slippery
        
        # Generate a random map (generate_random_map is implemented in the full source file)
        self.desc, self.goal_position = self.generate_random_map(
            size=size, p=p, seed=seed
        )
        
        # Initialize Gym environment
        self.gym_env = gym.make(
            'FrozenLake-v1',
            desc=self.desc,
            is_slippery=is_slippery
        )
        
        self.action_map = {1: 0, 2: 1, 3: 2, 4: 3}  # Custom to Gym mapping
    
    def reset(self, task=None):
        task = task or {}
        self.gym_env.reset(seed=self.seed)
        return self.render(), {}
    
    def step(self, action: int):
        if action == self.INVALID_ACTION:
            return self.render(), 0, False, {"action_is_effective": False}
        
        prev_position = self.gym_env.unwrapped.s
        
        # Map custom action to Gym action
        gym_action = self.action_map.get(action, 0)
        obs, reward, terminated, truncated, info = self.gym_env.step(gym_action)
        done = terminated or truncated  # treat truncation as end of episode
        
        new_position = self.gym_env.unwrapped.s
        info["action_is_effective"] = (prev_position != new_position)
        
        return self.render(), reward, done, info
    
    def render(self):
        # Render the grid as text, marking the player's current position
        state = self.gym_env.unwrapped.s
        player_row, player_col = state // self.size, state % self.size
        
        # Map tile characters from the map layout to the codes used by GRID_LOOKUP
        tile_codes = {"S": 1, "F": 1, "H": 2, "G": 3}
        grid = np.array([[tile_codes[ch] for ch in row] for row in self.desc])
        grid[player_row, player_col] = 0  # mark the player's cell
        
        return "\n".join(
            "".join(self.GRID_LOOKUP[cell] for cell in row)
            for row in grid
        )
    
    @staticmethod
    def from_dict(env_info: dict) -> "FrozenLakeEnv":
        return FrozenLakeEnv(
            size=env_info["size"],
            seed=env_info["seed"],
            p=env_info["p"],
            is_slippery=env_info.get("is_slippery", False)
        )
From rllm/environments/frozenlake/frozenlake.py:94.

Key Concepts

Observation Space

Observations can take several forms (a short example follows this list):
  • Dictionaries: {"question": "...", "context": "..."}
  • Strings: Text descriptions
  • Images: For vision-language models (see VLM guide)
  • Structured data: Custom objects serializable to dict
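For example, reset() can return the observation in whichever form your agent expects; this minimal sketch (with illustrative task fields) returns a dictionary:
def reset(self) -> tuple[dict, dict]:
    # The observation is whatever structure your agent consumes; a dict
    # carrying the task prompt is the most common pattern.
    observation = {"question": self.task["question"], "context": ""}
    return observation, {}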

Reward Computation

Rewards should:
  • Return float values (typically 0.0 or 1.0 for sparse rewards)
  • Use the RewardFunction protocol for consistency (a sketch follows this list)
  • Support delayed rewards in multi-turn settings
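As a minimal sketch of a compatible reward function: the environments above call reward_fn(task_info=..., action=...) and read a .reward field from the result, so any callable with that shape works. The SimpleRewardOutput class below is an illustrative stand-in for the library's own reward output type:
from dataclasses import dataclass

@dataclass
class SimpleRewardOutput:
    # Stand-in for the library's reward output type; only .reward is read above
    reward: float

def exact_match_reward_fn(task_info: dict, action: str) -> SimpleRewardOutput:
    # Sparse reward: 1.0 for an exact match against the ground truth, else 0.0
    answer = str(task_info.get("ground_truth", "")).strip()
    return SimpleRewardOutput(reward=1.0 if str(action).strip() == answer else 0.0)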

State Management

For stateful environments (a sketch follows this list):
  • Initialize state in __init__ or reset()
  • Update state in step() or get_reward_and_next_obs()
  • Store minimal state needed for the task
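A minimal sketch of this pattern, using a hypothetical GuessingEnvironment (the class, its state, and the task fields are illustrative, not part of rLLM):
from typing import Any
from rllm.environments.base.multi_turn_env import MultiTurnEnvironment

class GuessingEnvironment(MultiTurnEnvironment):
    """Hypothetical environment where the agent must guess a hidden answer."""

    def __init__(self, task: dict = None, max_turns: int = 5, **kwargs):
        super().__init__(task=task, max_turns=max_turns, **kwargs)
        self.guesses = []  # minimal state: guesses made so far

    def reset(self, task: dict = None):
        super().reset(task)
        self.guesses = []  # re-initialize state on every reset
        return self.task, {}

    def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
        # Update state in the step path, storing only what the task needs
        self.guesses.append(action)
        correct = str(action).strip() == str(task.get("ground_truth", "")).strip()
        reward = 1.0 if correct else 0.0
        next_obs = {"feedback": "correct" if correct else "try again",
                    "num_guesses": len(self.guesses)}
        return reward, next_obs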

Using Environments with Workflows

For more complex scenarios, you can bypass environments entirely and use workflows:
from rllm.workflows.workflow import Workflow
from rllm.agents.agent import Episode, Trajectory

class CustomWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        # Implement custom interaction logic
        # No environment needed!
        pass
See Multi-Agent Workflows for advanced patterns.
Workflows provide more flexibility than environments for complex multi-agent scenarios, custom interaction patterns, or when you need fine-grained control over the training loop.

Best Practices

  1. Keep environments simple: Delegate complex logic to agents or reward functions
  2. Make environments stateless when possible: Easier to parallelize
  3. Implement from_dict correctly: Required for distributed training
  4. Set is_multithread_safe appropriately: Return False if the environment uses shared state (see the sketch below)
  5. Handle edge cases: Invalid actions, empty observations, etc.
The from_dict method must be able to reconstruct your environment from a dictionary. Avoid relying on global state or external resources that won’t be available during deserialization.
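A minimal sketch of both points, assuming is_multithread_safe is exposed as a static method returning a bool (as the practice above implies); the class, its db_path parameter, and the shared-resource scenario are illustrative, and reset()/step() are omitted for brevity:
from rllm.environments.base.base_env import BaseEnv

class SharedResourceEnvironment(BaseEnv):
    """Hypothetical environment backed by a shared, non-thread-safe resource."""

    def __init__(self, db_path: str = "tasks.db", **kwargs):
        # Store only plain, serializable configuration so from_dict can rebuild it
        self.db_path = db_path

    @staticmethod
    def is_multithread_safe() -> bool:
        # This environment touches shared state, so opt out of multithreaded rollouts
        return False

    @staticmethod
    def from_dict(env_args: dict) -> "SharedResourceEnvironment":
        # Reconstruct purely from the dictionary; no global state or open handles
        return SharedResourceEnvironment(db_path=env_args.get("db_path", "tasks.db"))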
