Streaming Responses
BAML lets you stream structured JSON output from LLMs as it comes in. Instead of waiting for the complete response, you can process partial results incrementally.
Why Stream?
Streaming provides:
- Better UX: Show progress to users instead of a loading spinner
- Lower latency: Start processing data before the full response completes
- Real-time feedback: Allow users to cancel if output goes off track
How BAML Streaming Works
Raw LLM streaming produces invalid partial JSON:
{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00}
BAML fixes this partial JSON and transforms it into semantically valid partial objects:
- Fields are nullable until complete
- Arrays build incrementally
- Partial strings stream character-by-character
- Numbers only appear when complete
Basic Streaming Example
Let’s extract receipt information with streaming:
// One line item on the receipt.
class ReceiptItem {
name string
// Optional: not every receipt line has a description.
description string?
quantity int
price float
}
// Top-level extraction target: all items plus an optional grand total.
class ReceiptInfo {
items ReceiptItem[]
// Nullable: the model may not find an explicit total on the receipt.
total_cost float?
}
// Extracts structured receipt data from a raw email body.
function ExtractReceiptInfo(email: string) -> ReceiptInfo {
client "openai/gpt-4o"
prompt #"
Given the receipt below:
{{ email }}
{{ ctx.output_format }}
"#
}
import asyncio
from baml_client import b, partial_types, types
# Using a stream
async def example1(receipt: str) -> None:
    """Stream receipt extraction: print each partial, then the final result.

    Indentation restored — the published snippet lost Python's significant
    whitespace and could not run as printed.
    """
    stream = b.stream.ExtractReceiptInfo(receipt)
    # `partial` is the generated Partial type: every field is Optional until
    # the model finishes emitting it, so guard `items` against None
    # (the TypeScript example uses `partial.items?.length` for the same reason).
    async for partial in stream:
        print(f"partial: parsed {len(partial.items or [])} items (object: {partial})")
    # get_final_response() returns the complete, validated ReceiptInfo.
    final = await stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")
# Sync version
def example2(receipt: str) -> None:
    """Synchronous variant: iterate partials, then fetch the final result.

    Indentation restored; `items` guarded against None mid-stream (all
    partial fields are Optional until complete).
    """
    stream = b.stream.ExtractReceiptInfo(receipt)
    for partial in stream:
        print(f"partial: parsed {len(partial.items or [])} items")
    final = stream.get_final_response()
    print(f"final: {len(final.items)} items")
import { b } from './baml_client'
// Using both async iteration and getFinalResponse()
// Consume partials via async iteration, then await the validated result.
async function example1(receipt: string) {
  const stream = b.stream.ExtractReceiptInfo(receipt)

  // Every field on `partial` is optional until it has finished streaming,
  // hence the `?.` on items.
  for await (const partial of stream) {
    console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
  }

  // getFinalResponse() resolves to the complete, validated ReceiptInfo.
  const final = await stream.getFinalResponse()
  console.log(`final: ${final.items.length} items (object: ${final})`)
}
// Using only async iteration
// When only partials matter, iterate the stream directly — no need to
// keep a handle for getFinalResponse().
async function example2(receipt: string) {
  for await (const partial of b.stream.ExtractReceiptInfo(receipt)) {
    console.log(`partial: ${partial.items?.length} items`)
  }
}
require_relative "baml_client/client"
$b = Baml.Client
# Using both iteration and get_final_response()
# Consume the stream partial-by-partial, then fetch the validated result.
def example1(receipt)
stream = $b.stream.ExtractReceiptInfo(receipt)
# Partial fields are nilable until complete — hence the &. safe navigation.
stream.each do |partial|
puts "partial: #{partial.items&.length} items"
end
# Blocks until the stream finishes; returns the full ReceiptInfo.
final = stream.get_final_response
puts "final: #{final.items.length} items"
end
Number fields (int, float) only stream in when complete. You’ll only see null or the final value, not partial numbers like 1, 12, 129.9.
Cancelling Streams
Allow users to cancel ongoing streams:
import { b } from './baml_client'
// One controller is shared by the consuming loop and the timeout below;
// calling abort() from either place cancels the in-flight stream.
const controller = new AbortController()

const stream = b.stream.ExtractReceiptInfo(receipt, {
  abortController: controller
})

// Consume partials, aborting once enough items have arrived.
let itemsSoFar = 0
for await (const partial of stream) {
  itemsSoFar = partial.items?.length || 0
  console.log(`Received ${itemsSoFar} items so far`)
  if (itemsSoFar >= 5) {
    console.log('Stopping stream - got enough items')
    controller.abort()
    break
  }
}

