Skip to main content
tokio-test provides testing utilities for Tokio and futures-based code. It includes mock I/O types, task harnesses for polling futures, and stream testing utilities.

Installation

Add tokio-test to your dev dependencies:
Cargo.toml
[dev-dependencies]
tokio-test = "0.4"
tokio = { version = "1", features = ["full"] }
tokio-test is typically used only in dev-dependencies since it’s designed for testing.

Core Features

Mock I/O

Scriptable AsyncRead/AsyncWrite implementations

Task Harness

Poll futures without runtime boilerplate

Stream Testing

Mock streams for testing stream consumers

Mock I/O

The io module provides mock types that follow a predefined script of read and write operations. This is perfect for testing networking code without real network I/O.

Basic Mock Builder

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_test::io::Builder;

#[tokio::test]
async fn test_read_write() {
    let mut mock = Builder::new()
        .read(b"hello ")
        .read(b"world!")
        .write(b"goodbye")
        .build();

    // Read operations
    let mut buf = [0; 6];
    mock.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"hello ");

    mock.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"world!");

    // Write operation
    mock.write_all(b"goodbye").await.unwrap();
}
If your code attempts operations not in the script, the mock will panic. This helps catch unexpected I/O patterns.

Simulating Read Errors

use tokio::io::AsyncReadExt;
use tokio_test::io::Builder;
use std::io;

#[tokio::test]
async fn test_read_error() {
    let error = io::Error::new(io::ErrorKind::Other, "connection lost");
    let mut mock = Builder::new()
        .read(b"partial")
        .read_error(error)
        .build();

    // First read succeeds
    let mut buf = [0; 7];
    mock.read_exact(&mut buf).await.unwrap();
    assert_eq!(&buf, b"partial");

    // Second read fails with error
    let result = mock.read_exact(&mut buf).await;
    assert!(result.is_err());
}

Simulating Write Errors

use tokio::io::AsyncWriteExt;
use tokio_test::io::Builder;
use std::io;

#[tokio::test]
async fn test_write_error() {
    let error = io::Error::new(io::ErrorKind::BrokenPipe, "pipe closed");
    let mut mock = Builder::new()
        .write(b"first")
        .write_error(error)
        .build();

    // First write succeeds
    mock.write_all(b"first").await.unwrap();

    // Second write fails
    let result = mock.write_all(b"second").await;
    assert!(result.is_err());
}

Adding Delays

Simulate network latency:
use tokio::io::AsyncReadExt;
use tokio::time::{Duration, Instant};
use tokio_test::io::Builder;

#[tokio::test]
async fn test_with_delay() {
    let mut mock = Builder::new()
        .wait(Duration::from_millis(100))
        .read(b"delayed data")
        .build();

    let start = Instant::now();
    let mut buf = [0; 12];
    mock.read_exact(&mut buf).await.unwrap();

    assert!(start.elapsed() >= Duration::from_millis(100));
    assert_eq!(&buf, b"delayed data");
}

Testing Protocol Implementation

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_test::io::Builder;

// Simple protocol: read command, write response
async fn handle_protocol<T>(mut io: T) -> std::io::Result<()>
where
    T: AsyncReadExt + AsyncWriteExt + Unpin,
{
    let mut buf = [0; 4];
    io.read_exact(&mut buf).await?;

    if &buf == b"PING" {
        io.write_all(b"PONG").await?;
    }

    Ok(())
}

#[tokio::test]
async fn test_protocol() {
    let mock = Builder::new()
        .read(b"PING")
        .write(b"PONG")
        .build();

    handle_protocol(mock).await.unwrap();
}

Task Harness

The task module provides utilities for polling futures without needing the full runtime setup.

Basic Task Spawning

use tokio_test::task;

#[test]
fn test_immediate_future() {
    let fut = async { 42 };
    let mut task = task::spawn(fut);

    assert!(task.poll().is_ready());
    assert_eq!(task.into_inner(), 42);
}
Notice this is a regular #[test], not #[tokio::test]. The task harness doesn’t require the Tokio runtime.

Testing Pending Futures

use tokio_test::task;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct PendingOnce {
    polled: bool,
}

impl Future for PendingOnce {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            Poll::Pending
        }
    }
}

#[test]
fn test_pending_future() {
    let fut = PendingOnce { polled: false };
    let mut task = task::spawn(fut);

    // First poll returns pending
    assert!(task.poll().is_pending());

    // Second poll returns ready
    assert!(task.poll().is_ready());
}

Tracking Wakeups

The task harness tracks how many times the future has been woken:
use tokio_test::task;
use tokio::sync::oneshot;

#[test]
fn test_wakeups() {
    let (tx, rx) = oneshot::channel();
    let mut task = task::spawn(rx);

    // Initial poll - not ready yet
    assert!(task.poll().is_pending());
    assert!(!task.is_woken());

    // Send value, which wakes the task
    tx.send(42).unwrap();
    assert!(task.is_woken());

    // Poll again - now ready
    assert!(task.poll().is_ready());
}

Testing Streams

The task harness also works with streams:
use tokio_test::task;
use tokio_stream::{self as stream, StreamExt};

#[test]
fn test_stream() {
    let stream = stream::iter(vec![1, 2, 3]);
    let mut task = task::spawn(stream);

    // Poll each item
    assert_eq!(task.poll_next(), Some(1));
    assert_eq!(task.poll_next(), Some(2));
    assert_eq!(task.poll_next(), Some(3));
    assert_eq!(task.poll_next(), None);
}

Testing Within Tokio Context

