When building chat interfaces, it’s common to encounter network interruptions, page refreshes, or serverless function timeouts that break the connection to an in-progress agent. A standard chat implementation would require the user to resend their message and wait for the entire response again.

Workflow runs are durable, and so are the streams attached to them: a stream can be resumed at any point, optionally syncing only the data missed since the last connection. Resumable streams come out of the box with Workflow DevKit, and the WorkflowChatTransport helper provides a drop-in transport for the AI SDK that handles the client-side resumption logic automatically.

Implementing Stream Resumption

Let’s add stream resumption to a basic durable agent:
1. Return the Run ID from Your API

Modify your chat endpoint to include the workflow run ID in a response header:
app/api/chat/route.ts
import {
  convertToModelMessages,
  createUIMessageStreamResponse,
  type UIMessage,
} from "ai";
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat/workflow";

export async function POST(req: Request) {
  const { messages }: { messages: UIMessage[] } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [modelMessages]);

  return createUIMessageStreamResponse({
    stream: run.readable,
    headers: {
      "x-workflow-run-id": run.runId, // For reconnection
    },
  });
}
2. Add a Stream Reconnection Endpoint

Create a new API route that returns the stream for an existing run:
app/api/chat/[id]/stream/route.ts
import { createUIMessageStreamResponse } from "ai";
import { getRun } from "workflow/api";

export async function GET(
  request: Request,
  { params }: { params: Promise<{ id: string }> }
) {
  const { id } = await params;
  const { searchParams } = new URL(request.url);

  // Client provides the last chunk index they received
  const startIndexParam = searchParams.get("startIndex");
  const startIndex = startIndexParam
    ? parseInt(startIndexParam, 10)
    : undefined;

  // Fetch the existing run
  const run = getRun(id);
  const stream = run.getReadable({ startIndex });

  return createUIMessageStreamResponse({ stream });
}
The startIndex parameter ensures the client can resume from exactly where it left off.
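Conceptually, the runtime persists each stream chunk as it is produced, which is what makes replay from an offset possible. The sketch below is a simplified model of that replay, not the actual `getReadable` implementation (which also streams chunks that are still being produced):

```typescript
// Simplified model: replay a persisted chunk log from a given offset.
// Illustration only -- the real runtime also delivers live chunks.
export function replayFrom(
  chunkLog: string[],
  startIndex = 0
): ReadableStream<string> {
  return new ReadableStream<string>({
    start(controller) {
      for (const chunk of chunkLog.slice(startIndex)) {
        controller.enqueue(chunk);
      }
      controller.close();
    },
  });
}
```

A client that already received chunks 0 and 1 would request `startIndex=2` and receive only the remainder.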
3. Use WorkflowChatTransport in the Client

Replace the default transport in useChat with WorkflowChatTransport:
app/page.tsx
"use client";

import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useMemo } from "react";

export default function ChatPage() {
  // Check for an active workflow run on mount
  const activeRunId = useMemo(() => {
    if (typeof window === "undefined") return;
    return localStorage.getItem("active-workflow-run-id") ?? undefined;
  }, []);

  const { messages, sendMessage, status } = useChat({
    resume: Boolean(activeRunId),
    transport: new WorkflowChatTransport({
      api: "/api/chat",

      // Store the run ID when a new chat starts
      onChatSendMessage: (response) => {
        const workflowRunId = response.headers.get("x-workflow-run-id");
        if (workflowRunId) {
          localStorage.setItem("active-workflow-run-id", workflowRunId);
        }
      },

      // Clear the run ID when the chat completes
      onChatEnd: () => {
        localStorage.removeItem("active-workflow-run-id");
      },

      // Use the stored run ID for reconnection
      prepareReconnectToStreamRequest: ({ api, ...rest }) => {
        const runId = localStorage.getItem("active-workflow-run-id");
        if (!runId) throw new Error("No active workflow run ID found");
        return {
          ...rest,
          api: `/api/chat/${encodeURIComponent(runId)}/stream`,
        };
      },
    }),
  });

  return (
    <div>
      {/* ... render your chat UI ... */}
    </div>
  );
}
Now try refreshing the page during a response, or simulate a network interruption. The client will automatically reconnect to the same stream and continue from where it left off.

How It Works

  1. Initial request: When the user sends a message, WorkflowChatTransport makes a POST to /api/chat
  2. Run ID storage: The API starts a workflow and returns the run ID in the x-workflow-run-id header
  3. Callback execution: onChatSendMessage stores this run ID in localStorage
  4. Automatic reconnection: If the stream is interrupted before receiving a “finish” chunk, the transport automatically reconnects
  5. URL construction: prepareReconnectToStreamRequest builds the reconnection URL using the stored run ID
  6. Resume from index: The reconnection endpoint returns the stream from the last known position
  7. Cleanup: When the stream completes, onChatEnd clears the stored run ID
This approach also handles page refreshes. The client will automatically reconnect to the stream from the last known position when the UI loads with a stored run ID.
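The reconnect-until-finished behavior in steps 4–6 can be sketched as a loop that tracks how many chunks have arrived and reconnects from that index. This is a simplified illustration of the idea, not the transport's actual code; `connect` stands in for the HTTP requests the real transport makes:

```typescript
// Sketch of a resume loop: record progress, reconnect from the last
// received index on failure, give up after too many consecutive errors.
// `connect(startIndex)` stands in for the HTTP stream request; it yields
// chunks and may throw mid-stream.
type Connect = (startIndex: number) => AsyncIterable<string>;

export async function readWithResume(
  connect: Connect,
  maxConsecutiveErrors = 3
): Promise<string[]> {
  const chunks: string[] = [];
  let errors = 0;
  for (;;) {
    try {
      // Resume from the first chunk we have not yet received.
      for await (const chunk of connect(chunks.length)) {
        chunks.push(chunk);
        errors = 0; // progress resets the error counter
      }
      return chunks; // stream ended normally
    } catch {
      if (++errors >= maxConsecutiveErrors) {
        throw new Error("Stream failed after repeated reconnect attempts");
      }
      // loop again: reconnect with startIndex = chunks.length
    }
  }
}
```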

Advanced Configuration

Custom Run ID Storage

Instead of localStorage, store the run ID in a database:
const transport = new WorkflowChatTransport({
  api: "/api/chat",
  onChatSendMessage: async (response) => {
    const workflowRunId = response.headers.get("x-workflow-run-id");
    if (workflowRunId) {
      await fetch("/api/sessions", {
        method: "POST",
        // sessionId comes from your app's own session management
        body: JSON.stringify({ sessionId, workflowRunId }),
      });
    }
  },
  prepareReconnectToStreamRequest: async ({ api, ...rest }) => {
    const response = await fetch(`/api/sessions/${sessionId}`);
    const { workflowRunId } = await response.json();
    return {
      ...rest,
      api: `/api/chat/${encodeURIComponent(workflowRunId)}/stream`,
    };
  },
});
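The snippet above assumes a `/api/sessions` endpoint, which is not part of Workflow DevKit. A minimal sketch of what it might look like, backed by an in-memory `Map` (replace with your database — a module-level map will not survive serverless cold starts):

```typescript
// app/api/sessions/route.ts (hypothetical sketch) -- stores the mapping
// from session ID to workflow run ID. In-memory only; use a real store.
const sessions = new Map<string, string>();

export async function POST(req: Request) {
  const { sessionId, workflowRunId } = await req.json();
  sessions.set(sessionId, workflowRunId);
  return Response.json({ ok: true });
}

// app/api/sessions/[sessionId]/route.ts (hypothetical sketch) -- looks up
// the stored run ID so the client can build its reconnection URL.
export async function GET(
  _req: Request,
  { params }: { params: Promise<{ sessionId: string }> }
) {
  const { sessionId } = await params;
  const workflowRunId = sessions.get(sessionId);
  if (!workflowRunId) return new Response("Not found", { status: 404 });
  return Response.json({ workflowRunId });
}
```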

Error Handling

Handle reconnection errors gracefully:
const [error, setError] = useState<string | null>(null);

const transport = new WorkflowChatTransport({
  api: "/api/chat",
  maxConsecutiveErrors: 5, // Default is 3
  prepareReconnectToStreamRequest: ({ api, ...rest }) => {
    try {
      const runId = localStorage.getItem("active-workflow-run-id");
      if (!runId) {
        setError("No active session found");
        throw new Error("No active workflow run ID found");
      }
      return {
        ...rest,
        api: `/api/chat/${encodeURIComponent(runId)}/stream`,
      };
    } catch (err) {
      setError(err instanceof Error ? err.message : "Failed to reconnect");
      throw err;
    }
  },
});

Authentication

Add authentication headers to reconnection requests:
const transport = new WorkflowChatTransport({
  api: "/api/chat",
  prepareReconnectToStreamRequest: ({ api, ...rest }) => {
    const runId = localStorage.getItem("active-workflow-run-id");
    const token = localStorage.getItem("auth-token");
    
    if (!runId) throw new Error("No active workflow run ID found");
    
    return {
      ...rest,
      api: `/api/chat/${encodeURIComponent(runId)}/stream`,
      headers: {
        Authorization: `Bearer ${token}`,
      },
    };
  },
});
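On the server, the reconnection route should then validate the header before returning the stream. A sketch of such a guard (`verifyToken` is a placeholder for your own auth check, not a Workflow DevKit API):

```typescript
// Sketch: guard the reconnection endpoint with a bearer-token check.
// `verifyToken` is hypothetical -- substitute your real auth logic.
function verifyToken(token: string | undefined): boolean {
  return token !== undefined && token.length > 0; // placeholder check
}

// Returns a 401 Response to send, or null if the request may proceed.
export function authorize(request: Request): Response | null {
  const auth = request.headers.get("Authorization") ?? "";
  const token = auth.startsWith("Bearer ") ? auth.slice(7) : undefined;
  if (!verifyToken(token)) {
    return new Response("Unauthorized", { status: 401 });
  }
  return null; // caller proceeds to return the stream
}
```

Call `authorize(request)` at the top of the `GET` handler and return its result when it is non-null.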

Manual Reconnection

Trigger reconnection manually:
import { useChat } from "@ai-sdk/react";
import { WorkflowChatTransport } from "@workflow/ai";
import { useState } from "react";

export default function ChatPage() {
  const [transport] = useState(() => new WorkflowChatTransport({ api: "/api/chat" }));
  
  const { messages } = useChat({ transport });

  const handleReconnect = async () => {
    const runId = localStorage.getItem("active-workflow-run-id");
    if (!runId) return;

    await transport.reconnectToStream({
      chatId: runId,
    });
  };

  return (
    <div>
      <button onClick={handleReconnect}>Reconnect</button>
      {/* ... render messages ... */}
    </div>
  );
}

Testing Resumability

Simulate Network Interruption

Test stream resumption by artificially interrupting the connection:
// In the browser console: start a stream request, then abort it
const controller = new AbortController();
fetch("/api/chat/[runId]/stream", { signal: controller.signal })
  .catch(() => {}); // the promise rejects with an AbortError

// Abort after 2 seconds
setTimeout(() => controller.abort(), 2000);

Simulate Function Timeout

Test resumption after serverless function timeout:
app/api/chat/route.ts
import { convertToModelMessages, createUIMessageStreamResponse } from "ai";
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat/workflow";

export async function POST(req: Request) {
  const { messages } = await req.json();
  const modelMessages = convertToModelMessages(messages);

  const run = await start(chatWorkflow, [modelMessages]);

  let stream = run.readable;

  // Simulate a timeout in development by aborting the response stream
  // after 5 seconds; the interrupted client will reconnect and resume
  if (process.env.NODE_ENV === "development") {
    const controller = new AbortController();
    setTimeout(() => controller.abort(), 5000);
    stream = stream.pipeThrough(new TransformStream(), {
      signal: controller.signal,
    });
  }

  return createUIMessageStreamResponse({
    stream,
    headers: { "x-workflow-run-id": run.runId },
  });
}