// Alternatively, abort on a wall-clock deadline.
setTimeout(() => {
  controller.abort()
  console.log('Stream cancelled due to timeout')
}, 5000)
from baml_client.async_client import b
from baml_py import AbortController
import asyncio
async def main(receipt: str) -> None:
    """Consume a stream with two cancellation paths: an item budget and a timeout.

    Wrapped in a coroutine because `async for` and `asyncio.create_task`
    are only valid with a running event loop — the original flat snippet
    could not execute at module level. Run with `asyncio.run(main(receipt))`.
    """
    controller = AbortController()
    stream = b.stream.ExtractReceiptInfo(
        receipt,
        baml_options={"abort_controller": controller},
    )

    async def cancel_after_timeout() -> None:
        # Abort the stream if it runs longer than 5 seconds.
        await asyncio.sleep(5)
        controller.abort()
        print("Stream cancelled due to timeout")

    timeout_task = asyncio.create_task(cancel_after_timeout())
    try:
        item_count = 0
        async for partial in stream:
            # Partial fields are Optional until complete.
            item_count = len(partial.items) if partial.items else 0
            print(f"Received {item_count} items so far")
            # Cancel once we have enough items.
            if item_count >= 5:
                print("Stopping stream - got enough items")
                controller.abort()
                break
    finally:
        # Don't leave the timeout pending after the stream ends early.
        timeout_task.cancel()
Semantic Streaming Attributes
BAML provides powerful attributes to control how data streams, ensuring partial values always maintain semantic validity.
@stream.done - Atomic Values
Ensure a field or type only streams when completely finished:
// With @@stream.done, partial ReceiptItems are never emitted — each item
// appears in the stream only once every field is final.
class ReceiptItem {
name string
quantity int
price float
// The entire ReceiptItem will only stream when complete
@@stream.done
}
// Receipts is a list of ReceiptItems,
// each item will only stream when complete
type Receipts = ReceiptItem[]
// Per-field attributes: done applies to a single field, not the whole class.
class Person {
// Name will only appear when fully complete
name string @stream.done
// Numbers only appear when complete by default
age int
// Bio will stream token by token
bio string
}
Atomic List Items with Unions
A common pattern for streaming tool calls and messages:
// A tool invocation emitted by the model.
class ToolCall {
name string
parameters string
}
// A conversational message emitted by the model.
class Message {
role string
content string
}
// Each output item is either a tool call or a message.
type OutputItem = ToolCall | Message
// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
// NOTE(review): MyClient is assumed to be declared elsewhere in the project.
function Run(input: string) -> (OutputItem @stream.done)[] {
client MyClient
prompt #"
{{ input }}
{{ ctx.output_format }}
"#
}
@stream.not_null - Always Present
Ensure a containing object only streams when this field has a value:
// @stream.not_null on `type` withholds the whole Message object from the
// stream until `type` has a value — consumers never see an untyped message.
class Message {
// Message won't stream until type is known
type "error" | "success" | "info" @stream.not_null
// Timestamp will only appear when complete
timestamp string @stream.done
// Content can stream token by token
content string
}
Add metadata to track if a field has finished streaming:
// @stream.with_state wraps `content` in a StreamState carrier, exposing
// whether the field is Pending, Incomplete, or Complete while streaming.
class BlogPost {
// The blog post will only stream when title is known
title string @stream.done @stream.not_null
// The content will stream token by token, with completion state
content string @stream.with_state
}
This generates:
class StreamState(BaseModel, Generic[T]):
    """Generated wrapper pairing a streamed value with its completion state.

    Indentation restored — the published snippet lost Python's significant
    whitespace.
    """
    value: T
    # "Pending" -> not started; "Incomplete" -> mid-stream; "Complete" -> done.
    state: Literal["Pending", "Incomplete", "Complete"]


class BlogPost(BaseModel):
    """Generated partial BlogPost type used during streaming."""
    # @stream.done @stream.not_null => plain, always-complete str.
    title: str
    # @stream.with_state => value plus state metadata; value nullable mid-stream.
    content: StreamState[str | None]
