tokio-util provides additional utilities for working with Tokio. This crate contains helpers and extensions that complement the core Tokio runtime.
Installation
Add tokio-util to your Cargo.toml:
[dependencies]
tokio-util = "0.7"
tokio-util is not versioned in lockstep with the core tokio crate, but respects Rust’s semantic versioning policy.
Features
The crate is organized around optional feature flags:
codec: Encoding and decoding frames with the Decoder and Encoder traits
compat: Compatibility layer between tokio::io and futures-io traits
io-util: Additional I/O utilities like ReaderStream and StreamReader
time: Time-based utilities including DelayQueue
net: Network utilities and UDP frame handling
rt: Runtime utilities and task tracking
Enable All Features
[dependencies]
tokio-util = { version = "0.7", features = ["full"] }
Codec - Framing I/O Streams
Requires the codec feature flag.
Codecs provide a way to convert between byte streams and typed frames. This is essential for implementing network protocols.
Using LinesCodec
The LinesCodec splits data by newlines:
use futures::sink::SinkExt;
use tokio_util::codec::{FramedWrite, LinesCodec};

#[tokio::main]
async fn main() {
    let buffer = Vec::new();
    let encoder = LinesCodec::new();
    let mut writer = FramedWrite::new(buffer, encoder);
    writer.send("Hello").await.unwrap();
    writer.send("World").await.unwrap();
    let buffer = writer.get_ref();
    assert_eq!(buffer.as_slice(), "Hello\nWorld\n".as_bytes());
}
Reading Framed Data
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};

#[tokio::main]
async fn main() {
    let message = "Hello\nWorld".as_bytes();
    let decoder = LinesCodec::new();
    let mut reader = FramedRead::new(message, decoder);
    let frame1 = reader.next().await.unwrap().unwrap();
    let frame2 = reader.next().await.unwrap().unwrap();
    assert_eq!(frame1, "Hello");
    assert_eq!(frame2, "World");
}
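To make the framing step concrete, the following std-only sketch shows roughly what a line decoder does with a growing byte buffer: it emits a frame only once the terminator has arrived and leaves partial data in place. This is an illustration under simplifying assumptions, not the LinesCodec implementation, and take_line is a hypothetical helper name.

```rust
/// Hypothetical helper (not part of tokio-util): removes and returns the next
/// complete line from `buf`, or returns None if no full line has arrived yet.
fn take_line(buf: &mut Vec<u8>) -> Option<String> {
    // Find the newline that terminates the next frame; bail out if absent.
    let pos = buf.iter().position(|&b| b == b'\n')?;
    // Remove the line and its terminator from the front of the buffer.
    let mut line: Vec<u8> = buf.drain(..=pos).collect();
    line.pop(); // drop '\n'
    if line.last() == Some(&b'\r') {
        line.pop(); // drop the '\r' of a "\r\n" ending
    }
    // Invalid UTF-8 is replaced here for brevity; a real codec would return an error.
    Some(String::from_utf8_lossy(&line).into_owned())
}
```

Calling take_line on a buffer holding "Hello\nWor" returns "Hello" and leaves "Wor" buffered until more bytes arrive. LinesCodec additionally emits any unterminated trailing data when the stream ends, which is why the reading example yields "World" without a final newline.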
Custom Decoder Implementation
Here’s an example of a custom length-prefixed string decoder:
use tokio_util::codec::Decoder;
use bytes::{BytesMut, Buf};

struct MyStringDecoder {}

const MAX: usize = 8 * 1024 * 1024;

impl Decoder for MyStringDecoder {
    type Item = String;
    type Error = std::io::Error;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {
        if src.len() < 4 {
            // Not enough data to read length marker
            return Ok(None);
        }

        // Read length marker
        let mut length_bytes = [0u8; 4];
        length_bytes.copy_from_slice(&src[..4]);
        let length = u32::from_le_bytes(length_bytes) as usize;

        if length > MAX {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Frame of length {} is too large.", length)
            ));
        }

        if src.len() < 4 + length {
            // Reserve more space
            src.reserve(4 + length - src.len());
            return Ok(None);
        }

        // Extract frame data
        let data = src[4..4 + length].to_vec();
        src.advance(4 + length);

        // Convert to string
        match String::from_utf8(data) {
            Ok(string) => Ok(Some(string)),
            Err(utf8_error) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                utf8_error.utf8_error(),
            )),
        }
    }
}
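For completeness, here is a std-only sketch of the writing side of the same wire format: a 4-byte little-endian length followed by the UTF-8 payload. The function name encode_frame and the use of Vec<u8> are assumptions made for illustration; with tokio-util the same logic would typically sit in an Encoder<String> implementation that writes into a BytesMut.

```rust
/// Hypothetical helper (not part of tokio-util): appends `payload` to `dst`
/// as a 4-byte little-endian length followed by the UTF-8 bytes.
fn encode_frame(payload: &str, dst: &mut Vec<u8>) -> std::io::Result<()> {
    // Mirror the 8 MiB limit enforced by the decoder.
    const MAX: usize = 8 * 1024 * 1024;
    if payload.len() > MAX {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Frame of length {} is too large.", payload.len()),
        ));
    }
    // The cast cannot truncate because MAX fits in a u32.
    let len = payload.len() as u32;
    dst.reserve(4 + payload.len());
    dst.extend_from_slice(&len.to_le_bytes());
    dst.extend_from_slice(payload.as_bytes());
    Ok(())
}
```

Enforcing the limit on both sides means a sender can never produce a frame that the decoder would reject as too large.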
Available Codecs
LinesCodec: Splits frames by newline characters (\n or \r\n).
BytesCodec: Simple codec that works with raw bytes.
LengthDelimitedCodec: Frames data with a length prefix for variable-length messages.
AnyDelimiterCodec: Splits frames by any specified delimiter byte sequence.
Synchronization Primitives
CancellationToken
The CancellationToken allows signaling cancellation requests across multiple tasks:
use tokio::select;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let cloned_token = token.clone();

    let join_handle = tokio::spawn(async move {
        select! {
            _ = cloned_token.cancelled() => {
                // Token was cancelled
                println!("Task cancelled");
                5
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => {
                99
            }
        }
    });

    // Cancel after 10ms
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        token.cancel();
    });

    assert_eq!(5, join_handle.await.unwrap());
}
PollSemaphore
Wrapper around tokio::sync::Semaphore that provides a poll_acquire method.
Time Utilities
Requires the time feature flag.
DelayQueue
A queue where elements are yielded once their delay has expired. Perfect for managing cache timeouts:
use tokio_util::time::DelayQueue;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut delay_queue = DelayQueue::new();

    // Insert items with different delays
    delay_queue.insert("first", Duration::from_millis(100));
    delay_queue.insert("second", Duration::from_millis(200));
    delay_queue.insert("third", Duration::from_millis(50));

    // Items are yielded in expiration order
    // "third" will be yielded first (50ms)
    // then "first" (100ms), then "second" (200ms)
}
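The ordering described in those comments can be checked with a small std-only model. This is not DelayQueue itself, which is driven by the Tokio timer and yields items asynchronously; the sketch only orders entries by their delay, and expiration_order is a hypothetical helper name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

/// Hypothetical model: returns the items in the order their delays elapse.
/// Entries with equal delays are ordered by the item's own `Ord`.
fn expiration_order<T: Ord>(entries: Vec<(T, Duration)>) -> Vec<T> {
    // BinaryHeap is a max-heap; wrapping in Reverse pops the smallest delay first.
    let mut heap: BinaryHeap<Reverse<(Duration, T)>> = entries
        .into_iter()
        .map(|(item, delay)| Reverse((delay, item)))
        .collect();
    let mut order = Vec::with_capacity(heap.len());
    while let Some(Reverse((_, item))) = heap.pop() {
        order.push(item);
    }
    order
}
```

Passing the three entries from the example ("first" at 100ms, "second" at 200ms, "third" at 50ms) gives the order third, first, second.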
I/O Utilities
Requires the io-util feature flag.
ReaderStream
Converts an AsyncRead into a Stream of byte chunks, each a Result<Bytes, std::io::Error>:
use tokio::io::AsyncReadExt;
use tokio_util::io::ReaderStream;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // A reader that yields the byte 42 ten times
    let reader = tokio::io::repeat(42).take(10);
    let mut stream = ReaderStream::new(reader);
    while let Some(chunk) = stream.next().await {
        let bytes = chunk.unwrap();
        // Process bytes
        println!("read {} bytes", bytes.len());
    }
}
StreamReader
Converts a Stream of bytes into an AsyncRead:
use bytes::Bytes;
use tokio_util::io::StreamReader;
use tokio_stream::iter;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() {
    // Name the error type explicitly; a bare Ok(..) leaves it ambiguous
    let stream = iter(vec![
        std::io::Result::Ok(Bytes::from("hello")),
        std::io::Result::Ok(Bytes::from(" world")),
    ]);
    let mut reader = StreamReader::new(stream);
    let mut buf = String::new();
    reader.read_to_string(&mut buf).await.unwrap();
    assert_eq!(buf, "hello world");
}
SyncIoBridge
SyncIoBridge wraps a tokio::io async reader or writer so it can be used through the blocking std::io traits, enabling compatibility with synchronous I/O code.
Task Utilities
Requires the rt feature flag.
TaskTracker
Tracks spawned tasks and waits for them to complete:
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    for i in 0..10 {
        tracker.spawn(async move {
            println!("Task {} running", i);
        });
    }

    // Close the tracker and wait for all tasks
    tracker.close();
    tracker.wait().await;
    println!("All tasks completed");
}
JoinMap
A keyed collection of spawned tasks. Each call to join_next yields the key and result of the next task to finish:
use tokio_util::task::JoinMap;

#[tokio::main]
async fn main() {
    let mut map = JoinMap::new();
    map.spawn("task1", async { 1 });
    map.spawn("task2", async { 2 });

    while let Some((key, result)) = map.join_next().await {
        println!("{} completed with: {}", key, result.unwrap());
    }
}
Either Type
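A sum type that holds a value of one of two possible types. It is mainly useful when two branches produce different types that implement the same asynchronous trait (such as Future or AsyncRead) and a single concrete type is needed:
use tokio_util::either::Either;

fn main() {
    let value: Either<i32, String> = Either::Left(42);
    match value {
        Either::Left(num) => println!("Number: {}", num),
        Either::Right(text) => println!("Text: {}", text),
    }
}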
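The reason such a type is handy is easier to see with a trait in play. The sketch below is a std-only analog, assuming Iterator in place of the asynchronous traits; EitherIter and numbers are hypothetical names and are not part of tokio-util.

```rust
/// Hypothetical std-only analog: one enum wrapping two concrete iterator types.
enum EitherIter<L, R> {
    Left(L),
    Right(R),
}

// Forward Iterator to whichever variant is held, provided both yield the same item type.
impl<L, R> Iterator for EitherIter<L, R>
where
    L: Iterator,
    R: Iterator<Item = L::Item>,
{
    type Item = L::Item;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            EitherIter::Left(l) => l.next(),
            EitherIter::Right(r) => r.next(),
        }
    }
}

/// Returns two different iterator types behind one return type, without boxing.
fn numbers(
    ascending: bool,
) -> EitherIter<std::ops::Range<u32>, std::iter::Rev<std::ops::Range<u32>>> {
    if ascending {
        EitherIter::Left(0..3)
    } else {
        EitherIter::Right((0..3).rev())
    }
}
```

Both branches of numbers have different concrete types, yet callers can iterate over the result directly because the enum forwards the trait to the active variant.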
Resources
API Documentation: Complete API reference on docs.rs
GitHub Repository: View source code and examples