Some futures require the Tokio runtime context. You can still use the task harness inside #[tokio::test]:
use tokio_test::task;
use tokio::time::{sleep, Duration};

#[tokio::test]
async fn test_with_tokio_context() {
    let fut = async {
        sleep(Duration::from_millis(1)).await;
        "done"
    };

    let mut task = task::spawn(fut);

    // First poll is pending (sleep not done)
    assert!(task.poll().is_pending());

    // Advance time
    tokio::time::sleep(Duration::from_millis(2)).await;

    // Now it's ready
    assert!(task.poll().is_ready());
}

Stream Mock

The stream_mock module provides a way to test stream consumers by controlling what values are yielded.

Basic Stream Mock

use tokio_stream::StreamExt;
use tokio_test::stream_mock::StreamMockBuilder;

#[tokio::test]
async fn test_stream_consumer() {
    let (mut stream, mut handle) = StreamMockBuilder::new().build();

    // Spawn consumer task
    let consumer = tokio::spawn(async move {
        let mut sum = 0;
        while let Some(value) = stream.next().await {
            sum += value;
        }
        sum
    });

    // Feed values from producer
    handle.send_item(1);
    handle.send_item(2);
    handle.send_item(3);
    handle.close();

    let result = consumer.await.unwrap();
    assert_eq!(result, 6);
}

block_on Helper

For simple cases, tokio_test::block_on provides a quick way to run async code:
use tokio_test::block_on;

#[test]
fn test_async_function() {
    let result = block_on(async {
        // Some async computation
        42
    });

    assert_eq!(result, 42);
}
This creates a new current-thread runtime for each call. For more control, use #[tokio::test] instead.

Testing Patterns

Testing Error Handling

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_test::io::Builder;
use std::io;

async fn resilient_read<T>(mut io: T) -> io::Result<Vec<u8>>
where
    T: AsyncReadExt + Unpin,
{
    let mut buf = vec![0; 1024];
    match io.read(&mut buf).await {
        Ok(n) => {
            buf.truncate(n);
            Ok(buf)
        }
        Err(e) if e.kind() == io::ErrorKind::Interrupted => {
            // Retry on interrupt
            resilient_read(io).await
        }
        Err(e) => Err(e),
    }
}

#[tokio::test]
async fn test_resilient_read() {
    let error = io::Error::new(io::ErrorKind::Interrupted, "interrupted");
    let mock = Builder::new()
        .read_error(error)
        .read(b"success after retry")
        .build();

    let result = resilient_read(mock).await.unwrap();
    assert_eq!(result, b"success after retry");
}

Testing Request-Response Patterns

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio_test::io::Builder;

async fn echo_server<T>(io: T)
where
    T: AsyncBufReadExt + AsyncWriteExt + Unpin,
{
    let mut reader = BufReader::new(io);
    let mut line = String::new();

    while reader.read_line(&mut line).await.unwrap() > 0 {
        reader.get_mut().write_all(line.as_bytes()).await.unwrap();
        line.clear();
    }
}

#[tokio::test]
async fn test_echo_server() {
    let mock = Builder::new()
        .read(b"hello\n")
        .write(b"hello\n")
        .read(b"world\n")
        .write(b"world\n")
        .build();

    echo_server(mock).await;
}

Testing Timeout Behavior

use tokio::io::AsyncReadExt;
use tokio::time::{timeout, Duration};
use tokio_test::io::Builder;

#[tokio::test]
async fn test_read_timeout() {
    let mut mock = Builder::new()
        .wait(Duration::from_secs(2))
        .read(b"too slow")
        .build();

    let result = timeout(
        Duration::from_millis(100),
        mock.read(&mut [0; 8])
    ).await;

    assert!(result.is_err()); // Timeout occurred
}

Assertions and Macros

Custom Assertion Helpers

use tokio_test::task;
use std::future::Future;

fn assert_ready<F>(fut: F) -> F::Output
where
    F: Future,
{
    let mut task = task::spawn(fut);
    match task.poll() {
        std::task::Poll::Ready(output) => output,
        std::task::Poll::Pending => panic!("future was not ready"),
    }
}

fn assert_pending<F>(fut: F)
where
    F: Future,
{
    let mut task = task::spawn(fut);
    match task.poll() {
        std::task::Poll::Ready(_) => panic!("future was ready"),
        std::task::Poll::Pending => {},
    }
}

#[test]
fn test_with_helpers() {
    // Test immediate future
    let result = assert_ready(async { 42 });
    assert_eq!(result, 42);

    // Test pending future
    let (tx, rx) = tokio::sync::oneshot::channel::<i32>();
    assert_pending(rx);
    drop(tx); // Close channel to make it ready
}

Best Practices

Mock I/O eliminates network unpredictability and makes tests faster and more reliable.
Use read_error and write_error to verify your code handles failures gracefully.
Complex I/O scripts are hard to maintain. Consider breaking tests into smaller units.
Use is_woken() to ensure futures don’t wake unnecessarily, which can indicate performance issues.

Comparison with Alternatives

Tool
Use Case
ToolBest ForRequires Runtime
tokio_test::io::BuilderTesting I/O without networkNo
tokio_test::task::spawnTesting futures manuallyNo*
#[tokio::test]Integration testsYes
Real network I/OEnd-to-end testsYes
  • Task harness can be used with or without the runtime, but some futures require runtime context.

Resources

API Documentation

Complete API reference on docs.rs

Testing Guide

Comprehensive testing guide on Tokio website

GitHub Repository

View source code and examples

tokio::test macro

Learn about the #[tokio::test] macro

Build docs developers (and LLMs) love