When using multi-turn workflows, messages typically arrive between agent turns. The workflow waits at a hook, receives a message, then starts a new turn. But sometimes you need to inject messages during an agent’s turn, before tool calls complete or while the model is reasoning. DurableAgent’s prepareStep callback enables this by running before each step in the agent loop, giving you a chance to inject queued messages into the conversation.

When to Use This

Message queueing is useful when:
  • Users send follow-up messages while the agent is still processing
  • External systems need to inject context mid-turn (e.g., a webhook fires during processing)
  • You want messages to influence the agent’s next step rather than waiting for the current turn to complete
If you just need basic multi-turn conversations where messages arrive between turns, see Chat Session Modeling. This guide covers the more advanced case of injecting messages during turns.

The prepareStep Callback

The prepareStep callback runs before each step in the agent loop. It receives the current state and can override the messages, model, system prompt, tool choice, and active tools used for that step:
interface PrepareStepInfo {
  model: string | (() => Promise<LanguageModelV2>);  // Current model
  stepNumber: number;                                // 0-indexed step count
  steps: StepResult[];                               // Previous step results
  messages: LanguageModelV2Prompt;                   // Messages to be sent
  experimental_context: unknown;                     // Custom context
}

interface PrepareStepResult {
  model?: string | (() => Promise<LanguageModelV2>); // Override model
  messages?: LanguageModelV2Prompt;                  // Override messages
  system?: string;                                    // Override system prompt
  toolChoice?: ToolChoice;                            // Override tool choice
  activeTools?: string[];                             // Override active tools
  experimental_context?: unknown;                     // Update context
}
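
To make the timing concrete, here is a minimal local simulation of a loop that calls a prepareStep-style callback before every step. The Message type, the runLoop function, and the fake "model reply" are all assumptions for illustration; this is not how DurableAgent is implemented, only a sketch of the contract described above.

```typescript
// Simplified stand-ins for the SDK types (illustrative only).
type Message = { role: "user" | "assistant"; text: string };
type StepInfo = { stepNumber: number; messages: Message[] };
type StepOverride = { messages?: Message[] };
type PrepareStep = (info: StepInfo) => StepOverride;

// Hypothetical loop: prepareStep runs before every step, and any
// returned `messages` replace the prompt for that step.
function runLoop(initial: Message[], maxSteps: number, prepareStep: PrepareStep): Message[] {
  let messages: Message[] = [...initial];
  for (let stepNumber = 0; stepNumber < maxSteps; stepNumber++) {
    const override = prepareStep({ stepNumber, messages });
    if (override.messages) messages = override.messages;
    // Stand-in for the model call: append a canned assistant reply.
    messages = [...messages, { role: "assistant", text: `reply ${stepNumber}` }];
  }
  return messages;
}

// Inject an extra user message just before the second step (stepNumber 1).
const transcript = runLoop([{ role: "user", text: "hi" }], 2, ({ stepNumber, messages }) =>
  stepNumber === 1
    ? { messages: [...messages, { role: "user", text: "one more thing" }] }
    : {},
);
```

Returning an empty object leaves the step unchanged, which is why the callbacks in this guide end with return {}.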

Injecting Queued Messages

Combine a message queue with prepareStep to inject messages that arrive during processing:
workflows/chat/index.ts
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable, getWorkflowMetadata } from "workflow";
import { chatMessageHook } from "./hooks/chat-message";
import { flightBookingTools, FLIGHT_ASSISTANT_PROMPT } from "./steps/tools";
import type { UIMessageChunk, ModelMessage } from "ai";

export async function chat(initialMessages: ModelMessage[]) {
  "use workflow";

  const { workflowRunId: runId } = getWorkflowMetadata();
  const writable = getWritable<UIMessageChunk>();
  const messageQueue: Array<{ role: "user"; content: string }> = [];

  const agent = new DurableAgent({
    model: "anthropic/claude-opus",
    system: FLIGHT_ASSISTANT_PROMPT,
    tools: flightBookingTools,
  });

  // Listen for messages in background (non-blocking)
  const hook = chatMessageHook.create({ token: runId });
  hook.then(({ message }) => {
    messageQueue.push({ role: "user", content: message });
  });

  await agent.stream({
    messages: initialMessages,
    writable,
    prepareStep: ({ messages: currentMessages }) => {
      // Inject any queued messages before the next LLM call
      if (messageQueue.length > 0) {
        const newMessages = messageQueue.splice(0); // Drain queue
        return {
          messages: [
            ...currentMessages,
            ...newMessages.map((m) => ({
              role: m.role,
              content: [{ type: "text" as const, text: m.content }],
            })),
          ],
        };
      }
      return {};
    },
  });
}
Messages sent via chatMessageHook.resume() accumulate in the queue and are injected the next time prepareStep runs, so the model sees them on its next step instead of after the current turn ends.
The prepareStep callback receives messages in LanguageModelV2Prompt format (with content arrays), which is the internal format used by the AI SDK.
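
The drain-and-convert logic can be pulled into a small helper so it can be tested outside a workflow. The sketch below uses locally defined types (QueuedMessage, PromptUserMessage) as simplified stand-ins for the SDK types, and drainQueue is a hypothetical name rather than part of any library.

```typescript
// Queue entries as used in the example above.
type QueuedMessage = { role: "user"; content: string };
// Simplified stand-in for a user message in prompt format (content array).
type PromptUserMessage = {
  role: "user";
  content: Array<{ type: "text"; text: string }>;
};

// Empties the queue in place and returns its entries in prompt format.
function drainQueue(queue: QueuedMessage[]): PromptUserMessage[] {
  return queue.splice(0).map((m) => ({
    role: m.role,
    content: [{ type: "text" as const, text: m.content }],
  }));
}
```

Inside prepareStep, the injection then reduces to spreading drainQueue(messageQueue) after currentMessages whenever the queue is non-empty.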

Combining with Multi-Turn Sessions

You can combine message queueing with the standard multi-turn pattern:
workflows/chat/index.ts
import { DurableAgent } from "@workflow/ai/agent";
import { getWritable, getWorkflowMetadata } from "workflow";
import { chatMessageHook } from "./hooks/chat-message";
import type { UIMessageChunk, ModelMessage } from "ai";

export async function chat(initialMessages: ModelMessage[]) {
  "use workflow";

  const { workflowRunId: runId } = getWorkflowMetadata();
  const writable = getWritable<UIMessageChunk>();
  const messages: ModelMessage[] = [...initialMessages];
  const messageQueue: Array<{ role: "user"; content: string }> = [];

  const agent = new DurableAgent({
    model: "anthropic/claude-opus",
    system: "You are a helpful assistant.",
    tools: {},
  });

  const hook = chatMessageHook.create({ token: runId });

  while (true) {
    // Set up non-blocking listener for mid-turn messages
    let pendingMessage: string | null = null;
    hook.then(({ message }) => {
      if (message === "/done") return;
      messageQueue.push({ role: "user", content: message });
      pendingMessage = message;
    });

    const result = await agent.stream({
      messages,
      writable,
      preventClose: true,
      prepareStep: ({ messages: currentMessages }) => {
        // Inject queued messages during turn
        if (messageQueue.length > 0) {
          const newMessages = messageQueue.splice(0);
          return {
            messages: [
              ...currentMessages,
              ...newMessages.map((m) => ({
                role: m.role,
                content: [{ type: "text" as const, text: m.content }],
              })),
            ],
          };
        }
        return {};
      },
    });

    messages.push(...result.messages.slice(messages.length));

    // Wait for next message (either queued during turn or new)
    const { message: followUp } = pendingMessage ? { message: pendingMessage } : await hook;
    if (followUp === "/done") break;

    messages.push({ role: "user", content: followUp });
  }
}

Advanced Use Cases

Context Switching

Switch to a different model mid-conversation, for example when the latest user message calls for deeper analysis:
import { DurableAgent } from "@workflow/ai/agent";
import type { PrepareStepInfo, PrepareStepResult } from "@workflow/ai/agent";

const agent = new DurableAgent({
  model: "anthropic/claude-haiku",
  tools: {},
});

await agent.stream({
  messages,
  writable,
  prepareStep: ({ messages }: PrepareStepInfo): PrepareStepResult => {
    // Only inspect user messages: system message content is a plain string,
    // not an array of parts
    const lastMessage = messages[messages.length - 1];
    const content = lastMessage?.role === "user" ? lastMessage.content[0] : undefined;

    // Switch to a more powerful model for complex tasks
    if (content?.type === "text" && content.text.includes("analyze")) {
      return {
        model: "anthropic/claude-opus",
      };
    }

    return {};
  },
});
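
The routing decision itself can live in a pure function, which makes it easy to unit test without running an agent. This is a sketch under assumptions: the PromptMessage type is a simplified stand-in for the prompt format, and pickModel and COMPLEX_KEYWORDS are hypothetical names chosen for illustration.

```typescript
type TextPart = { type: "text"; text: string };
// Simplified stand-in for the prompt message format.
type PromptMessage =
  | { role: "system"; content: string }
  | { role: "user"; content: TextPart[] };

// Illustrative keyword list; tune for your own workload.
const COMPLEX_KEYWORDS = ["analyze", "compare"];

// Returns the stronger model when the latest user message looks complex,
// otherwise the fallback model.
function pickModel(messages: PromptMessage[], fallback: string): string {
  const last = messages[messages.length - 1];
  if (!last || last.role !== "user") return fallback;
  const text = last.content.map((part) => part.text).join(" ").toLowerCase();
  return COMPLEX_KEYWORDS.some((keyword) => text.includes(keyword))
    ? "anthropic/claude-opus"
    : fallback;
}
```

A prepareStep callback could then return { model: pickModel(messages, "anthropic/claude-haiku") }, keeping the heuristic separate from the workflow code.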

Dynamic Tool Activation

Enable or disable tools based on context:
await agent.stream({
  messages,
  writable,
  prepareStep: ({ experimental_context }) => {
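    // experimental_context is typed as unknown, so narrow it before reading fields
    const userRole = (experimental_context as { userRole?: string } | undefined)?.userRole;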
    
    // Only admin users can access certain tools
    if (userRole === "admin") {
      return {
        activeTools: ["searchFlights", "bookFlight", "cancelFlight", "refundFlight"],
      };
    }
    
    return {
      activeTools: ["searchFlights", "bookFlight"],
    };
  },
});
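
Because experimental_context is typed as unknown, it helps to validate it at runtime rather than trusting its shape. The following sketch shows one way to do that; UserRole, readUserRole, and toolsFor are hypothetical helpers, and the tool names simply mirror the example above.

```typescript
type UserRole = "admin" | "member";

// Reads a role from an untyped context value, defaulting to the
// least-privileged role when the value is missing or unrecognized.
function readUserRole(context: unknown): UserRole {
  if (typeof context === "object" && context !== null && "userRole" in context) {
    const role = (context as { userRole: unknown }).userRole;
    if (role === "admin") return "admin";
  }
  return "member";
}

// Maps a role to the tool names it may use.
function toolsFor(role: UserRole): string[] {
  const base = ["searchFlights", "bookFlight"];
  return role === "admin" ? [...base, "cancelFlight", "refundFlight"] : base;
}
```

Defaulting to the restricted tool set means a malformed or missing context never widens access.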

Conversation Summarization

Summarize older messages to keep the prompt within the model's context window:
await agent.stream({
  messages,
  writable,
  prepareStep: async ({ messages, stepNumber }) => {
    // Every 10 steps, summarize the conversation
    if (stepNumber > 0 && stepNumber % 10 === 0) {
      const summary = await summarizeConversation(messages);
      
      return {
        messages: [
          {
            // System messages carry plain string content, not a parts array
            role: "system",
            content: `Previous conversation summary: ${summary}`,
          },
          ...messages.slice(-5), // Keep last 5 messages
        ],
      };
    }
    
    return {};
  },
});

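async function summarizeConversation(messages: LanguageModelV2Prompt) {
  "use step";

  // Call the Anthropic Messages API to summarize the conversation
  const response = await fetch("https://api.anthropic.com/v1/messages", {
    method: "POST",
    headers: {
      "x-api-key": process.env.ANTHROPIC_API_KEY!,
      "anthropic-version": "2023-06-01", // Required by the Messages API
      "content-type": "application/json",
    },
    body: JSON.stringify({
      model: "claude-haiku", // Placeholder: substitute a concrete model ID
      max_tokens: 512, // Required by the Messages API
      messages: [
        {
          role: "user",
          content: `Summarize this conversation in 2-3 sentences: ${JSON.stringify(messages)}`,
        },
      ],
    }),
  });

  // Fail loudly so the step reports the error instead of returning undefined
  if (!response.ok) {
    throw new Error(`Summarization request failed with status ${response.status}`);
  }

  const data = await response.json();
  return data.content[0].text as string;
}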
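
One caveat with trimming by a fixed count: the cut can land between an assistant tool call and its tool result, and providers commonly reject a tool result that has no matching call. The sketch below shows one possible compaction helper that moves the cut earlier in that case and keeps any existing system messages. The Msg type and the compact function are assumptions for illustration, not part of the SDK, and whether your prompt carries system messages at all depends on your setup.

```typescript
// Simplified stand-in for a prompt message; content is left opaque.
type Msg = { role: "system" | "user" | "assistant" | "tool"; content: unknown };

// Replaces older history with a summary while keeping roughly the
// last `keep` non-system messages.
function compact(messages: Msg[], summary: string, keep: number): Msg[] {
  // Preserve any original system messages at the head of the prompt.
  const head = messages.filter((m) => m.role === "system");
  const rest = messages.filter((m) => m.role !== "system");
  let start = Math.max(0, rest.length - keep);
  // Move the cut earlier so the tail never begins with an orphaned tool result.
  while (start > 0 && rest[start]?.role === "tool") start--;
  return [
    ...head,
    { role: "system", content: `Previous conversation summary: ${summary}` },
    ...rest.slice(start),
  ];
}
```

The tail may end up slightly longer than keep, which is usually a better trade than sending a prompt the provider refuses.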

Rate Limit Handling

Space out model requests to stay within API rate limits:
import { sleep } from "workflow";

let lastRequestTime = 0;
const MIN_REQUEST_INTERVAL = 1000; // 1 second between requests

await agent.stream({
  messages,
  writable,
  prepareStep: async ({ stepNumber }) => {
    if (stepNumber > 0) {
      const timeSinceLastRequest = Date.now() - lastRequestTime;
      if (timeSinceLastRequest < MIN_REQUEST_INTERVAL) {
        await sleep(MIN_REQUEST_INTERVAL - timeSinceLastRequest);
      }
    }
    
    lastRequestTime = Date.now();
    return {};
  },
});
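
The wait calculation is simple enough to isolate as a pure function, which avoids negative sleep durations and can be tested with fixed timestamps. msUntilNextRequest is a hypothetical helper name, not part of the workflow package.

```typescript
// Returns how many milliseconds to wait before the next request so that
// consecutive requests are at least `minIntervalMs` apart. Never negative.
function msUntilNextRequest(
  now: number,
  lastRequestTime: number,
  minIntervalMs: number,
): number {
  const elapsed = now - lastRequestTime;
  return Math.max(0, minIntervalMs - elapsed);
}
```

In the callback above, the delay would be msUntilNextRequest(Date.now(), lastRequestTime, MIN_REQUEST_INTERVAL), with sleep called only when the result is greater than zero.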
