Skip to main content
tokio-stream provides utilities to work with Stream and Tokio. A Stream is an asynchronous sequence of values - think of it as an asynchronous version of the standard library’s Iterator trait.

Installation

Add tokio-stream to your Cargo.toml:
Cargo.toml
[dependencies]
tokio-stream = "0.1"

With All Features

Cargo.toml
[dependencies]
tokio-stream = { version = "0.1", features = ["full"] }

Features

time

Time-based stream utilities (enabled by default)

net

Network stream wrappers for TCP/Unix sockets

io-util

I/O utility stream wrappers

fs

File system stream wrappers

sync

Synchronization primitive stream wrappers

signal

OS signal stream wrappers

Basic Usage

Iterating Over a Stream

You cannot use for in syntax with streams. Instead, use while let with the next() method:
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![0, 1, 2]);

    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}
Always bring StreamExt into scope to access stream methods like next(), filter(), map(), etc.

Creating Streams

From an Iterator

use tokio_stream::{self as stream, StreamExt};

let stream = stream::iter(vec![1, 2, 3, 4, 5]);

Single Value Stream

use tokio_stream::{self as stream, StreamExt};

let stream = stream::once(42);

Empty Stream

use tokio_stream::{self as stream, StreamExt};

let stream: tokio_stream::Empty<i32> = stream::empty();

Pending Stream

A stream that never yields any values:
use tokio_stream::{self as stream, StreamExt};

let stream: tokio_stream::Pending<i32> = stream::pending();

Stream Extension Methods

The StreamExt trait provides numerous combinator methods:

Filtering and Mapping

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5])
        .filter(|x| x % 2 == 0)  // Keep only even numbers
        .map(|x| x * 2);          // Double each value

    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![4, 8]);
}

Filter Map

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec!["1", "two", "3", "four"])
        .filter_map(|s| s.parse::<i32>().ok());

    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![1, 3]);
}

Take and Skip

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // Take first 3 items
    let stream = stream::iter(1..=10).take(3);
    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![1, 2, 3]);

    // Skip first 3 items
    let stream = stream::iter(1..=5).skip(3);
    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![4, 5]);
}

Take While and Skip While

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![1, 2, 3, 4, 1, 2])
        .take_while(|x| *x < 4);

    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![1, 2, 3]);
}

Fold and Collect

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // Fold (reduce) a stream
    let sum = stream::iter(vec![1, 2, 3, 4, 5])
        .fold(0, |acc, x| acc + x)
        .await;
    assert_eq!(sum, 15);

    // Collect into a Vec
    let vec: Vec<i32> = stream::iter(1..=5).collect().await;
    assert_eq!(vec, vec![1, 2, 3, 4, 5]);
}

All and Any

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    // Check if all elements satisfy a predicate
    let all_positive = stream::iter(vec![1, 2, 3])
        .all(|x| x > 0)
        .await;
    assert!(all_positive);

    // Check if any element satisfies a predicate
    let has_even = stream::iter(vec![1, 2, 3])
        .any(|x| x % 2 == 0)
        .await;
    assert!(has_even);
}

Merging Streams

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 3, 5]);
    let stream2 = stream::iter(vec![2, 4, 6]);

    let merged = stream1.merge(stream2);
    let result: Vec<i32> = merged.collect().await;
    // Result contains items from both streams (order may vary)
    assert_eq!(result.len(), 6);
}

Chaining Streams

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);

    let chained = stream1.chain(stream2);
    let result: Vec<i32> = chained.collect().await;
    assert_eq!(result, vec![1, 2, 3, 4, 5, 6]);
}

Then - Async Map

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![1, 2, 3])
        .then(|x| async move {
            // Perform async operation
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            x * 2
        });

    let result: Vec<i32> = stream.collect().await;
    assert_eq!(result, vec![2, 4, 6]);
}

Time-Based Utilities

Requires the time feature (enabled by default).

Timeout

Add a timeout to each item in the stream:
use tokio_stream::{self as stream, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![1, 2, 3])
        .timeout(Duration::from_secs(5));

    // Each item must arrive within 5 seconds
}

Throttle

Limit the rate at which items are produced:
use tokio_stream::{self as stream, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(vec![1, 2, 3])
        .throttle(Duration::from_millis(100));

    // Items are spaced at least 100ms apart
}

Chunks with Timeout

Group items into chunks, yielding when chunk size or timeout is reached:
use tokio_stream::{self as stream, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=10)
        .chunks_timeout(5, Duration::from_millis(100));

    // Yields Vec<i32> with up to 5 items or after 100ms
}

Wrappers

The wrappers module provides Stream implementations for Tokio types.

Interval Stream

Requires the time feature.
use tokio::time::{Duration, interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let interval = interval(Duration::from_millis(100));
    let mut stream = IntervalStream::new(interval);

    for _ in 0..5 {
        stream.next().await;
        println!("Tick!");
    }
}

MPSC Channel Stream

Requires the sync feature.
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    let mut stream = ReceiverStream::new(rx);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(value) = stream.next().await {
        println!("Received: {}", value);
    }
}

Broadcast Channel Stream

Requires the sync feature.
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel(16);
    let mut stream = BroadcastStream::new(rx);

    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
    });

    while let Some(result) = stream.next().await {
        match result {
            Ok(value) => println!("Received: {}", value),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
}

Watch Channel Stream

Requires the sync feature.
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0);
    let mut stream = WatchStream::new(rx);

    tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).unwrap();
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
    });

    while let Some(value) = stream.next().await {
        println!("Value changed to: {}", value);
    }
}

TCP Listener Stream

Requires the net feature.
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    let mut stream = TcpListenerStream::new(listener);

    while let Some(socket_result) = stream.next().await {
        match socket_result {
            Ok(socket) => {
                tokio::spawn(async move {
                    // Handle connection
                });
            }
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    Ok(())
}

ReadDir Stream

Requires the fs feature.
use tokio::fs;
use tokio_stream::wrappers::ReadDirStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let read_dir = fs::read_dir(".").await?;
    let mut stream = ReadDirStream::new(read_dir);

    while let Some(entry_result) = stream.next().await {
        if let Ok(entry) = entry_result {
            println!("File: {:?}", entry.file_name());
        }
    }

    Ok(())
}

StreamMap

Manage multiple keyed streams concurrently:
use tokio_stream::{StreamMap, StreamExt};

#[tokio::main]
async fn main() {
    let mut map = StreamMap::new();

    map.insert("stream1", tokio_stream::iter(vec![1, 2, 3]));
    map.insert("stream2", tokio_stream::iter(vec![4, 5, 6]));

    while let Some((key, value)) = map.next().await {
        println!("From {}: {}", key, value);
    }
}

Resources

API Documentation

Complete API reference on docs.rs

Streams Tutorial

In-depth tutorial on the Tokio website

GitHub Repository

View source code and examples

futures StreamExt

Alternative StreamExt from the futures crate

Build docs developers (and LLMs) love