Testing utilities for Tokio and futures-based code including mock I/O, task harnesses, and stream testing
tokio-test provides testing utilities for Tokio and futures-based code. It includes mock I/O types, task harnesses for polling futures, and stream testing utilities.
The io module provides mock types that follow a predefined script of read and write operations. This is perfect for testing networking code without real network I/O.
use tokio::io::AsyncWriteExt;use tokio_test::io::Builder;use std::io;#[tokio::test]async fn test_write_error() { let error = io::Error::new(io::ErrorKind::BrokenPipe, "pipe closed"); let mut mock = Builder::new() .write(b"first") .write_error(error) .build(); // First write succeeds mock.write_all(b"first").await.unwrap(); // Second write fails let result = mock.write_all(b"second").await; assert!(result.is_err());}
use tokio_test::task;#[test]fn test_immediate_future() { let fut = async { 42 }; let mut task = task::spawn(fut); assert!(task.poll().is_ready()); assert_eq!(task.into_inner(), 42);}
Notice this is a regular #[test], not #[tokio::test]. The task harness doesn’t require the Tokio runtime.
The task harness tracks whether the future has been woken since it was last polled:
use tokio_test::task;use tokio::sync::oneshot;#[test]fn test_wakeups() { let (tx, rx) = oneshot::channel(); let mut task = task::spawn(rx); // Initial poll - not ready yet assert!(task.poll().is_pending()); assert!(!task.is_woken()); // Send value, which wakes the task tx.send(42).unwrap(); assert!(task.is_woken()); // Poll again - now ready assert!(task.poll().is_ready());}
Some futures require the Tokio runtime context. You can still use the task harness inside #[tokio::test]:
use tokio_test::task;use tokio::time::{sleep, Duration};#[tokio::test]async fn test_with_tokio_context() { let fut = async { sleep(Duration::from_millis(1)).await; "done" }; let mut task = task::spawn(fut); // First poll is pending (sleep not done) assert!(task.poll().is_pending()); // Advance time tokio::time::sleep(Duration::from_millis(2)).await; // Now it's ready assert!(task.poll().is_ready());}
use tokio_stream::StreamExt;use tokio_test::stream_mock::StreamMockBuilder;#[tokio::test]async fn test_stream_consumer() { let (mut stream, mut handle) = StreamMockBuilder::new().build(); // Spawn consumer task let consumer = tokio::spawn(async move { let mut sum = 0; while let Some(value) = stream.next().await { sum += value; } sum }); // Feed values from producer handle.send_item(1); handle.send_item(2); handle.send_item(3); handle.close(); let result = consumer.await.unwrap(); assert_eq!(result, 6);}
use tokio_test::task;use std::future::Future;fn assert_ready<F>(fut: F) -> F::Outputwhere F: Future,{ let mut task = task::spawn(fut); match task.poll() { std::task::Poll::Ready(output) => output, std::task::Poll::Pending => panic!("future was not ready"), }}fn assert_pending<F>(fut: F)where F: Future,{ let mut task = task::spawn(fut); match task.poll() { std::task::Poll::Ready(_) => panic!("future was ready"), std::task::Poll::Pending => {}, }}#[test]fn test_with_helpers() { // Test immediate future let result = assert_ready(async { 42 }); assert_eq!(result, 42); // Test pending future let (tx, rx) = tokio::sync::oneshot::channel::<i32>(); assert_pending(rx); drop(tx); // Close channel to make it ready}