Streaming sessions enable high-throughput, long-lived connections for both reading and appending data to S2 streams. They provide automatic retry, backpressure control, and efficient batching.

Append Sessions

Append sessions allow you to submit multiple batches of records while preserving order and managing backpressure.

Basic Usage

1. Create an append session

use s2_sdk::{
    S2,
    session::AppendSessionConfig,
    types::{AppendInput, AppendRecord, S2Config},
};

let s2 = S2::new(S2Config::new(access_token))?;
let stream = s2.basin(basin_name).stream(stream_name);

let config = AppendSessionConfig::new()
    .with_max_unacked_bytes(5 * 1024 * 1024)?; // 5 MiB

let session = stream.append_session(config);
2. Submit batches

let records = vec![
    AppendRecord::new("record1")?,
    AppendRecord::new("record2")?,
];

let input = AppendInput::new(records);
let ticket = session.submit(input).await?;
3. Wait for acknowledgements

let ack = ticket.await?;
println!("Appended records {}-{}", 
    ack.start.seq_num, 
    ack.end.seq_num
);
4. Close the session

session.close().await?;
Always call close() to ensure all submitted batches are acknowledged before terminating the session.

Backpressure Control

Append sessions implement backpressure by limiting the unacknowledged bytes and batches in flight:
use std::num::NonZeroU32;

let config = AppendSessionConfig::new()
    .with_max_unacked_bytes(10 * 1024 * 1024)? // 10 MiB
    .with_max_unacked_batches(
        NonZeroU32::new(100).unwrap() // 100 is non-zero, so this cannot panic
    );

let session = stream.append_session(config);
When the unacknowledged bytes limit is reached, the future returned by submit() does not resolve until earlier batches are acknowledged, which provides automatic backpressure.
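To make the accounting concrete, here is a minimal standard-library sketch of a byte and batch budget. The type `InflightBudget` and its methods are hypothetical names for illustration and are not part of the SDK. Where this toy version returns `false`, a real session would wait instead.

```rust
/// Toy model of in-flight accounting (hypothetical, not the SDK's implementation).
#[derive(Debug)]
struct InflightBudget {
    max_bytes: usize,
    max_batches: usize,
    bytes: usize,
    batches: usize,
}

impl InflightBudget {
    fn new(max_bytes: usize, max_batches: usize) -> Self {
        Self { max_bytes, max_batches, bytes: 0, batches: 0 }
    }

    /// Records the batch and returns true if it fits under both limits.
    /// Returns false (and records nothing) when either limit would be exceeded.
    fn try_admit(&mut self, batch_bytes: usize) -> bool {
        let fits = self.bytes + batch_bytes <= self.max_bytes
            && self.batches < self.max_batches;
        if fits {
            self.bytes += batch_bytes;
            self.batches += 1;
        }
        fits
    }

    /// Frees the capacity held by one acknowledged batch.
    fn on_ack(&mut self, batch_bytes: usize) {
        self.bytes = self.bytes.saturating_sub(batch_bytes);
        self.batches = self.batches.saturating_sub(1);
    }
}
```

With `InflightBudget::new(10, 2)`, a 6-byte batch is admitted, a second 6-byte batch is refused until `on_ack(6)` frees the first, and a third batch is refused while two are outstanding regardless of size.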

Advanced: Reserve and Submit

For more control in async contexts like select! loops, call reserve() to obtain a permit, then submit the batch through the permit:
loop {
    tokio::select! {
        // Reserve capacity first
        permit = session.reserve(batch_bytes) => {
            let permit = permit?;
            // Submit when ready
            let ticket = permit.submit(input);
        }
        // Other async operations
        _ = some_other_future => {
            // Handle other events
        }
    }
}
The reserve() method is cancel-safe, making it ideal for use in select! loops.
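The reserve-then-submit shape resembles the permit pattern used by tokio's mpsc `Sender::reserve`. The sketch below is a single-threaded, standard-library model of that general pattern, showing how capacity held by an unused permit can be returned when the permit is dropped. `Capacity` and `Permit` are hypothetical names, and nothing here describes the SDK's internals.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Shared count of bytes still available (hypothetical toy model).
#[derive(Clone)]
struct Capacity(Rc<Cell<usize>>);

/// Holds reserved bytes until it is submitted or dropped.
struct Permit {
    capacity: Capacity,
    bytes: usize,
    submitted: bool,
}

impl Capacity {
    fn new(bytes: usize) -> Self {
        Capacity(Rc::new(Cell::new(bytes)))
    }

    fn available(&self) -> usize {
        self.0.get()
    }

    /// Reserves `bytes` if they are available.
    /// A real session would wait here instead of returning None.
    fn try_reserve(&self, bytes: usize) -> Option<Permit> {
        let free = self.0.get();
        if bytes > free {
            return None;
        }
        self.0.set(free - bytes);
        Some(Permit { capacity: self.clone(), bytes, submitted: false })
    }
}

impl Permit {
    /// Consumes the permit. The bytes stay reserved; freeing them on
    /// acknowledgement is not modelled in this sketch.
    fn submit(mut self) {
        self.submitted = true;
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        // A permit dropped without being submitted gives its bytes back.
        if !self.submitted {
            self.capacity.0.set(self.capacity.0.get() + self.bytes);
        }
    }
}
```

Because the reservation is an owned value, abandoning it (for example when another select! branch wins) leaks no capacity in this model.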

Automatic Retry

Append sessions automatically retry on transient failures:
  • Network disconnections
  • Timeout errors
  • Server unavailability
The session tracks in-flight appends and resends them on reconnection:
// From session/append.rs:395-482
// Retry logic with exponential backoff
// - Resets backoff on successful acknowledgements
// - Respects retry policy (e.g., only idempotent appends)
// - Maintains append order during retries
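As an illustration of the backoff behaviour listed above, the following standard-library sketch doubles the delay on each failure up to a cap and resets after a success. The `Backoff` type, its parameters, and the absence of jitter are assumptions made for this example; the SDK's actual policy may differ.

```rust
use std::time::Duration;

/// Exponential backoff that doubles per failure up to a cap
/// (hypothetical sketch, no jitter).
struct Backoff {
    base: Duration,
    max: Duration,
    attempt: u32,
}

impl Backoff {
    fn new(base: Duration, max: Duration) -> Self {
        Self { base, max, attempt: 0 }
    }

    /// Returns the delay before the next retry: base * 2^attempt, capped at `max`.
    fn next_delay(&mut self) -> Duration {
        let factor = 2u32.saturating_pow(self.attempt);
        self.attempt = self.attempt.saturating_add(1);
        self.base.saturating_mul(factor).min(self.max)
    }

    /// Call after a successful acknowledgement so the next failure
    /// starts again from the base delay.
    fn reset(&mut self) {
        self.attempt = 0;
    }
}
```

With a 100 ms base and a 1 s cap, successive failures yield 100, 200, 400, 800, 1000, and 1000 ms, and the first failure after `reset()` waits 100 ms again.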

Read Sessions

Read sessions provide a streaming interface for consuming records with automatic retry and heartbeat monitoring.

Basic Usage

1. Create a read session

use s2_sdk::types::ReadInput;
use futures::StreamExt;

let input = ReadInput::new()
    .with_start_seq_num(0)?
    .with_wait_secs(30)?; // Long poll for 30 seconds

let mut batches = stream.read_session(input).await?;
2. Consume batches

while let Some(batch) = batches.next().await {
    let batch = batch?;
    
    for record in batch.records {
        println!("seq_num: {}, body: {:?}", 
            record.seq_num, 
            record.body
        );
    }

    if batch.tail.is_some() {
        println!("Caught up to tail");
    }
}

Following the Stream

Use with_wait_secs() to follow a stream in real time:
let input = ReadInput::new()
    .with_start_at_tail()?
    .with_wait_secs(60)?; // Wait up to 60 seconds for new data

let mut batches = stream.read_session(input).await?;

loop {
    tokio::select! {
        batch = batches.next() => {
            let Some(batch) = batch else { break };
            let batch = batch?;
            // Process batch
        }
        _ = tokio::signal::ctrl_c() => break,
    }
}
The read session automatically adjusts the wait budget on retries to avoid excessive long polling.
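One plausible reading of this adjustment is that time already spent waiting is deducted from the requested budget before a retry. The helper below sketches that arithmetic. The function name and the subtraction rule are assumptions for illustration, not confirmed SDK behaviour.

```rust
use std::time::Duration;

/// Remaining long-poll budget after `elapsed` has already been spent waiting.
/// Saturates at zero rather than underflowing (hypothetical helper).
fn remaining_wait(requested: Duration, elapsed: Duration) -> Duration {
    requested.saturating_sub(elapsed)
}
```

For example, a 60-second budget with 45 seconds already elapsed would leave 15 seconds for the retried request, and a retry after the budget is exhausted would not long poll at all.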

Heartbeat Monitoring

Read sessions monitor for heartbeats to detect stalled connections:
// From session/read.rs:159-170
// Timeout after 20 seconds without data or heartbeat
match timeout(Duration::from_secs(20), batches.next()).await {
    Ok(Some(batch)) => {
        // Process batch
    }
    Ok(None) => break, // Stream ended
    Err(_) => {
        // Heartbeat timeout - will retry
    }
}
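The same idle-timeout idea can be modelled with the standard library alone, using a channel in place of the network stream. In this sketch the `Event` and `ReadOutcome` enums and the `consume` function are hypothetical, and the idle duration is a parameter rather than the 20 seconds mentioned above.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What the producer can send: data or a keep-alive (hypothetical model).
#[derive(Debug, PartialEq)]
enum Event {
    Batch(u64),
    Heartbeat,
}

/// Why consumption stopped.
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    Ended,
    HeartbeatTimeout,
}

/// Collects batch ids until the sender disconnects or nothing
/// (neither data nor heartbeat) arrives within `idle`.
fn consume(rx: &Receiver<Event>, idle: Duration) -> (Vec<u64>, ReadOutcome) {
    let mut seen = Vec::new();
    loop {
        match rx.recv_timeout(idle) {
            Ok(Event::Batch(id)) => seen.push(id),
            // A heartbeat carries no data; receiving it restarts the idle timer.
            Ok(Event::Heartbeat) => {}
            Err(RecvTimeoutError::Disconnected) => return (seen, ReadOutcome::Ended),
            Err(RecvTimeoutError::Timeout) => return (seen, ReadOutcome::HeartbeatTimeout),
        }
    }
}
```

A caller that receives `HeartbeatTimeout` would typically reconnect and resume, while `Ended` means the producer side has closed.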

Error Handling

Append and read sessions distinguish errors that trigger an automatic retry from errors that terminate the session.

Retryable Errors

These errors trigger automatic retry:
  • HeartbeatTimeout (read)
  • AckTimeout (append)
  • ServerDisconnected (append)
  • Network connectivity issues

Terminal Errors

These errors terminate the session:
  • SessionClosed
  • SessionDropped
  • Permission denied
  • Invalid requests
match session.submit(input).await {
    Ok(ticket) => {
        match ticket.await {
            Ok(ack) => println!("Success: {:?}", ack),
            Err(e) => eprintln!("Append failed: {}", e),
        }
    }
    Err(e) => eprintln!("Submit failed: {}", e),
}
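The split between the two lists above can be expressed as a simple classification. The enum below is a hypothetical stand-in whose variants mirror the listed names. It is not the SDK's error type, and the variants `Network`, `PermissionDenied`, and `InvalidRequest` are labels invented here for the unnamed list entries.

```rust
/// Hypothetical error kinds mirroring the retryable and terminal lists.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SessionErrorKind {
    HeartbeatTimeout,
    AckTimeout,
    ServerDisconnected,
    Network,
    SessionClosed,
    SessionDropped,
    PermissionDenied,
    InvalidRequest,
}

impl SessionErrorKind {
    /// True for kinds that a session would retry automatically;
    /// false for kinds that end the session.
    fn is_retryable(self) -> bool {
        matches!(
            self,
            Self::HeartbeatTimeout
                | Self::AckTimeout
                | Self::ServerDisconnected
                | Self::Network
        )
    }
}
```

Since retryable kinds are handled inside the session, application code mostly needs to react to the terminal ones, for example by logging and recreating the session.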

Best Practices

  • Reuse sessions: Create sessions once and reuse them for multiple operations to avoid connection overhead.
  • Tune buffer sizes: Adjust max_unacked_bytes based on your throughput requirements and memory constraints.
  • Handle graceful shutdown: Always call close() on sessions during application shutdown to ensure data is flushed.
Sessions use AbortOnDropHandle internally, so dropping a session will abort the background task. Always prefer explicit close() calls.
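For the buffer-size advice, a rough starting point is the amount of data that must be in flight to sustain a target throughput at a given acknowledgement latency (throughput multiplied by latency). The helper and the figures below are illustrative assumptions, not measured values or SDK guidance.

```rust
/// Rough in-flight bytes needed to sustain `bytes_per_sec` when each batch
/// takes about `ack_latency_ms` to be acknowledged (hypothetical helper).
fn inflight_bytes(bytes_per_sec: u64, ack_latency_ms: u64) -> u64 {
    bytes_per_sec.saturating_mul(ack_latency_ms) / 1000
}
```

Under these assumptions, sustaining 50 MiB/s with a 100 ms acknowledgement latency needs about 5 MiB in flight, so a smaller max_unacked_bytes would cap throughput and a much larger one would mainly cost memory.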
