Overview
Environments in rLLM define the interaction loop between agents and tasks. They handle observations, actions, and reward computation. This guide covers creating both single-turn and multi-turn environments.
Environment Types
rLLM provides two base classes:
- SingleTurnEnvironment: For one-shot question-answering tasks
- MultiTurnEnvironment: For multi-step interactive tasks
Both inherit from BaseEnv, which defines the standard Gym-style interface.
BaseEnv Interface
All environments must implement:
from rllm.environments.base.base_env import BaseEnv
from typing import Any

class MyEnvironment(BaseEnv):
    def reset(self) -> tuple[dict, dict]:
        """Reset environment to initial state.

        Returns:
            tuple: (observation, info)
        """
        pass

    def step(self, action: Any) -> tuple[Any, float, bool, dict]:
        """Execute one step.

        Args:
            action: Agent's action

        Returns:
            tuple: (observation, reward, done, info)
        """
        pass

    @staticmethod
    def from_dict(info: dict) -> "MyEnvironment":
        """Create environment from dictionary.

        Args:
            info: Configuration dictionary

        Returns:
            MyEnvironment instance
        """
        pass
See rllm/environments/base/base_env.py:5.
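To make the contract concrete, here is a minimal sketch of how a caller might drive any BaseEnv subclass. The env instance and the choose_action helper below are assumptions standing in for a concrete environment and the agent's policy:

# Minimal rollout loop against the BaseEnv interface (illustrative sketch).
# `env` is any BaseEnv subclass; `choose_action` is a hypothetical stand-in
# for whatever produces the agent's next action from an observation.
observation, info = env.reset()
done = False
total_reward = 0.0
while not done:
    action = choose_action(observation)
    observation, reward, done, info = env.step(action)
    total_reward += reward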
Single-Turn Environments
Overview
Use SingleTurnEnvironment for tasks where the agent responds once to a prompt and receives a reward.
Implementation
Subclass SingleTurnEnvironment
from rllm.environments.base.single_turn_env import SingleTurnEnvironment
from rllm.rewards.reward_fn import RewardFunction

class MyTaskEnvironment(SingleTurnEnvironment):
    def __init__(self, task: dict = None, reward_fn: RewardFunction = None, **kwargs):
        super().__init__(task=task, max_turns=1, **kwargs)
        self.reward_fn = reward_fn or self.default_reward_fn
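The reward_fn passed here is expected to follow the RewardFunction protocol: a callable that accepts task_info and action keyword arguments and returns an object exposing a reward field. The following is only a minimal stand-in with that shape; the real protocol and output type live in rllm.rewards:

from dataclasses import dataclass

# Minimal stand-in for a reward callable; the real RewardFunction protocol
# and its output type are defined in rllm.rewards.
@dataclass
class SimpleRewardOutput:
    reward: float

def exact_match_reward(task_info: dict, action: str) -> SimpleRewardOutput:
    # Sparse reward: 1.0 when the response exactly matches the ground truth.
    correct = str(action).strip() == str(task_info.get("ground_truth", "")).strip()
    return SimpleRewardOutput(reward=1.0 if correct else 0.0)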
Implement get_reward_and_next_obs
This method evaluates the action and returns a reward:

def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
    """Compute reward based on task and action.

    Args:
        task: Task dictionary with question, ground_truth, etc.
        action: Agent's response

    Returns:
        tuple: (reward, next_observation)
    """
    reward_output = self.reward_fn(task_info=task, action=action)
    return reward_output.reward, {}
From rllm/environments/base/single_turn_env.py:27
Implement from_dict
Factory method for deserialization:

@staticmethod
def from_dict(env_args: dict) -> "MyTaskEnvironment":
    reward_fn = env_args.pop("reward_fn", None)
    task = env_args.get("task", env_args)
    return MyTaskEnvironment(task=task, reward_fn=reward_fn)
From rllm/environments/base/single_turn_env.py:42
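Putting the three steps together, the environment can be exercised directly before wiring it into training. The snippet below is an illustrative sketch; the task keys and the exact_match_reward helper from the sketch above are assumptions, not part of rLLM:

# Illustrative sketch: reconstructing the environment from a plain dict, as
# the training infrastructure would on a worker, then stepping it once.
env_args = {
    "task": {"question": "What is 2 + 2?", "ground_truth": "4"},
    "reward_fn": exact_match_reward,  # hypothetical reward function from the sketch above
}
env = MyTaskEnvironment.from_dict(env_args)

obs, info = env.reset()
obs, reward, done, info = env.step("4")  # single turn: the episode ends after one step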
Complete Example: Math Environment
from typing import Any

from rllm.environments.base.single_turn_env import SingleTurnEnvironment
from rllm.rewards.reward_fn import math_reward_fn, RewardFunction

class MathEnvironment(SingleTurnEnvironment):
    """Environment for mathematical problem solving."""

    def __init__(self, task: dict = None, reward_fn: RewardFunction = None, **kwargs):
        super().__init__(task=task, reward_fn=reward_fn or math_reward_fn, **kwargs)

    def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
        # Use the reward function to evaluate correctness
        reward_output = self.reward_fn(task_info=task, action=action)
        return reward_output.reward, {}

    @staticmethod
    def from_dict(env_args: dict) -> "MathEnvironment":
        reward_fn = env_args.pop("reward_fn", None)
        task = env_args.get("task", env_args)
        return MathEnvironment(task=task, reward_fn=reward_fn)
Usage:
from rllm.trainer.agent_trainer import AgentTrainer
from rllm.agents.math_agent import MathAgent
from rllm.data.dataset import DatasetRegistry

train_dataset = DatasetRegistry.load_dataset("gsm8k", "train")
env_args = {"reward_fn": math_reward_fn}

# `config` is the training configuration defined elsewhere in the script
trainer = AgentTrainer(
    agent_class=MathAgent,
    env_class=MathEnvironment,
    env_args=env_args,
    config=config,
    train_dataset=train_dataset,
)
trainer.train()
From examples/gsm8k_lora/train_gsm8k_with_lora.py:10.
Multi-Turn Environments
Overview
Use MultiTurnEnvironment for interactive tasks that require multiple exchanges between agent and environment.
Implementation
Subclass MultiTurnEnvironment
from rllm.environments.base.multi_turn_env import MultiTurnEnvironment

class MyInteractiveEnvironment(MultiTurnEnvironment):
    def __init__(self, task: dict = None, max_turns: int = 10, **kwargs):
        super().__init__(task=task, max_turns=max_turns, **kwargs)
        # Add custom state
        self.current_state = None
Implement get_reward_and_next_obs
Return both the reward and the next observation:

def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
    # Update internal state based on the action
    self.current_state = self.transition(self.current_state, action)
    # Compute reward
    reward = self.compute_reward(task, action, self.current_state)
    # Generate the next observation
    next_obs = {"state": self.current_state, "message": "Next step..."}
    return reward, next_obs
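The transition and compute_reward methods above are placeholders, not rLLM APIs. As an illustration only, a toy number-guessing environment might fill them in like this:

# Toy illustration of the placeholder helpers used above; not part of rLLM.
def transition(self, state: dict, action: str) -> dict:
    # Track how many guesses have been made so far.
    guesses = state.get("guesses", 0) + 1
    return {"guesses": guesses, "last_guess": action}

def compute_reward(self, task: dict, action: str, state: dict) -> float:
    # Sparse reward: 1.0 only when the guess matches the hidden target.
    return 1.0 if str(action).strip() == str(task.get("target", "")) else 0.0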
Override reset if needed
Initialize environment state:

def reset(self, task: dict = None):
    super().reset(task)
    self.current_state = self.get_initial_state(task)
    return self.task, {}
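get_initial_state is likewise a hypothetical helper for this example, not part of MultiTurnEnvironment; for the toy guessing game it would simply seed the per-episode state:

# Hypothetical helper paired with the reset override above; not part of rLLM.
def get_initial_state(self, task: dict) -> dict:
    # Keep only what the episode actually needs.
    return {"guesses": 0, "last_guess": None}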
Complete Example: FrozenLake Environment
Here’s a complete multi-turn environment with custom state management:
import gymnasium as gym
import numpy as np

from rllm.environments.base.base_env import BaseEnv

class FrozenLakeEnv(BaseEnv):
    """Grid navigation environment."""

    GRID_LOOKUP = {
        0: " P \t",  # player
        1: " _ \t",  # frozen
        2: " O \t",  # hole
        3: " G \t",  # goal
    }
    INVALID_ACTION = 0

    def __init__(self, size=8, p=0.8, seed=42, is_slippery=False, **kwargs):
        self.size = size
        self.p = p
        self.seed = seed
        self.is_slippery = is_slippery
        # Generate random map
        self.desc, self.goal_position = self.generate_random_map(
            size=size, p=p, seed=seed
        )
        # Initialize Gym environment
        self.gym_env = gym.make(
            'FrozenLake-v1',
            desc=self.desc,
            is_slippery=is_slippery
        )
        self.action_map = {1: 0, 2: 1, 3: 2, 4: 3}  # Custom to Gym mapping

    def reset(self, task=None):
        task = task or {}
        self.gym_env.reset(seed=self.seed)
        return self.render(), {}

    def step(self, action: int):
        if action == self.INVALID_ACTION:
            return self.render(), 0, False, {"action_is_effective": False}
        prev_position = self.gym_env.unwrapped.s
        # Map custom action to Gym action
        gym_action = self.action_map.get(action, 0)
        obs, reward, done, truncated, info = self.gym_env.step(gym_action)
        new_position = self.gym_env.unwrapped.s
        info["action_is_effective"] = (prev_position != new_position)
        return self.render(), reward, done, info

    def render(self):
        # Custom rendering logic
        state = self.gym_env.unwrapped.s
        row, col = state // self.size, state % self.size
        grid = np.array(self.desc)
        grid_visual = "\n".join(
            "".join(self.GRID_LOOKUP[cell] for cell in row)
            for row in grid
        )
        return grid_visual

    @staticmethod
    def from_dict(env_info: dict) -> "FrozenLakeEnv":
        return FrozenLakeEnv(
            size=env_info["size"],
            seed=env_info["seed"],
            p=env_info["p"],
            is_slippery=env_info.get("is_slippery", False)
        )
From rllm/environments/frozenlake/frozenlake.py:94.
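A hedged usage sketch, driving the environment with its custom action encoding (1-4 map to the Gym actions; 0 is treated as invalid):

# Illustrative sketch: constructing and stepping the FrozenLake environment.
env = FrozenLakeEnv.from_dict({"size": 4, "seed": 7, "p": 0.8})
obs, info = env.reset()
print(obs)  # rendered grid as text

obs, reward, done, info = env.step(2)  # one of the custom actions 1-4
obs, reward, done, info = env.step(0)  # INVALID_ACTION: no-op, reward 0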
Key Concepts
Observation Space
Observations can be:
- Dictionaries: {"question": "...", "context": "..."} (see the sketch after this list)
- Strings: Text descriptions
- Images: For vision-language models (see VLM guide)
- Structured data: Custom objects serializable to dict
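For instance, a dictionary observation returned from reset might look like the following; the task fields shown are assumptions for illustration:

# Sketch: returning a dictionary observation from reset.
def reset(self) -> tuple[dict, dict]:
    # The same method could just as well return a plain string,
    # e.g. f"Question: {self.task['question']}".
    observation = {"question": self.task["question"], "context": ""}
    return observation, {}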
Reward Computation
Rewards should:
- Return float values (typically 0.0 or 1.0 for sparse rewards)
- Use the RewardFunction protocol for consistency
- Support delayed rewards in multi-turn settings (see the sketch after this list)
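For delayed rewards, one common pattern is to return 0.0 on intermediate turns and score only the final one. The sketch below assumes hypothetical history and is_final_turn attributes; neither is part of the rLLM API:

# Sketch of an episode-level (delayed) reward in a multi-turn environment.
# `self.history` and `self.is_final_turn()` are hypothetical for this example.
def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
    self.history.append(action)
    if not self.is_final_turn():
        return 0.0, {"message": "continue"}
    # Only the last turn carries the reward for the whole episode.
    reward_output = self.reward_fn(task_info=task, action=self.history[-1])
    return reward_output.reward, {}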
State Management
For stateful environments:
- Initialize state in __init__ or reset()
- Update state in step() or get_reward_and_next_obs()
- Store minimal state needed for the task (see the sketch after this list)
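A compact way to follow all three points, with hypothetical names, is to keep the per-episode state in one small serializable dict:

# Minimal-state pattern (hypothetical names): everything needed to resume
# the episode fits in a small, serializable dict.
def reset(self, task: dict = None):
    self.state = {"turn": 0}  # initialize per-episode state
    question = (task or {}).get("question", "")
    return {"question": question}, {}

def step(self, action):
    self.state["turn"] += 1  # update state as the episode progresses
    done = self.state["turn"] >= self.max_turns
    return {"turn": self.state["turn"]}, 0.0, done, {}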
Using Environments with Workflows
For more complex scenarios, you can bypass environments entirely and use workflows:
from rllm.workflows.workflow import Workflow
from rllm.agents.agent import Episode, Trajectory

class CustomWorkflow(Workflow):
    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        # Implement custom interaction logic
        # No environment needed!
        pass
See Multi-Agent Workflows for advanced patterns.
Workflows provide more flexibility than environments for complex multi-agent scenarios, custom interaction patterns, or when you need fine-grained control over the training loop.
Best Practices
- Keep environments simple: Delegate complex logic to agents or reward functions
- Make environments stateless when possible: Easier to parallelize
- Implement from_dict correctly: Required for distributed training
- Set is_multithread_safe appropriately: Return False if the environment uses shared state (see the sketch after this list)
- Handle edge cases: Invalid actions, empty observations, etc.
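For the is_multithread_safe point above, the intent is a simple opt-out flag; check BaseEnv for the exact signature, but roughly:

# Rough sketch; check BaseEnv for the exact signature of this hook.
@staticmethod
def is_multithread_safe() -> bool:
    # This environment mutates shared resources, so opt out of
    # multithreaded execution.
    return False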
The from_dict method must be able to reconstruct your environment from a dictionary. Avoid relying on global state or external resources that won’t be available during deserialization.
Next Steps