The stream() method streams a conversation turn, yielding events incrementally. Each call sends one user message and yields events until the agent finishes its turn.
# Minimal usage: echo each event's type and payload as soon as it is yielded.
for evt in client.stream("hello"):
    print(evt.type, evt.data)
# Walk one conversation turn and report tool calls, tool results and AI text.
for event in client.stream("List files in /mnt/user-data/workspace"):
    # Only "messages-tuple" events are inspected here; everything else is
    # skipped.
    if event.type != "messages-tuple":
        continue

    data = event.data
    kind = data.get("type")

    if kind == "ai":
        # Tool call. Test truthiness rather than key presence so that an
        # empty (or None) "tool_calls" value does not swallow the message.
        for tc in data.get("tool_calls") or []:
            print(f"Tool: {tc['name']} with args {tc['args']}")
        # AI text. Checked independently of the tool-call branch so text is
        # still printed if a message happens to carry both.
        if data.get("content"):
            print(f"AI: {data['content']}")
    elif kind == "tool":
        # Tool result
        print(f"Result: {data['content']}")
# Gather every tool call and tool result produced during a single turn,
# then print a short summary.
tool_calls = []
tool_results = []

for event in client.stream("Read file.txt and count the lines"):
    # Non-message events carry nothing to collect.
    if event.type != "messages-tuple":
        continue

    data = event.data
    if data.get("type") == "ai" and "tool_calls" in data:
        tool_calls.extend(data["tool_calls"])
    elif data.get("type") == "tool":
        result_entry = {
            "name": data.get("name"),
            "content": data.get("content"),
        }
        tool_results.append(result_entry)

print(f"Used tools: {[tc['name'] for tc in tool_calls]}")
print(f"Got {len(tool_results)} results")
Event types and data structures match the LangGraph SSE protocol so consumers can switch between HTTP streaming and embedded mode without changing their event-handling logic.
def test_stream_yields_messages_tuple_and_end(self, client):
    """stream() emits messages-tuple and values events and finishes with end.

    Also checks that every yielded item is a StreamEvent instance.
    """
    collected = list(client.stream("Say hi in one word."))
    seen_types = [item.type for item in collected]

    # Both event kinds must appear somewhere in the turn.
    assert "messages-tuple" in seen_types
    assert "values" in seen_types
    # The final event of the turn must be the terminator.
    assert seen_types[-1] == "end"

    for item in collected:
        assert isinstance(item, StreamEvent)