Stream utilities for Tokio with adapters, wrappers, and extension traits for working with async streams
tokio-stream provides utilities to work with Stream and Tokio. A Stream is an asynchronous sequence of values - think of it as an asynchronous version of the standard library’s Iterator trait.
You cannot use `for ... in` syntax with streams. Instead, use `while let` with the `next()` method:
use tokio_stream::{self as stream, StreamExt};

/// Drains a small in-memory stream, printing every item it produces.
#[tokio::main]
async fn main() {
    let mut numbers = stream::iter(vec![0, 1, 2]);

    // The loop ends when `next()` resolves to `None`.
    while let Some(item) = numbers.next().await {
        println!("Got {}", item);
    }
}
Always bring `StreamExt` into scope to access stream methods like `next()`, `filter()`, `map()`, etc.
use tokio_stream::{self as stream, StreamExt};#[tokio::main]async fn main() { let stream = stream::iter(vec![1, 2, 3, 4, 5]) .filter(|x| x % 2 == 0) // Keep only even numbers .map(|x| x * 2); // Double each value let result: Vec<i32> = stream.collect().await; assert_eq!(result, vec![4, 8]);}
use tokio_stream::{self as stream, StreamExt};#[tokio::main]async fn main() { // Take first 3 items let stream = stream::iter(1..=10).take(3); let result: Vec<i32> = stream.collect().await; assert_eq!(result, vec![1, 2, 3]); // Skip first 3 items let stream = stream::iter(1..=5).skip(3); let result: Vec<i32> = stream.collect().await; assert_eq!(result, vec![4, 5]);}
use tokio_stream::{self as stream, StreamExt};#[tokio::main]async fn main() { // Check if all elements satisfy a predicate let all_positive = stream::iter(vec![1, 2, 3]) .all(|x| x > 0) .await; assert!(all_positive); // Check if any element satisfies a predicate let has_even = stream::iter(vec![1, 2, 3]) .any(|x| x % 2 == 0) .await; assert!(has_even);}
use tokio_stream::{self as stream, StreamExt};#[tokio::main]async fn main() { let stream1 = stream::iter(vec![1, 3, 5]); let stream2 = stream::iter(vec![2, 4, 6]); let merged = stream1.merge(stream2); let result: Vec<i32> = merged.collect().await; // Result contains items from both streams (order may vary) assert_eq!(result.len(), 6);}
use tokio_stream::{self as stream, StreamExt};use tokio::time::Duration;#[tokio::main]async fn main() { let stream = stream::iter(vec![1, 2, 3]) .timeout(Duration::from_secs(5)); // Each item must arrive within 5 seconds}
use tokio_stream::{self as stream, StreamExt};use tokio::time::Duration;#[tokio::main]async fn main() { let stream = stream::iter(vec![1, 2, 3]) .throttle(Duration::from_millis(100)); // Items are spaced at least 100ms apart}
Group items into chunks, yielding when chunk size or timeout is reached:
use tokio_stream::{self as stream, StreamExt};use tokio::time::Duration;#[tokio::main]async fn main() { let stream = stream::iter(1..=10) .chunks_timeout(5, Duration::from_millis(100)); // Yields Vec<i32> with up to 5 items or after 100ms}
use tokio::time::{Duration, interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

/// Adapts a Tokio interval into a stream and waits for five ticks.
#[tokio::main]
async fn main() {
    let period = Duration::from_millis(100);
    let mut ticks = IntervalStream::new(interval(period));

    for _ in 0..5 {
        // The yielded item is discarded; only the tick itself matters here.
        ticks.next().await;
        println!("Tick!");
    }
}
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

/// Consumes an mpsc channel as a stream while a spawned task feeds it.
#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(10);
    let mut incoming = ReceiverStream::new(receiver);

    // Producer: sends 0..5. `sender` is moved into the task and dropped when it ends.
    tokio::spawn(async move {
        for n in 0..5 {
            sender.send(n).await.unwrap();
        }
    });

    // Consumer: runs until the stream yields `None`.
    while let Some(item) = incoming.next().await {
        println!("Received: {}", item);
    }
}
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

/// Consumes a broadcast channel as a stream, reporting values and errors separately.
#[tokio::main]
async fn main() {
    let (sender, receiver) = broadcast::channel(16);
    let mut incoming = BroadcastStream::new(receiver);

    // Producer: sends 0..5 from a separate task that owns the sender.
    tokio::spawn(async move {
        for n in 0..5 {
            sender.send(n).unwrap();
        }
    });

    // Each stream item is a `Result`, so both outcomes are handled.
    while let Some(outcome) = incoming.next().await {
        match outcome {
            Ok(item) => println!("Received: {}", item),
            Err(err) => eprintln!("Error: {}", err),
        }
    }
}
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;

/// Observes a watch channel as a stream while a spawned task updates its value.
#[tokio::main]
async fn main() {
    let (sender, receiver) = watch::channel(0);
    let mut updates = WatchStream::new(receiver);

    // Producer: publishes 1..=5, pausing 10ms after each update.
    tokio::spawn(async move {
        for n in 1..=5 {
            sender.send(n).unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
    });

    // NOTE(review): `WatchStream::new` is documented to yield the current value
    // first, so the initial `0` may be printed too — confirm for the version in use.
    while let Some(current) = updates.next().await {
        println!("Value changed to: {}", current);
    }
}