Streaming sessions enable high-throughput, long-lived connections for both reading and appending data to S2 streams. They provide automatic retry, backpressure control, and efficient batching.
Append Sessions
Append sessions allow you to submit multiple batches of records while preserving order and managing backpressure.
Basic Usage
Create an append session
use s2_sdk::{
S2,
session::AppendSessionConfig,
types::{AppendInput, AppendRecord, S2Config},
};
let s2 = S2::new(S2Config::new(access_token))?;
let stream = s2.basin(basin_name).stream(stream_name);
let config = AppendSessionConfig::new()
.with_max_unacked_bytes(5 * 1024 * 1024)?; // 5 MiB
let session = stream.append_session(config);
Submit batches
let records = vec![
AppendRecord::new("record1")?,
AppendRecord::new("record2")?,
];
let input = AppendInput::new(records);
let ticket = session.submit(input).await?;
Wait for acknowledgements
let ack = ticket.await?;
println!("Appended records {}-{}",
ack.start.seq_num,
ack.end.seq_num
);
Close the session
Always call close() to ensure all submitted batches are acknowledged before terminating the session.
Backpressure Control
Append sessions implement backpressure by limiting unacknowledged bytes in flight:
let config = AppendSessionConfig::new()
.with_max_unacked_bytes(10 * 1024 * 1024)? // 10 MiB
.with_max_unacked_batches(
NonZeroU32::new(100).unwrap()
);
let session = stream.append_session(config);
The submit() method will block when the unacknowledged bytes limit is reached, providing automatic backpressure.
Advanced: Reserve and Submit
For more control in async contexts like select! loops, use reserve() followed by submit():
loop {
tokio::select! {
// Reserve capacity first
permit = session.reserve(batch_bytes) => {
let permit = permit?;
// Submit when ready
let ticket = permit.submit(input);
}
// Other async operations
_ = some_other_future => {
// Handle other events
}
}
}
The reserve() method is cancel-safe, making it ideal for use in select! loops.
Automatic Retry
Append sessions automatically retry on transient failures:
- Network disconnections
- Timeout errors
- Server unavailability
The session tracks inflight appends and resends them on reconnection:
// From session/append.rs:395-482
// Retry logic with exponential backoff
// - Resets backoff on successful acknowledgements
// - Respects retry policy (e.g., only idempotent appends)
// - Maintains append order during retries
Read Sessions
Read sessions provide a streaming interface for consuming records with automatic retry and heartbeat monitoring.
Basic Usage
Create a read session
use s2_sdk::types::ReadInput;
use futures::StreamExt;
let input = ReadInput::new()
.with_start_seq_num(0)?
.with_wait_secs(30)?; // Long poll for 30 seconds
let mut batches = stream.read_session(input).await?;
Consume batches
while let Some(batch) = batches.next().await {
let batch = batch?;
for record in batch.records {
println!("seq_num: {}, body: {:?}",
record.seq_num,
record.body
);
}
if batch.tail.is_some() {
println!("Caught up to tail");
}
}
Following the Stream
Use with_wait_secs() to follow a stream in real-time:
let input = ReadInput::new()
.with_start_at_tail()?
.with_wait_secs(60)?; // Wait up to 60 seconds for new data
let mut batches = stream.read_session(input).await?;
loop {
tokio::select! {
batch = batches.next() => {
let Some(batch) = batch else { break };
let batch = batch?;
// Process batch
}
_ = tokio::signal::ctrl_c() => break,
}
}
The read session automatically adjusts the wait budget on retries to avoid excessive long polling.
Heartbeat Monitoring
Read sessions monitor for heartbeats to detect stalled connections:
// From session/read.rs:159-170
// Timeout after 20 seconds without data or heartbeat
match timeout(Duration::from_secs(20), batches.next()).await {
Ok(Some(batch)) => {
// Process batch
}
Ok(None) => break, // Stream ended
Err(_) => {
// Heartbeat timeout - will retry
}
}
Error Handling
Both append and read sessions handle errors gracefully:
Retryable Errors
These errors trigger automatic retry:
HeartbeatTimeout (read)
AckTimeout (append)
ServerDisconnected (append)
- Network connectivity issues
Terminal Errors
These errors terminate the session:
SessionClosed
SessionDropped
- Permission denied
- Invalid requests
match session.submit(input).await {
Ok(ticket) => {
match ticket.await {
Ok(ack) => println!("Success: {:?}", ack),
Err(e) => eprintln!("Append failed: {}", e),
}
}
Err(e) => eprintln!("Submit failed: {}", e),
}
Best Practices
Reuse sessions: Create sessions once and reuse them for multiple operations to avoid connection overhead.
Tune buffer sizes: Adjust max_unacked_bytes based on your throughput requirements and memory constraints.
Handle graceful shutdown: Always call close() on sessions during application shutdown to ensure data is flushed.
Sessions use AbortOnDropHandle internally, so dropping a session will abort the background task. Always prefer explicit close() calls.