// Mirrors the generated Python StreamState: a value plus its stream status.
interface StreamState<T> {
value: T
state: "Pending" | "Incomplete" | "Complete"
}
// Generated partial BlogPost type used during streaming.
interface BlogPost {
title: string
content: StreamState<string>
}
| BAML Type | Generated Type (during streaming) | Description |
|---|---|---|
| `T` | `Partial[T]?` | Default: nullable and partial |
| `T @stream.done` | `T?` | Nullable but always complete when present |
| `T @stream.not_null` | `Partial[T]` | Always present but may be partial |
| `T @stream.done @stream.not_null` | `T` | Always present and always complete |
| `T @stream.with_state` | `StreamState[Partial[T]?]` | Includes streaming state metadata |
The return type of a function is not affected by streaming attributes!
Real-World Example: Stock Recommendations
Combine streaming attributes to maintain domain invariants:
// Closed set of tickers the model may recommend.
enum Stock {
// NOTE(review): "APPL" may be a typo of the AAPL ticker — confirm intent.
APPL
MSFT
GOOG
BAML
}
// Make recommendations atomic
// @@stream.done: a Recommendation is all-or-nothing — no partial trades
// are ever surfaced to the stream consumer.
class Recommendation {
stock Stock
amount float
action "buy" | "sell"
@@stream.done
}
// Replies stream token by token, but only once the message_type
// discriminator is known (@stream.not_null).
class AssistantMessage {
message_type "greeting" | "conversation" | "farewell" @stream.not_null
message string @stream.with_state @stream.not_null
}
// Produces either a conversational message or an atomic recommendation.
// NOTE(review): UserMessage and DeepseekR1 are assumed to be declared
// elsewhere in the BAML project.
function Respond(
history: (UserMessage | AssistantMessage | Recommendation)[]
) -> AssistantMessage | Recommendation {
client DeepseekR1
prompt #"
Make the next message in the conversation, using a conversational
message or a stock recommendation, based on this history:
{{ history }}.
{{ ctx.output_format }}
"#
}
This ensures:
- Recommendations only appear when complete (not modified by subsequent messages)
- Messages don’t appear until message_type is known
- UI can show a spinner while message is streaming
Common Streaming Patterns
User-Initiated Cancellation (React)
// React control for starting/stopping a BAML stream from the UI.
// NOTE(review): `prompt`, `b`, and `useState` are assumed to be in scope
// (props/imports not shown in this snippet).
function StreamingComponent() {
const [controller, setController] = useState<AbortController | null>(null)
const [isStreaming, setIsStreaming] = useState(false)
const [result, setResult] = useState("")
const startStreaming = async () => {
// Fresh controller per run so a previous abort can't cancel the new stream.
const newController = new AbortController()
setController(newController)
setIsStreaming(true)
try {
const stream = b.stream.GenerateContent(prompt, {
abortController: newController
})
// Render the latest partial as it arrives.
for await (const partial of stream) {
setResult(partial.content || "")
}
} catch (error) {
// An abort surfaces as BamlAbortError; treat it as a normal cancellation.
if (error.name === 'BamlAbortError') {
console.log('Stream cancelled by user')
}
} finally {
setIsStreaming(false)
}
}
const stopStreaming = () => controller?.abort()
return (
<div>
<button onClick={startStreaming} disabled={isStreaming}>
Start Streaming
</button>
<button onClick={stopStreaming} disabled={!isStreaming}>
Stop
</button>
<div>{result}</div>
</div>
)
}
Streaming in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from baml_py import AbortController
# Fix: `b` was used below but never imported; path matches the async-client
# import used elsewhere in these docs.
from baml_client.async_client import b
# NOTE(review): BamlAbortError also needs importing — exact path in baml_py
# not shown here; confirm against the generated client.

app = FastAPI()

# Maps stream_id -> AbortController so /stop can cancel an in-flight stream.
active_streams: dict[str, AbortController] = {}


@app.post("/stream/{stream_id}")
async def start_stream(stream_id: str, prompt: str):
    """Start a BAML stream and relay partials as Server-Sent Events."""
    controller = AbortController()
    active_streams[stream_id] = controller

    async def generate():
        try:
            stream = b.stream.GenerateContent(
                prompt,
                baml_options={"abort_controller": controller},
            )
            async for partial in stream:
                # Stop relaying promptly once /stop has fired.
                if controller.aborted:
                    break
                yield f"data: {partial.content}\n\n"
        except BamlAbortError:
            yield "data: [CANCELLED]\n\n"
        finally:
            # Always unregister so the id can be reused after completion/abort.
            active_streams.pop(stream_id, None)

    return StreamingResponse(generate(), media_type="text/event-stream")


@app.post("/stop/{stream_id}")
async def stop_stream(stream_id: str):
    """Abort the stream registered under stream_id, if any."""
    if controller := active_streams.get(stream_id):
        controller.abort()
        return {"status": "stopped"}
    return {"status": "not found"}
Next Steps