This guide covers performance optimization techniques for the S2 SDK, from batching strategies to connection pooling and memory management.
Throughput Optimization
Producer Configuration
For maximum throughput, tune the producer’s batching and backpressure settings:
use s2_sdk::producer::ProducerConfig;
use s2_sdk::batching::BatchingConfig;
use std::time::Duration;
let config = ProducerConfig::new()
.with_max_unacked_bytes(10 * 1024 * 1024)? // 10 MiB in flight
.with_batching(
BatchingConfig::new()
.with_linger(Duration::from_millis(50)) // Wait longer to batch more
.with_max_batch_records(1000)? // Max out batch size
.with_max_batch_bytes(1024 * 1024)? // Full 1 MiB batches
);
let producer = stream.producer(config);
High throughput trade-off: Larger batches and higher linger times improve throughput but add latency.
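As a back-of-the-envelope sanity check on these numbers (an illustration, not SDK behavior), sustained throughput with a fixed in-flight window is bounded by the window size divided by the round-trip time:

```rust
/// Rough upper bound on sustained append throughput (bytes/sec) for a
/// pipelined producer: at most `max_unacked_bytes` can be in flight per
/// round trip of `rtt_ms` milliseconds.
fn max_throughput_bytes_per_sec(max_unacked_bytes: u64, rtt_ms: u64) -> u64 {
    assert!(rtt_ms > 0);
    max_unacked_bytes * 1000 / rtt_ms
}
```

With the 10 MiB window above and a 20 ms round trip, the ceiling is roughly 500 MiB/s; halving the window halves the ceiling, which is why throughput-oriented configurations raise max_unacked_bytes.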
Append Session Configuration
For direct session control:
use s2_sdk::session::AppendSessionConfig;
use std::num::NonZeroU32;
let config = AppendSessionConfig::new()
.with_max_unacked_bytes(20 * 1024 * 1024)? // 20 MiB
.with_max_unacked_batches(
NonZeroU32::new(200).unwrap() // Allow 200 batches in flight
);
let session = stream.append_session(config);
Higher max_unacked_bytes allows more data in flight but uses more memory. Balance this with your available resources.
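To see which limit binds, divide the byte budget by the batch size (a hypothetical helper, not part of the SDK):

```rust
/// How many full batches fit in the unacked-bytes budget.
fn max_inflight_batches(max_unacked_bytes: u64, max_batch_bytes: u64) -> u64 {
    max_unacked_bytes / max_batch_bytes
}
```

With the config above and full 1 MiB batches, only 20 batches fit in the 20 MiB budget, so the byte limit binds long before the 200-batch cap; the batch cap only matters when batches are small.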
Parallel Submissions
Submit multiple batches concurrently:
use futures::future::join_all;
let session = stream.append_session(config);
// Prepare multiple batches
let batches = vec![
AppendInput::new(records1),
AppendInput::new(records2),
AppendInput::new(records3),
];
// Submit all in parallel
let submissions: Vec<_> = batches
.into_iter()
.map(|batch| session.submit(batch))
.collect();
let tickets = join_all(submissions).await;
// Wait for all acknowledgements
for ticket in tickets {
let ticket = ticket?;
ticket.await?;
}
session.close().await?;
Latency Optimization
Low-Latency Producer
Minimize batching delays for low-latency scenarios:
let config = ProducerConfig::new()
.with_max_unacked_bytes(1024 * 1024)? // 1 MiB
.with_batching(
BatchingConfig::new()
.with_linger(Duration::from_millis(1)) // Minimal linger
.with_max_batch_records(10)? // Smaller batches
);
let producer = stream.producer(config);
Low latency trade-off: Lower linger and smaller batches reduce latency but decrease throughput.
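The cost of a short linger can be estimated directly (a sketch under the assumption of a steady arrival rate; the helper name is illustrative):

```rust
/// Expected records accumulated per batch at a steady arrival rate:
/// records that arrive during the linger window, capped by the batch limit.
fn records_per_batch(arrival_per_sec: u64, linger_ms: u64, max_batch_records: u64) -> u64 {
    (arrival_per_sec * linger_ms / 1000).max(1).min(max_batch_records)
}
```

At 1,000 records/s, a 1 ms linger yields batches of a single record (minimal latency, minimal batching), while a 50 ms linger yields 50-record batches.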
Direct Append
For single-record appends with minimal overhead:
// Bypass batching for critical low-latency writes
let input = AppendInput::new(vec![record]);
let ack = stream.append(input).await?;
Direct append() calls have higher per-record overhead. Use only when latency is critical and throughput is low.
Connection Management
Connection Pooling
The SDK automatically pools HTTP connections:
// From client.rs:43-44
const MAX_CONCURRENT_REQUESTS_PER_CLIENT: usize = 90;
const IDLE_TIMEOUT: Duration = Duration::from_secs(90);
- Up to 90 concurrent requests per client
- Connections idle for 90 seconds are reaped
- Automatic reconnection on connection failure
Reuse Clients
Share S2 instances across threads:
use std::sync::Arc;
let s2 = Arc::new(S2::new(S2Config::new(access_token))?);
// Clone Arc for each thread
let s2_clone = Arc::clone(&s2);
tokio::spawn(async move {
let stream = s2_clone.basin(basin).stream(stream_name);
// Use stream...
});
Reusing clients amortizes connection overhead and reduces resource usage.
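The sharing pattern itself is plain Arc cloning; here is a self-contained sketch with a stand-in client (the `Client` struct is a placeholder for an S2 handle, not the real type):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for a client handle: cheap to share via Arc across threads.
struct Client {
    requests: AtomicU64,
}

fn run_workers(client: Arc<Client>, workers: usize, per_worker: u64) {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let c = Arc::clone(&client); // clone the handle, not the client
            std::thread::spawn(move || {
                for _ in 0..per_worker {
                    c.requests.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}
```

All workers issue requests through the same underlying client, so they share its connection pool instead of each paying connection setup costs.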
Memory Management
Backpressure Control
Prevent unbounded memory growth with backpressure:
let config = AppendSessionConfig::new()
.with_max_unacked_bytes(5 * 1024 * 1024)?; // Cap at 5 MiB
let session = stream.append_session(config);
// Submission waits once 5 MiB of unacked data is in flight
for record in records {
let input = AppendInput::new(vec![record]);
session.submit(input).await?; // Backpressure applied here
}
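The bookkeeping behind such a cap can be sketched in a few lines (illustrative only; the SDK's internal accounting may differ):

```rust
/// Minimal sketch of a byte-budget backpressure gate: reservations are
/// refused once the cap would be exceeded, and acks release capacity.
struct ByteBudget {
    cap: u64,
    in_flight: u64,
}

impl ByteBudget {
    fn new(cap: u64) -> Self {
        Self { cap, in_flight: 0 }
    }

    fn try_reserve(&mut self, bytes: u64) -> bool {
        if self.in_flight + bytes <= self.cap {
            self.in_flight += bytes;
            true
        } else {
            false // caller must wait for an ack (backpressure)
        }
    }

    fn ack(&mut self, bytes: u64) {
        self.in_flight -= bytes;
    }
}
```

In the real session the refusal is an async wait rather than a boolean, but the invariant is the same: in-flight bytes never exceed the configured cap.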
Streaming Large Reads
Process read batches incrementally to avoid loading entire streams into memory:
use futures::StreamExt;
let input = ReadInput::new()
.with_start_seq_num(0)?
.with_limit_bytes(100 * 1024 * 1024)?; // Read 100 MiB total
let mut batches = stream.read_session(input).await?;
while let Some(batch) = batches.next().await {
let batch = batch?;
// Process batch immediately
process_batch(batch).await?;
// Batch is dropped, freeing memory before next batch
}
Use with_limit_records() or with_limit_bytes() to bound memory usage during reads.
Compression
Enable Compression
Reduce network bandwidth with compression:
use s2_sdk::types::{Compression, S2Config};
let config = S2Config::new(access_token)
.with_compression(Compression::Zstd); // or Compression::Gzip
let s2 = S2::new(config)?;
Compression options:
- Compression::None - No compression (default)
- Compression::Gzip - Good balance, widely supported
- Compression::Zstd - Better compression ratio than Gzip, and typically faster
Zstd typically offers the best performance for S2 workloads.
When to Use Compression
Enable compression when:
- Network bandwidth is limited
- Records are highly compressible (text, JSON, logs)
- Network cost is a concern
Disable compression when:
- Records are already compressed (images, video)
- CPU is constrained
- Records are very small (< 100 bytes)
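The guidance above can be condensed into a simple decision helper (the function and its 100-byte threshold are illustrative assumptions, not SDK defaults):

```rust
/// Illustrative compression heuristic mirroring the guidance above:
/// skip compression for pre-compressed payloads, CPU-bound hosts,
/// and very small records.
fn should_compress(record_bytes: usize, already_compressed: bool, cpu_constrained: bool) -> bool {
    !already_compressed && !cpu_constrained && record_bytes >= 100
}
```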
Batching Strategies
Adaptive Batching
Adjust batching based on load:
struct AdaptiveBatcher {
high_load_config: BatchingConfig,
low_load_config: BatchingConfig,
}
impl AdaptiveBatcher {
fn get_config(&self, queue_depth: usize) -> BatchingConfig {
if queue_depth > 1000 {
// High load: optimize for throughput
self.high_load_config.clone()
} else {
// Low load: optimize for latency
self.low_load_config.clone()
}
}
}
let batcher = AdaptiveBatcher {
high_load_config: BatchingConfig::new()
.with_linger(Duration::from_millis(50))
.with_max_batch_records(1000)
.unwrap(),
low_load_config: BatchingConfig::new()
.with_linger(Duration::from_millis(5))
.with_max_batch_records(100)
.unwrap(),
};
Size-Based Batching
For variable-sized records:
// Small records: optimize for count
let small_record_config = BatchingConfig::new()
.with_max_batch_records(1000)? // Hit count limit first
.with_max_batch_bytes(1024 * 1024)?;
// Large records: optimize for bytes
let large_record_config = BatchingConfig::new()
.with_max_batch_records(1000)?
.with_max_batch_bytes(512 * 1024)?; // Hit bytes limit first
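For uniformly sized records, which limit binds is simple arithmetic (a hypothetical helper for illustration):

```rust
/// Effective records per batch for uniformly sized records:
/// the smaller of the record-count cap and how many records fit
/// in the byte cap.
fn effective_batch_records(record_bytes: u64, max_records: u64, max_bytes: u64) -> u64 {
    max_records.min(max_bytes / record_bytes)
}
```

With 100-byte records and a 1 MiB cap the count limit binds (1,000 records per batch); with 4 KiB records and a 512 KiB cap the byte limit binds at 128 records.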
Batch Size Tuning
Control read batch sizes for optimal memory/latency balance:
let input = ReadInput::new()
.with_start_seq_num(0)?
.with_limit_records(100)?; // Read 100 records at a time
let mut batches = stream.read_session(input).await?;
Parallel Reads
Read from multiple streams concurrently:
use futures::stream::{FuturesUnordered, StreamExt};
let streams = vec![stream1, stream2, stream3];
let mut reads: FuturesUnordered<_> = streams
.into_iter()
.map(|s| s.read_session(ReadInput::new()))
.collect();
let mut batch_streams = Vec::new();
while let Some(result) = reads.next().await {
batch_streams.push(result?);
}
// Process all streams concurrently
for mut batches in batch_streams {
tokio::spawn(async move {
while let Some(Ok(batch)) = batches.next().await { // stop on first read error
process_batch(batch).await;
}
});
}
Following the Tail
Optimize for real-time consumption:
let input = ReadInput::new()
.with_start_at_tail()?
.with_wait_secs(60)?; // Long poll for new data
let mut batches = stream.read_session(input).await?;
while let Some(batch) = batches.next().await {
let batch = batch?;
if batch.tail.is_some() {
// We're caught up - lower CPU usage
tokio::time::sleep(Duration::from_millis(100)).await;
}
process_batch(batch).await?;
}
Retry Configuration
Aggressive Retries
For high-availability scenarios:
use s2_sdk::types::RetryConfig;
let retry_config = RetryConfig::default()
.with_max_retries(10) // More retry attempts
.with_min_base_delay(Duration::from_millis(50))
.with_max_base_delay(Duration::from_secs(2));
let config = S2Config::new(access_token)
.with_retry(retry_config);
Fast Failure
For latency-sensitive scenarios:
let retry_config = RetryConfig::default()
.with_max_retries(1) // Fail fast
.with_min_base_delay(Duration::from_millis(10))
.with_max_base_delay(Duration::from_millis(100));
let config = S2Config::new(access_token)
.with_retry(retry_config)
.with_request_timeout(Duration::from_millis(500)); // Short timeout
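The delay schedule these knobs imply is capped exponential backoff; the exact schedule and any jitter are SDK internals, so the following is only a sketch of the shape:

```rust
use std::time::Duration;

/// Capped exponential backoff: double the base delay per attempt,
/// clamped to the configured maximum.
fn backoff_delay(attempt: u32, min_base: Duration, max_base: Duration) -> Duration {
    let delay = min_base.saturating_mul(1u32 << attempt.min(20));
    delay.min(max_base)
}
```

With the aggressive config above (50 ms min, 2 s max), attempt 3 waits 400 ms and attempt 10 is already clamped to the 2 s cap, so extra retries add bounded delay per attempt.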
Monitoring and Metrics
Track Batch Sizes
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
struct Metrics {
total_records: Arc<AtomicU64>,
total_batches: Arc<AtomicU64>,
}
impl Metrics {
fn record_batch(&self, batch_size: usize) {
self.total_records.fetch_add(batch_size as u64, Ordering::Relaxed);
self.total_batches.fetch_add(1, Ordering::Relaxed);
}
fn avg_batch_size(&self) -> f64 {
let records = self.total_records.load(Ordering::Relaxed);
let batches = self.total_batches.load(Ordering::Relaxed);
if batches == 0 {
0.0
} else {
records as f64 / batches as f64
}
}
}
Monitor Backpressure
use tokio::time::Instant;
let start = Instant::now();
let permit = session.reserve(bytes).await?;
let wait_time = start.elapsed();
if wait_time > Duration::from_millis(100) {
eprintln!("High backpressure detected: waited {:?}", wait_time);
}
permit.submit(input);
Best Practices Summary
Profile first: Measure your actual bottlenecks before optimizing. Use tools like tokio-console or tracing.
Start with defaults: The default configurations work well for most use cases. Only tune when you have specific performance requirements.
Monitor memory: Track heap usage and set appropriate backpressure limits to prevent OOM.
Reuse sessions: Create sessions once and reuse them for multiple operations to amortize connection overhead.
Test at scale: Performance characteristics change at different scales. Test with production-like data volumes.
The S2 SDK uses tokio for async runtime. Ensure your application uses an appropriately sized thread pool for your workload.