
Streaming Responses

BAML lets you stream structured JSON output from LLMs as it comes in. Instead of waiting for the complete response, you can process partial results incrementally.

Why Stream?

Streaming provides:
  • Better UX: Show progress to users instead of a loading spinner
  • Lower latency: Start processing data before the full response completes
  • Real-time feedback: Allow users to cancel if output goes off track

How BAML Streaming Works

Raw LLM streaming produces invalid partial JSON:
{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00}
BAML fixes this partial JSON and transforms it into semantically valid partial objects:
  • Fields are nullable until complete
  • Arrays build incrementally
  • Partial strings stream character-by-character
  • Numbers only appear when complete
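To make the idea concrete, here is a deliberately naive sketch of JSON repair. It is not BAML's actual parser (which handles far more cases), but it fixes the truncated chunks above just enough for them to parse:

```python
import json
import re

def repair_partial_json(chunk: str):
    """Naively repair a truncated JSON chunk so it parses.

    Illustration only -- BAML's real parser is far more robust.
    """
    s = chunk.rstrip()
    # Close an unterminated string (odd number of quotes).
    if s.count('"') % 2 == 1:
        s += '"'
    # Drop a trailing number that may still be streaming (e.g. `1.`).
    s = re.sub(r"-?\d+\.$", "", s)
    # Drop a key whose value has not arrived yet (trailing `"key":`).
    s = re.sub(r',?\s*"[^"]*"\s*:\s*$', "", s).rstrip().rstrip(",")
    # Close any brackets/braces that are still open, innermost first.
    closers, in_string = [], False
    for i, ch in enumerate(s):
        if ch == '"' and (i == 0 or s[i - 1] != "\\"):
            in_string = not in_string
        elif not in_string:
            if ch in "[{":
                closers.append("]" if ch == "[" else "}")
            elif ch in "]}":
                closers.pop()
    return json.loads(s + "".join(reversed(closers)))

chunk = '{"items": [{"name": "Apple", "quantity": 2, "price": 1.'
print(repair_partial_json(chunk))
# {'items': [{'name': 'Apple', 'quantity': 2}]}
```

Note how the incomplete number `1.` is dropped entirely rather than guessed at, matching the rule that numbers only appear once complete.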

Basic Streaming Example

Let’s extract receipt information with streaming:
class ReceiptItem {
  name string
  description string?
  quantity int
  price float
}

class ReceiptInfo {
  items ReceiptItem[]
  total_cost float?
}

function ExtractReceiptInfo(email: string) -> ReceiptInfo {
  client "openai/gpt-4o"
  prompt #"
    Given the receipt below:

    {{ email }}

    {{ ctx.output_format }}
  "#
}
import asyncio
from baml_client import b, partial_types, types

# Using a stream
async def example1(receipt: str):
    stream = b.stream.ExtractReceiptInfo(receipt)

    # partial is a Partial type with all Optional fields
    async for partial in stream:
        print(f"partial: parsed {len(partial.items)} items (object: {partial})")

    # final is the full, original, validated ReceiptInfo type
    final = await stream.get_final_response()
    print(f"final: {len(final.items)} items (object: {final})")

# Sync version (uses the sync client: from baml_client.sync_client import b)
def example2(receipt: str):
    stream = b.stream.ExtractReceiptInfo(receipt)
    
    for partial in stream:
        print(f"partial: parsed {len(partial.items)} items")
    
    final = stream.get_final_response()
    print(f"final: {len(final.items)} items")
Number fields (int, float) only stream in when complete. You’ll only see null or the final value, not partial numbers like 1, 12, 129.9.

Cancelling Streams

Allow users to cancel ongoing streams:
import { b } from './baml_client'

const controller = new AbortController()

const stream = b.stream.ExtractReceiptInfo(receipt, {
  abortController: controller
})

// Process stream with ability to cancel
let itemCount = 0
for await (const partial of stream) {
  itemCount = partial.items?.length || 0
  console.log(`Received ${itemCount} items so far`)

  // Cancel if we have enough items
  if (itemCount >= 5) {
    console.log('Stopping stream - got enough items')
    controller.abort()
    break
  }
}

// Or cancel after a timeout
setTimeout(() => {
  controller.abort()
  console.log('Stream cancelled due to timeout')
}, 5000)

Semantic Streaming Attributes

BAML provides powerful attributes to control how data streams, ensuring partial values always maintain semantic validity.

@stream.done - Atomic Values

Ensure a field or type only streams when completely finished:
class ReceiptItem {
  name string
  quantity int
  price float

  // The entire ReceiptItem will only stream when complete
  @@stream.done
}

// Receipts is a list of ReceiptItems,
// each item will only stream when complete
type Receipts = ReceiptItem[]

class Person {
  // Name will only appear when fully complete
  name string @stream.done
  // Numbers only appear when complete by default
  age int
  // Bio will stream token by token
  bio string
}
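Under these attributes, the generated partial type for `Person` behaves roughly like the hand-written sketch below. The class and the sequence of updates are illustrative, not the real `baml_client` codegen:

```python
from dataclasses import dataclass
from typing import Optional

# Illustrative stand-in for the generated partial type of Person.
@dataclass
class PartialPerson:
    name: Optional[str] = None  # @stream.done: stays None until fully complete
    age: Optional[int] = None   # numbers are None until complete by default
    bio: Optional[str] = None   # streams in token by token

# Successive partials a stream could yield under these semantics:
updates = [
    PartialPerson(),
    PartialPerson(bio="Loves hi"),       # bio grows incrementally
    PartialPerson(bio="Loves hiking"),
    PartialPerson(name="Ada", age=36, bio="Loves hiking"),  # name arrives whole
]
```

The invariant to notice: you never observe a half-finished `name`, only `None` or the final value.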

Atomic List Items with Unions

A common pattern for streaming tool calls and messages:
class ToolCall {
  name string
  parameters string
}

class Message {
  role string
  content string
}

type OutputItem = ToolCall | Message

// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
function Run(input: string) -> (OutputItem @stream.done)[] {
  client MyClient
  prompt #"
    {{ input }}
    {{ ctx.output_format }}
  "#
}

@stream.not_null - Always Present

Ensure a containing object only streams when this field has a value:
class Message {
  // Message won't stream until type is known
  type "error" | "success" | "info" @stream.not_null
  // Timestamp will only appear when complete
  timestamp string @stream.done
  // Content can stream token by token
  content string
}

@stream.with_state - Completion Metadata

Add metadata to track if a field has finished streaming:
class BlogPost {
  // The blog post will only stream when title is known
  title string @stream.done @stream.not_null
  // The content will stream token by token, with completion state
  content string @stream.with_state
}
This generates:
class StreamState(BaseModel, Generic[T]):
  value: T
  state: Literal["Pending", "Incomplete", "Complete"]

class BlogPost(BaseModel):
  title: str
  content: StreamState[str | None]
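A UI can branch on that `state` field to decide when to show a typing indicator. The sketch below mirrors the generated `StreamState` shape; `BlogPostPartial` and `render` are illustrative stand-ins, not generated code:

```python
from dataclasses import dataclass
from typing import Generic, Literal, Optional, TypeVar

T = TypeVar("T")

# Mirrors the generated StreamState shape shown above.
@dataclass
class StreamState(Generic[T]):
    value: T
    state: Literal["Pending", "Incomplete", "Complete"]

# Illustrative stand-in for the generated partial BlogPost.
@dataclass
class BlogPostPartial:
    title: str
    content: StreamState[Optional[str]]

def render(post: BlogPostPartial) -> str:
    # Show a typing cursor while the content is still streaming.
    cursor = "" if post.content.state == "Complete" else "|"
    return f"# {post.title}\n{post.content.value or ''}{cursor}"

streaming = BlogPostPartial("Hello", StreamState("Wor", "Incomplete"))
done = BlogPostPartial("Hello", StreamState("World", "Complete"))
```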

Type Transformation Summary

| BAML Type                       | Generated Type (during streaming) | Description                               |
|---------------------------------|-----------------------------------|-------------------------------------------|
| T                               | Partial[T]?                       | Default: nullable and partial             |
| T @stream.done                  | T?                                | Nullable but always complete when present |
| T @stream.not_null              | Partial[T]                        | Always present but may be partial         |
| T @stream.done @stream.not_null | T                                 | Always present and always complete        |
| T @stream.with_state            | StreamState[Partial[T]?]          | Includes streaming state metadata         |
The return type of a function is not affected by streaming attributes!

Real-World Example: Stock Recommendations

Combine streaming attributes to maintain domain invariants:
enum Stock {
  APPL
  MSFT
  GOOG
  BAML
}

// Make recommendations atomic
class Recommendation {
  stock Stock
  amount float
  action "buy" | "sell"
  @@stream.done
}

class AssistantMessage {
  message_type "greeting" | "conversation" | "farewell" @stream.not_null
  message string @stream.with_state @stream.not_null
}

function Respond(
  history: (UserMessage | AssistantMessage | Recommendation)[]
) -> AssistantMessage | Recommendation {
  client DeepseekR1
  prompt #"
    Make the next message in the conversation, using a conversational
    message or a stock recommendation, based on this history:
    {{ history }}.

    {{ ctx.output_format }}
  "#
}
This ensures:
  • Recommendations only appear when complete (not modified by subsequent messages)
  • Messages don’t appear until message_type is known
  • UI can show a spinner while message is streaming
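A consumer can lean on these guarantees when dispatching on the streamed union. The classes below are hand-written stand-ins for the generated partial types (names are illustrative):

```python
from dataclasses import dataclass
from typing import Optional, Union

# Illustrative stand-ins for the generated partial types.
@dataclass
class Recommendation:           # @@stream.done: only ever observed complete
    stock: str
    amount: float
    action: str

@dataclass
class PartialAssistantMessage:
    message_type: str               # @stream.not_null: always present
    message: Optional[str] = None   # may still be streaming

def handle(partial: Union[Recommendation, PartialAssistantMessage]) -> str:
    if isinstance(partial, Recommendation):
        # Safe to act on immediately: @@stream.done means it is final.
        return f"{partial.action} {partial.amount} {partial.stock}"
    # message_type is guaranteed known, even while message streams.
    return f"[{partial.message_type}] {partial.message or '...'}"

print(handle(Recommendation("MSFT", 10.0, "buy")))   # buy 10.0 MSFT
print(handle(PartialAssistantMessage("greeting")))   # [greeting] ...
```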

Common Streaming Patterns

User-Initiated Cancellation (React)

import { useState } from 'react'
import { b } from './baml_client'

function StreamingComponent({ prompt }: { prompt: string }) {
  const [controller, setController] = useState<AbortController | null>(null)
  const [isStreaming, setIsStreaming] = useState(false)
  const [result, setResult] = useState("")

  const startStreaming = async () => {
    const newController = new AbortController()
    setController(newController)
    setIsStreaming(true)

    try {
      const stream = b.stream.GenerateContent(prompt, {
        abortController: newController
      })

      for await (const partial of stream) {
        setResult(partial.content || "")
      }
    } catch (error) {
      if (error.name === 'BamlAbortError') {
        console.log('Stream cancelled by user')
      }
    } finally {
      setIsStreaming(false)
    }
  }

  const stopStreaming = () => controller?.abort()

  return (
    <div>
      <button onClick={startStreaming} disabled={isStreaming}>
        Start Streaming
      </button>
      <button onClick={stopStreaming} disabled={!isStreaming}>
        Stop
      </button>
      <div>{result}</div>
    </div>
  )
}

Streaming in FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from baml_py import AbortController

from baml_client import b

app = FastAPI()
active_streams = {}

@app.post("/stream/{stream_id}")
async def start_stream(stream_id: str, prompt: str):
    controller = AbortController()
    active_streams[stream_id] = controller

    async def generate():
        try:
            stream = b.stream.GenerateContent(
                prompt,
                baml_options={"abort_controller": controller}
            )
            async for partial in stream:
                if controller.aborted:
                    break
                yield f"data: {partial.content}\n\n"
        except BamlAbortError:
            yield "data: [CANCELLED]\n\n"
        finally:
            active_streams.pop(stream_id, None)

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/stop/{stream_id}")
async def stop_stream(stream_id: str):
    if controller := active_streams.get(stream_id):
        controller.abort()
        return {"status": "stopped"}
    return {"status": "not found"}
