Overview

TaskAgent (defined in task_agent.py) is the per-task solver agent. Given a dictionary of domain-specific inputs, it constructs a natural-language instruction, calls the LLM via chat_with_agent, and extracts the response field from a JSON reply. The harness runs many TaskAgent instances concurrently via ThreadPoolExecutor, one per dataset row. TaskAgent extends AgentSystem and adds no new constructor parameters.

Source

task_agent.py
from agent.base_agent import AgentSystem
from agent.llm_withtools import chat_with_agent
from utils.common import extract_jsons

class TaskAgent(AgentSystem):
    def forward(self, inputs):
        """
        An agent that solves a given task.

        Args:
            inputs (dict): A dictionary with input data for the task.

        Returns:
            tuple:
                - prediction (str): The prediction made by the agent.
                - new_msg_history (list): A list of messages representing the
                  message history of the interaction.
        """
        domain = inputs['domain']
        instruction = f"""You are an agent.

Task input:

Respond in JSON format with the following schema:
<json>
{{
    "response": ...
}}
</json>"""
        new_msg_history = chat_with_agent(
            instruction, model=self.model, msg_history=[], logging=self.log
        )

        # Extract the response
        prediction = "None"
        try:
            extracted_jsons = extract_jsons(new_msg_history[-1]['text'])
        if extracted_jsons and "response" in extracted_jsons[-1]:
                prediction = extracted_jsons[-1]['response']
        except Exception as e:
            self.log(f"Error extracting prediction: {e}")
            prediction = "None"

        return prediction, new_msg_history

Constructor

TaskAgent inherits the constructor from AgentSystem unchanged.
TaskAgent(model=OPENAI_MODEL, chat_history_file='./outputs/chat_history.md')
The harness instantiates one TaskAgent per question, using a per-question chat history path:
agent = TaskAgent(
    model=model,
    chat_history_file=f"./outputs/agent_evals/chat_history_{question_id}.md",
)
See AgentSystem for full parameter details.

forward

TaskAgent.forward(inputs) -> tuple[str, list]
Formats the inputs dict into an instruction, calls the LLM without tools (the default tools_available=[]), then parses the last assistant message for a JSON object containing a "response" key.

Parameters

inputs
dict
required
A dictionary of task inputs. Must contain at minimum a domain key; all other keys are domain-specific. Example — paper_review domain:
inputs = {
    "domain": "paper_review",
    "paper_id": "2401.00001",
    "title": "Attention Is All You Need",
    "abstract": "We propose a new simple network architecture ...",
    "review_criteria": "novelty, clarity, soundness",
}
The exact keys beyond domain are determined by the domain’s format_input_dict function (e.g. domains/paper_review/utils.py).
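The real format_input_dict is not shown in these docs; a minimal hypothetical sketch (the key names below are illustrative assumptions, not the actual paper_review implementation) of how a dataset row might become the inputs dict could look like:

```python
def format_input_dict(row):
    """Hypothetical sketch: map one dataset row to the inputs dict
    that TaskAgent.forward expects. Key names are illustrative only."""
    return {
        "domain": "paper_review",
        "paper_id": row["paper_id"],
        "title": row["title"],
        "abstract": row["abstract"],
        # Fall back to a default rubric when the row omits one.
        "review_criteria": row.get("review_criteria", "novelty, clarity, soundness"),
    }

inputs = format_input_dict({
    "paper_id": "2401.00001",
    "title": "Attention Is All You Need",
    "abstract": "We propose a new simple network architecture ...",
})
```

Whatever the real implementation looks like, the contract is the same: the harness builds this dict per row and passes it straight to forward.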

Return Value

Returns a 2-tuple.
prediction
str
The value of response extracted from the last LLM message’s JSON block. Returns the string "None" (not Python None) in any of the following cases:
  • The LLM’s final message contains no parseable JSON.
  • The parsed JSON does not contain a "response" key.
  • Any exception is raised during extraction.
new_msg_history
list
The full conversation history as a list of message dicts, each with "role" ("user" or "assistant") and "text" keys. The last element is always the final assistant response. Useful for debugging or post-hoc analysis of the agent’s reasoning.

JSON Response Schema

The agent is instructed to reply using the following schema, wrapped in <json> tags so the parser can find it:
<json>
{
    "response": ...
}
</json>
extract_jsons scans new_msg_history[-1]['text'] for all <json>...</json> blocks and returns the last one whose value can be parsed. The response field value is returned verbatim — it can be a string, number, list, or nested object depending on the domain.
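extract_jsons lives in utils.common and is not reproduced in these docs; a minimal sketch of the parsing behavior described above (scan for <json>…</json> blocks, keep only those that parse, and take the last one) might look like this stand-in:

```python
import json
import re

def extract_jsons_sketch(text):
    """Illustrative stand-in for utils.common.extract_jsons:
    collect every <json>...</json> block whose body parses as JSON."""
    blocks = re.findall(r"<json>(.*?)</json>", text, flags=re.DOTALL)
    parsed = []
    for block in blocks:
        try:
            parsed.append(json.loads(block))
        except json.JSONDecodeError:
            continue  # skip malformed blocks, keep scanning
    return parsed or None

# Mirror of the extraction logic in TaskAgent.forward.
reply = 'Here is my answer:\n<json>\n{"response": "accept"}\n</json>'
jsons = extract_jsons_sketch(reply)
prediction = jsons[-1]["response"] if jsons else "None"
```

The actual helper may differ in details (e.g. how it handles nested tags), but this captures why the schema asks for the <json> wrapper: it gives the parser an unambiguous anchor in otherwise free-form LLM output.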
TaskAgent.forward calls chat_with_agent with the default tools_available=[], meaning no tools are loaded and the agent must reason entirely from the information in inputs. This is intentional: task agents are stateless solvers, not autonomous actors. Use MetaAgent when you need tool access.

How the Harness Loads TaskAgent

domains/harness.py dynamically loads TaskAgent from a file path or importable module path at runtime using importlib:
domains/harness.py
def load_task_agent(agent_path: str):
    """
    agent_path can be:
      - a python file path: ./task_agent.py or /abs/path/task_agent.py
      - a module path: proofgrader.task_agent or my_pkg.my_agent
    Returns: TaskAgent class
    """
    # Case 1: looks like a file path or exists on disk
    if agent_path.endswith(".py") or os.path.exists(agent_path):
        abs_path = os.path.abspath(agent_path)
        spec = importlib.util.spec_from_file_location("agent_module", abs_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load spec from file: {abs_path}")
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        if not hasattr(mod, "TaskAgent"):
            raise AttributeError(f"No TaskAgent found in file: {abs_path}")
        return mod.TaskAgent

    # Case 2: interpret as module path
    mod = importlib.import_module(agent_path)
    if not hasattr(mod, "TaskAgent"):
        raise AttributeError(f"No TaskAgent found in module: {agent_path}")
    return mod.TaskAgent
This means you can pass --agent_path ./task_agent.py (a file) or --agent_path mypackage.task_agent (a module), and the harness will find the TaskAgent class either way.
The loaded file must define a class named exactly TaskAgent. Any other name raises AttributeError. If you subclass TaskAgent in a separate file, your class must also be named TaskAgent (or you must re-export it under that name).
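The file-path branch above can be exercised end to end; the sketch below writes a minimal agent file to a temporary path and loads it with the same importlib mechanism (the stub TaskAgent body is a placeholder, not the real class):

```python
import importlib.util
import os
import tempfile

# Write a minimal agent file; in real use this would be your task_agent.py.
source = (
    "class TaskAgent:\n"
    "    def forward(self, inputs):\n"
    "        return 'None', []\n"
)
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(source)
    path = f.name

# Same mechanism as the file-path branch of load_task_agent.
spec = importlib.util.spec_from_file_location("agent_module", os.path.abspath(path))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
AgentClass = mod.TaskAgent

os.unlink(path)  # clean up the temporary file
```

Note the module is registered under the fixed name "agent_module" regardless of the file's actual name, which is why only the class name TaskAgent matters to the harness.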

Parallel Execution

The harness runs one TaskAgent.forward call per dataset row, batched across a ThreadPoolExecutor:
domains/harness.py
with ThreadPoolExecutor(max_workers=num_workers) as executor:
    for i, row in dataset.iterrows():
        futures.append((
            i,
            executor.submit(
                run_agent,
                TaskAgent, model, row, evals_folder,
                format_input_dict, question_id_col,
            ),
        ))

    for idx, future in futures:
        prediction = future.result()
        predictions[idx] = prediction
Each worker calls run_agent, which constructs a fresh TaskAgent instance with a unique chat_history_file path (chat_history_{question_id}.md). Because ThreadLoggerManager keys loggers by (thread_id, log_file), concurrent agents write to separate files without any locking overhead beyond the initial logger creation. The default num_workers is 5. Override with --num_workers on the CLI.
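The fan-out pattern above can be sketched in isolation with a toy stand-in for run_agent and the dataset (both are placeholders here, not the harness's real implementations):

```python
from concurrent.futures import ThreadPoolExecutor

def run_agent_stub(row):
    """Stand-in for run_agent: pretend to solve one dataset row."""
    return f"prediction for {row['question_id']}"

dataset = [{"question_id": i} for i in range(10)]
predictions = {}

# Mirrors the harness: submit one task per row, then collect results
# by index so predictions line up with the dataset order.
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [(i, executor.submit(run_agent_stub, row))
               for i, row in enumerate(dataset)]
    for idx, future in futures:
        predictions[idx] = future.result()
```

Collecting (index, future) pairs rather than bare futures is what lets the harness write each result into the correct predictions slot even though workers finish out of order.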

Running via Harness CLI

python domains/harness.py \
  --agent_path ./task_agent.py \
  --domain paper_review \
  --output_dir ./outputs \
  --num_workers 5 \
  --num_samples 50