Building blocks for agentic RL - implementing custom agents and environments in rLLM
Agents and environments are the fundamental building blocks in rLLM. This guide explains their interfaces, responsibilities, and how to implement your own custom agents and environments.
Agents are the core components that generate intelligent actions based on environmental observations. They serve as the bridge between language models and interactive environments.
Here’s a complete implementation of a math agent with self-correction:
```python
import copy
from typing import Any  # needed for the type hints below

from rllm.agents.agent import Action, BaseAgent, Step, Trajectory


class MathAgent(BaseAgent):
    """A math agent that solves problems step-by-step with self-correction capability."""

    def __init__(self, accumulate_thinking=True):
        self.instruction = "Let's think step by step and put your final answer within \\boxed{}."
        self._trajectory = Trajectory()
        self.messages = []
        self.accumulate_thinking = accumulate_thinking

    def update_from_env(self, observation: Any, reward: float, done: bool, info: dict, **kwargs):
        """Process environment feedback and update internal state."""
        # Format observation based on whether it's the initial problem or feedback
        if not self.trajectory.steps:
            # Initial problem presentation
            question = observation["question"]
            formatted_observation = f"{question} {self.instruction}"
        else:
            # Follow-up correction prompt
            formatted_observation = (
                "Your previous answer may contain a mistake. "
                "Please review it carefully and answer again."
            )

        # Update the last step's outcome if there are previous steps
        if self.trajectory.steps:
            prior_step = self.trajectory.steps[-1]
            prior_step.reward = reward
            prior_step.done = done
            prior_step.info = info

        if done:
            return

        # Add user message and create new step
        self.messages.append({"role": "user", "content": formatted_observation})
        new_step = Step(observation=formatted_observation)
        self.trajectory.steps.append(new_step)

    def update_from_model(self, response: str, **kwargs) -> Action:
        """Process model response and update trajectory."""
        assert self.trajectory.steps, "Trajectory should not be empty"

        # Update current step with model response
        cur_step = self.get_current_state()
        cur_step.model_response = response
        cur_step.chat_completions = copy.deepcopy(self.messages) + [
            {"role": "assistant", "content": response}
        ]

        # Add assistant message
        self.messages.append({"role": "assistant", "content": response})
        return Action(action=response)

    def reset(self):
        """Reset agent state for new episode."""
        self._trajectory = Trajectory()
        self.messages = []

    @property
    def chat_completions(self) -> list[dict[str, str]]:
        """Return conversation history for model interaction."""
        messages = copy.deepcopy(self.messages)
        # Optionally strip thinking tags from history
        if not self.accumulate_thinking:
            for msg in messages[:-1]:
                if msg["role"] == "assistant":
                    _, sep, after = msg["content"].partition("</think>")
                    if sep:
                        msg["content"] = after
        return messages

    @property
    def trajectory(self) -> Trajectory:
        """Return complete interaction trajectory."""
        return self._trajectory
```
The chat_completions property is crucial: it is what the execution engine uses to construct prompts for the language model, and it must return a list of messages in OpenAI chat format.
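As an illustration, here is what chat_completions might return after one user turn and one assistant turn. The concrete message contents are made up for this sketch; only the shape (a list of dicts with "role" and "content" keys, per the OpenAI chat format) is what the engine relies on:

```python
# Hypothetical snapshot of a chat_completions result (contents are
# illustrative); each entry follows the OpenAI chat format.
messages = [
    {
        "role": "user",
        "content": "What is 2 + 2? Let's think step by step and put your final answer within \\boxed{}.",
    },
    {
        "role": "assistant",
        "content": "2 + 2 = 4, so the answer is \\boxed{4}.",
    },
]

# The engine can pass such a list directly to any OpenAI-compatible
# chat completion endpoint when constructing the next prompt.
for msg in messages:
    assert msg["role"] in ("system", "user", "assistant")
    assert isinstance(msg["content"], str)
```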
Environments provide tasks, evaluate agent actions, and manage episode lifecycles. They complement agents by defining the context within which agents operate and learn.
Here’s a complete environment for math problems with self-correction:
```python
from typing import Any  # needed for the type hints below

from rllm.environments.base import MultiTurnEnvironment
from rllm.rewards.reward_fn import math_reward_fn


class MathEnv(MultiTurnEnvironment):
    """Environment for mathematical problem solving with self-correction."""

    def __init__(self, task: dict | None = None, max_attempts: int = 2):
        super().__init__(task=task, max_turns=max_attempts)
        self.is_correct = False

    def get_reward_and_next_obs(self, task: dict, action: Any) -> tuple[float, dict]:
        """Evaluate answer and provide reward."""
        # Use rLLM's math reward function
        reward_output = math_reward_fn(task_info=task, action=action)
        reward = reward_output.reward
        self.is_correct = reward > 0.0
        # No additional observation needed (agent handles formatting)
        return reward, {}

    @staticmethod
    def from_dict(env_args: dict) -> "MathEnv":
        """Factory method for creating environment from config."""
        return MathEnv(
            task=env_args.get("task", env_args),
            max_attempts=env_args.get("max_attempts", 2),
        )
```
The from_dict() static method is required for both inference and training. The execution engine uses it to instantiate environments from task dictionaries.
State Management: Always maintain clean state in reset(). The same agent/environment instance may be reused across multiple episodes.
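A minimal sketch of why this matters (the class below is a toy stand-in, not an rLLM API): if reset() fails to clear per-episode containers, history from one episode leaks into the next.

```python
# Toy stand-in (not the real BaseAgent) illustrating clean per-episode state.
class StatefulAgent:
    def __init__(self):
        self.messages = []

    def observe(self, text: str):
        self.messages.append({"role": "user", "content": text})

    def reset(self):
        # Re-create, rather than mutate, per-episode containers so no
        # reference to the previous episode's data survives.
        self.messages = []


agent = StatefulAgent()
agent.observe("episode 1 question")
agent.reset()  # the engine reuses the same instance for episode 2
agent.observe("episode 2 question")
assert len(agent.messages) == 1  # only the new episode's message remains
```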
Chat Completions: The chat_completions property should return a fresh list of messages for each call. It’s what the execution engine uses to construct prompts.
from_dict: Implement from_dict() carefully; it is used during both inference and training to instantiate environments from task dictionaries.
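One subtlety worth handling is that the task dictionary may arrive either wrapped under a "task" key or as a bare dict. The stub below (a stand-in, not the real rLLM classes) mirrors the fallback pattern from the MathEnv example above:

```python
# Minimal stand-in showing the two env_args shapes from_dict() can receive.
class StubMathEnv:
    def __init__(self, task=None, max_attempts=2):
        self.task = task
        self.max_attempts = max_attempts

    @staticmethod
    def from_dict(env_args: dict) -> "StubMathEnv":
        return StubMathEnv(
            # Fall back to treating the whole dict as the task when no
            # explicit "task" key is present.
            task=env_args.get("task", env_args),
            max_attempts=env_args.get("max_attempts", 2),
        )


# Explicit "task" key:
env = StubMathEnv.from_dict({"task": {"question": "1+1?"}, "max_attempts": 3})
assert env.task == {"question": "1+1?"} and env.max_attempts == 3

# Bare task dict (the whole dict becomes the task, defaults apply):
env = StubMathEnv.from_dict({"question": "1+1?"})
assert env.task == {"question": "1+1?"} and env.max_attempts == 2
```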
Thread Safety: If your environment uses external resources (files, network), ensure is_multithread_safe() returns False or implement proper locking.
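If you choose the locking route, one common pattern is a class-level lock guarding every access to the shared resource. The class and method names below are illustrative assumptions, not rLLM API:

```python
import threading


# Illustrative pattern: an environment that appends to a shared log file
# guards all file access with a single lock shared across instances.
class FileBackedEnv:
    _io_lock = threading.Lock()  # shared by every instance of this class

    def __init__(self, path: str):
        self.path = path

    @staticmethod
    def is_multithread_safe() -> bool:
        # Safe only because every file access below takes the lock.
        return True

    def log_result(self, line: str):
        # Serialize writes so concurrent episodes cannot interleave output.
        with FileBackedEnv._io_lock:
            with open(self.path, "a") as f:
                f.write(line + "\n")
```

Without the lock, concurrent episodes writing to the same file could interleave partial lines; with it, is_multithread_safe() can honestly return True.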