Skip to main content
tokio-util provides additional utilities for working with Tokio. This crate contains helpers and extensions that complement the core Tokio runtime.

Installation

Add tokio-util to your Cargo.toml:
Cargo.toml
[dependencies]
tokio-util = "0.7"
tokio-util is not versioned in lockstep with the core tokio crate, but respects Rust’s semantic versioning policy.

Features

The crate is organized around optional feature flags:

codec

Encoding and decoding frames with Decoder and Encoder traits

compat

Compatibility layer between tokio::io and futures-io traits

io-util

Additional I/O utilities like ReaderStream and StreamReader

time

Time-based utilities including DelayQueue

net

Network utilities and UDP frame handling

rt

Runtime utilities and task tracking

Enable All Features

Cargo.toml
[dependencies]
tokio-util = { version = "0.7", features = ["full"] }

Codec - Framing I/O Streams

Requires the codec feature flag.
Codecs provide a way to convert between byte streams and typed frames. This is essential for implementing network protocols.

Using LinesCodec

The LinesCodec splits data by newlines:
use futures::sink::SinkExt;
use tokio_util::codec::{FramedWrite, LinesCodec};

#[tokio::main]
async fn main() {
    let buffer = Vec::new();
    let encoder = LinesCodec::new();
    let mut writer = FramedWrite::new(buffer, encoder);

    writer.send("Hello").await.unwrap();
    writer.send("World").await.unwrap();

    let buffer = writer.get_ref();
    assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}

Reading Framed Data

use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};

#[tokio::main]
async fn main() {
    let message = "Hello\nWorld".as_bytes();
    let decoder = LinesCodec::new();
    let mut reader = FramedRead::new(message, decoder);

    let frame1 = reader.next().await.unwrap().unwrap();
    let frame2 = reader.next().await.unwrap().unwrap();

    assert_eq!(frame1, "Hello");
    assert_eq!(frame2, "World");
}

Custom Decoder Implementation

Here’s an example of a custom length-prefixed string decoder:
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker
            return Ok(None);
        }

        // Read length marker
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }

        if src.len() < 4 + length {
            // Reserve more space
            src.reserve(4 + length - src.len());
            return Ok(None);
        }

        // Extract frame data
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert to string
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                utf8_error.utf8_error(),
            )),
        }
    }
}

Available Codecs

Splits frames by newline characters (\n or \r\n).
Simple codec that works with raw bytes.
Frames data with a length prefix for variable-length messages.
Splits frames by any specified delimiter byte sequence.

Synchronization Primitives

CancellationToken

The CancellationToken allows signaling cancellation requests across multiple tasks:
use tokio::select;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let cloned_token = token.clone();

    let join_handle = tokio::spawn(async move {
        select! {
            _ = cloned_token.cancelled() => {
                // Token was cancelled
                println!("Task cancelled");
                5
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
                99
            }
        }
    });

    // Cancel after 10ms
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        token.cancel();
    });

    assert_eq!(5, join_handle.await.unwrap());
}

PollSemaphore

Wrapper around tokio::sync::Semaphore that provides a poll_acquire method.

Time Utilities

Requires the time feature flag.

DelayQueue

A queue where elements are yielded once their delay has expired. Perfect for managing cache timeouts:
use tokio_util::time::DelayQueue;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut delay_queue = DelayQueue::new();

    // Insert items with different delays
    delay_queue.insert("first", Duration::from_millis(100));
    delay_queue.insert("second", Duration::from_millis(200));
    delay_queue.insert("third", Duration::from_millis(50));

    // Items are yielded in expiration order
    // "third" will be yielded first (50ms)
    // then "first" (100ms), then "second" (200ms)
}

I/O Utilities

Requires the io-util feature flag.

ReaderStream

Converts an AsyncRead into a Stream of bytes:
use tokio::io::AsyncReadExt;
use tokio_util::io::ReaderStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let reader = tokio::io::repeat(42).take(10);
    let mut stream = ReaderStream::new(reader);

    while let Some(chunk) = stream.next().await {
        let bytes = chunk.unwrap();
        // Process bytes
    }
}

StreamReader

Converts a Stream of bytes into an AsyncRead:
use tokio_util::io::StreamReader;
use tokio_stream::iter;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() {
    let stream = iter(vec![Ok(bytes::Bytes::from("hello")), Ok(bytes::Bytes::from(" world"))]);
    let mut reader = StreamReader::new(stream);

    let mut buf = String::new();
    reader.read_to_string(&mut buf).await.unwrap();
    assert_eq!(buf, "hello world");
}

SyncBridge

Bridges tokio::io async traits with std::io sync traits, enabling compatibility with blocking I/O.

Task Utilities

Requires the rt feature flag.

TaskTracker

Tracks spawned tasks and waits for them to complete:
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    for i in 0..10 {
        tracker.spawn(async move {
            println!("Task {} running", i);
        });
    }

    // Close the tracker and wait for all tasks
    tracker.close();
    tracker.wait().await;

    println!("All tasks completed");
}

JoinMap

A collection of tasks that can be awaited individually:
use tokio_util::task::JoinMap;

#[tokio::main]
async fn main() {
    let mut map = JoinMap::new();

    map.spawn("task1", async { 1 });
    map.spawn("task2", async { 2 });

    while let Some((key, result)) = map.join_next().await {
        println!("{} completed with: {}", key, result.unwrap());
    }
}

Either Type

A sum type for handling two possible types:
use tokio_util::either::Either;

let value: Either<i32, String> = Either::Left(42);

match value {
    Either::Left(num) => println!("Number: {}", num),
    Either::Right(text) => println!("Text: {}", text),
}

Resources

API Documentation

Complete API reference on docs.rs

GitHub Repository

View source code and examples

Build docs developers (and LLMs) love