Streaming enables real-time AI responses and creates a better user experience. This guide covers the chatbot’s streaming implementation, from basic text streaming to resumable streams.
Streaming architecture
The chatbot uses the AI SDK’s streaming capabilities combined with Next.js Server Actions and React Server Components.
Basic streaming setup
The chat API route demonstrates the core streaming pattern:
app/(chat)/api/chat/route.ts
```typescript
export const maxDuration = 60;

export async function POST(request: Request) {
  // Request-body parsing/validation and uiMessages construction elided.
  const { id, message, messages, selectedChatModel, selectedVisibilityType } =
    requestBody;

  const session = await auth();
  const modelMessages = await convertToModelMessages(uiMessages);

  const stream = createUIMessageStream({
    originalMessages: isToolApprovalFlow ? uiMessages : undefined,
    execute: async ({ writer: dataStream }) => {
      const result = streamText({
        model: getLanguageModel(selectedChatModel),
        system: systemPrompt({ selectedChatModel, requestHints }),
        messages: modelMessages,
        stopWhen: stepCountIs(5),
        experimental_activeTools: isReasoningModel
          ? []
          : [
              "getWeather",
              "createDocument",
              "updateDocument",
              "requestSuggestions",
            ],
        tools: {
          getWeather,
          createDocument: createDocument({ session, dataStream }),
          updateDocument: updateDocument({ session, dataStream }),
          requestSuggestions: requestSuggestions({ session, dataStream }),
        },
      });

      dataStream.merge(
        result.toUIMessageStream({ sendReasoning: isReasoningModel })
      );
    },
  });

  return createUIMessageStreamResponse({
    stream,
  });
}
```
The maxDuration export sets the maximum execution time for the API route. Vercel defaults to 10 seconds, but streaming responses need more time.
Stream lifecycle
Stream initialization
Create a UI message stream with an execute callback:

```typescript
const stream = createUIMessageStream({
  originalMessages: isToolApprovalFlow ? uiMessages : undefined,
  execute: async ({ writer: dataStream }) => {
    // Stream implementation
  },
});
```
Model streaming
Call streamText with the language model and configuration:

```typescript
const result = streamText({
  model: getLanguageModel(selectedChatModel),
  system: systemPrompt({ selectedChatModel, requestHints }),
  messages: modelMessages,
  stopWhen: stepCountIs(5),
  tools: {
    getWeather,
    createDocument: createDocument({ session, dataStream }),
  },
});
```
Merge streams
Merge the AI response stream with the data stream:

```typescript
dataStream.merge(
  result.toUIMessageStream({ sendReasoning: isReasoningModel })
);
```
Custom data streaming
Write custom data to the stream (e.g., chat title updates):

```typescript
if (titlePromise) {
  const title = await titlePromise;
  dataStream.write({ type: "data-chat-title", data: title });
  updateChatTitleById({ chatId: id, title });
}
```
Finalize stream
Handle stream completion with the onFinish callback:

```typescript
const stream = createUIMessageStream({
  execute: async ({ writer: dataStream }) => {
    // ...
  },
  onFinish: async ({ messages: finishedMessages }) => {
    await saveMessages({
      messages: finishedMessages.map((msg) => ({
        id: msg.id,
        role: msg.role,
        parts: msg.parts,
        createdAt: new Date(),
        attachments: [],
        chatId: id,
      })),
    });
  },
});
```
Streaming custom data
Tools and API routes can stream custom data types to the UI:
```typescript
dataStream.write({
  type: "data-kind",
  data: kind,
  transient: true,
});

dataStream.write({
  type: "data-id",
  data: id,
  transient: true,
});

dataStream.write({
  type: "data-title",
  data: title,
  transient: true,
});
```
The transient: true flag indicates data that should be sent to the client but not persisted in the database. Use this for UI updates and temporary state.
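The routing itself is handled by the AI SDK on the client, but the behavior can be modeled in a framework-free way. In this sketch, routeDataPart and the DataPart shape are illustrative stand-ins, not the SDK's real types: transient parts go to an ephemeral handler, while everything else joins the state that will eventually be persisted.

```typescript
// Illustrative model of transient vs. persisted data parts.
// (Names and shapes are simplified; the real routing lives in the AI SDK.)
type DataPart = { type: string; data: unknown; transient?: boolean };

function routeDataPart(
  part: DataPart,
  persisted: DataPart[],
  onTransient: (part: DataPart) => void
): void {
  if (part.transient) {
    // Ephemeral: drive a spinner, update the title, etc. Never stored.
    onTransient(part);
  } else {
    // Durable: becomes part of the message that onFinish will save.
    persisted.push(part);
  }
}
```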
Custom data types
Define your custom data types for type safety:
```typescript
export type CustomUIDataTypes = {
  textDelta: string;
  imageDelta: string;
  sheetDelta: string;
  codeDelta: string;
  suggestion: Suggestion;
  appendMessage: string;
  id: string;
  title: string;
  kind: ArtifactKind;
  clear: null;
  finish: null;
  "chat-title": string;
};

export type ChatMessage = UIMessage<
  MessageMetadata,
  CustomUIDataTypes,
  ChatTools
>;
```
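Typed data parts pay off in the UI: a switch on part.type narrows the payload type automatically. A minimal sketch, using simplified stand-in types rather than the project's real ones:

```typescript
// Simplified stand-ins for the project's data-part types.
type ArtifactKind = "text" | "code" | "image" | "sheet";

type ArtifactDataPart =
  | { type: "data-title"; data: string }
  | { type: "data-kind"; data: ArtifactKind }
  | { type: "data-clear"; data: null };

function describeDataPart(part: ArtifactDataPart): string {
  switch (part.type) {
    case "data-title":
      return `title: ${part.data}`; // data narrowed to string
    case "data-kind":
      return `kind: ${part.data}`; // data narrowed to ArtifactKind
    case "data-clear":
      return "clear";
  }
}
```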
Tool approval flow
The chatbot supports a tool approval flow where users confirm tool executions:
app/(chat)/api/chat/route.ts
```typescript
const isToolApprovalFlow = Boolean(messages);

const stream = createUIMessageStream({
  originalMessages: isToolApprovalFlow ? uiMessages : undefined,
  execute: async ({ writer: dataStream }) => {
    // ...
  },
  onFinish: async ({ messages: finishedMessages }) => {
    if (isToolApprovalFlow) {
      for (const finishedMsg of finishedMessages) {
        const existingMsg = uiMessages.find((m) => m.id === finishedMsg.id);

        if (existingMsg) {
          await updateMessage({
            id: finishedMsg.id,
            parts: finishedMsg.parts,
          });
        } else {
          await saveMessages({
            messages: [
              {
                id: finishedMsg.id,
                role: finishedMsg.role,
                parts: finishedMsg.parts,
                createdAt: new Date(),
                attachments: [],
                chatId: id,
              },
            ],
          });
        }
      }
    }
  },
});
```
When originalMessages is provided, the stream continues from an existing conversation state, allowing tools to be approved before execution.
Resumable streams
The chatbot implements resumable streams for network resilience:
app/(chat)/api/chat/route.ts
```typescript
function getStreamContext() {
  try {
    return createResumableStreamContext({ waitUntil: after });
  } catch (_) {
    return null;
  }
}

return createUIMessageStreamResponse({
  stream,
  async consumeSseStream({ stream: sseStream }) {
    if (!process.env.REDIS_URL) {
      return;
    }

    try {
      const streamContext = getStreamContext();

      if (streamContext) {
        const streamId = generateId();
        await createStreamId({ streamId, chatId: id });
        await streamContext.createNewResumableStream(
          streamId,
          () => sseStream
        );
      }
    } catch (_) {
      // ignore redis errors
    }
  },
});
```
How resumable streams work
1. When a stream starts, it's registered in Redis with a unique ID
2. The stream data is buffered in Redis as it flows
3. If the connection drops, clients can reconnect and resume from the last received position
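The mechanics can be modeled without Redis: keep an append-only buffer per stream ID, and let a reconnecting client replay everything past the count of chunks it already received. The class below is a toy in-memory sketch; in production the buffer lives in Redis and the bookkeeping is handled by the resumable-stream package.

```typescript
// Toy in-memory model of a resumable stream buffer.
class StreamBuffer {
  private chunks = new Map<string, string[]>();

  // Called as each SSE chunk flows through the server.
  append(streamId: string, chunk: string): void {
    const list = this.chunks.get(streamId) ?? [];
    list.push(chunk);
    this.chunks.set(streamId, list);
  }

  // A reconnecting client reports how many chunks it already has
  // and receives only the remainder.
  resume(streamId: string, alreadyReceived: number): string[] {
    return (this.chunks.get(streamId) ?? []).slice(alreadyReceived);
  }
}
```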
The Stream table tracks active streams:
```typescript
export const stream = pgTable(
  "Stream",
  {
    id: uuid("id").notNull().defaultRandom(),
    chatId: uuid("chatId").notNull(),
    createdAt: timestamp("createdAt").notNull(),
  },
  (table) => ({
    pk: primaryKey({ columns: [table.id] }),
    chatRef: foreignKey({
      columns: [table.chatId],
      foreignColumns: [chat.id],
    }),
  })
);
```
Reasoning models
Reasoning models require special handling for streaming thinking tokens:
app/(chat)/api/chat/route.ts
```typescript
const isReasoningModel =
  selectedChatModel.endsWith("-thinking") ||
  (selectedChatModel.includes("reasoning") &&
    !selectedChatModel.includes("non-reasoning"));

const result = streamText({
  model: getLanguageModel(selectedChatModel),
  system: systemPrompt({ selectedChatModel, requestHints }),
  messages: modelMessages,
  experimental_activeTools: isReasoningModel ? [] : [/* ... */],
  providerOptions: isReasoningModel
    ? {
        anthropic: {
          thinking: { type: "enabled", budgetTokens: 10_000 },
        },
      }
    : undefined,
});

dataStream.merge(
  result.toUIMessageStream({ sendReasoning: isReasoningModel })
);
```
In this setup, reasoning models run without tools: experimental_activeTools is set to an empty array, and sendReasoning enables streaming of thinking tokens.
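The detection is plain string matching on the model ID. Pulled out as a standalone function (a hypothetical refactor, with hypothetical model IDs in the examples) the rules are easy to see:

```typescript
// Same predicate as in the route: a model is "reasoning" if its ID ends
// with "-thinking", or contains "reasoning" without being "non-reasoning".
function isReasoningModel(selectedChatModel: string): boolean {
  return (
    selectedChatModel.endsWith("-thinking") ||
    (selectedChatModel.includes("reasoning") &&
      !selectedChatModel.includes("non-reasoning"))
  );
}
```

The extra exclusion matters because "non-reasoning" IDs also contain the substring "reasoning".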
Error handling
The streaming implementation includes comprehensive error handling:
app/(chat)/api/chat/route.ts
```typescript
const stream = createUIMessageStream({
  execute: async ({ writer: dataStream }) => {
    // ...
  },
  onError: (error) => {
    if (
      error instanceof Error &&
      error.message?.includes(
        "AI Gateway requires a valid credit card on file to service requests"
      )
    ) {
      return "AI Gateway requires a valid credit card on file to service requests. Please visit https://vercel.com/d?to=%2F%5Bteam%5D%2F%7E%2Fai%3Fmodal%3Dadd-credit-card to add a card and unlock your free credits.";
    }

    return "Oops, an error occurred!";
  },
});
```
Errors are caught and converted to user-friendly messages that stream to the UI.
Set appropriate maxDuration
Configure the maximum duration based on your use case:

```typescript
export const maxDuration = 60; // seconds
```
Limit tool steps
Prevent infinite tool loops with stopWhen:

```typescript
const result = streamText({
  stopWhen: stepCountIs(5),
  // ...
});
```
Use transient data
Mark UI-only updates as transient to avoid database overhead:

```typescript
dataStream.write({
  type: "data-clear",
  data: null,
  transient: true,
});
```
Enable telemetry
Monitor streaming performance in production:

```typescript
const result = streamText({
  experimental_telemetry: {
    isEnabled: isProductionEnvironment,
    functionId: "stream-text",
  },
  // ...
});
```
Message persistence
Finished messages are saved to the database using the parts structure:
```typescript
onFinish: async ({ messages: finishedMessages }) => {
  if (!isToolApprovalFlow && finishedMessages.length > 0) {
    await saveMessages({
      messages: finishedMessages.map((currentMessage) => ({
        id: currentMessage.id,
        role: currentMessage.role,
        parts: currentMessage.parts,
        createdAt: new Date(),
        attachments: [],
        chatId: id,
      })),
    });
  }
}
```
The parts field automatically includes text content, tool calls, and tool results from the streaming session.
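For illustration, a persisted assistant message might look roughly like the object below. The field names approximate the AI SDK's UIMessage parts (e.g. tool parts typed as "tool-<name>"); treat the exact shape as an assumption, not the project's schema.

```typescript
// Approximate shape of a persisted assistant message.
const savedMessage = {
  id: "msg_123",
  role: "assistant",
  parts: [
    { type: "text", text: "Checking the weather..." },
    {
      type: "tool-getWeather", // tool call and result carried in one part
      toolCallId: "call_1",
      state: "output-available",
      input: { city: "Berlin" },
      output: { temperatureC: 18 },
    },
    { type: "text", text: "It is 18°C in Berlin." },
  ],
};
```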
Testing streams locally
To test streaming locally, start the development server (e.g., pnpm dev), then send a message in the UI and watch the network tab for SSE (Server-Sent Events) streams.
Common issues and solutions:
- Stream cuts off early: check that maxDuration is sufficient
- No streaming, just a final response: verify you're calling toUIMessageStream()
- Data not persisting: check that onFinish is saving messages
- Tool results not showing: ensure tools call dataStream.write()
- Resumable streams not working: verify REDIS_URL is set
Next steps