Proper error handling is crucial for building reliable applications with S2. The SDK provides structured error types, automatic retry mechanisms, and clear patterns for handling different failure scenarios.
Error Types
S2Error
The top-level error type for SDK operations:
use s2_sdk::types::S2Error;
match operation.await {
    Ok(result) => { /* success */ },
    Err(S2Error::Api { code, message, status }) => {
        eprintln!("API error [{}]: {}", code, message);
    },
    Err(S2Error::Client(msg)) => {
        eprintln!("Client error: {}", msg);
    },
    Err(S2Error::Network(msg)) => {
        eprintln!("Network error: {}", msg);
    },
}
Error Categories
API Errors
Server-side errors with specific error codes:
// Common API error codes
match error_code.as_str() {
    "stream_not_found" => {
        // Stream doesn't exist
    },
    "basin_not_found" => {
        // Basin doesn't exist
    },
    "permission_denied" => {
        // Insufficient permissions
    },
    "rate_limited" => {
        // Too many requests
    },
    "quota_exhausted" => {
        // Account quota exceeded
    },
    _ => {},
}
Client Errors
SDK-side errors from session management:
// Session errors
- SessionClosed: Session already terminated
- SessionClosing: Session is in the process of closing
- SessionDropped: Session dropped without cleanup
Network Errors
Transport and connectivity issues:
// Connection issues
- Connection timeout
- DNS resolution failure
- TLS handshake failure
Validation Errors
Input validation failures:
use s2_sdk::types::ValidationError;
match AppendRecord::new(invalid_data) {
    Ok(record) => { /* valid */ },
    Err(ValidationError(msg)) => {
        eprintln!("Validation failed: {}", msg);
    },
}
Retryable vs Terminal Errors
Retryable Errors
These errors are automatically retried by sessions:
// From session/append.rs:50-59
impl AppendSessionError {
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Api(err) => err.is_retryable(),
            Self::AckTimeout => true,
            Self::ServerDisconnected => true,
            _ => false,
        }
    }
}
Retryable conditions:
- Acknowledgement timeout
- Server disconnection
- Heartbeat timeout (read sessions)
- Transient network errors
- Server unavailability (503)
- Gateway timeout (504)
Terminal Errors
These errors end the operation immediately:
- Invalid input (400)
- Permission denied (403)
- Resource not found (404)
- Conflict (409)
- Quota exhausted (403)
- Session closed
- Session dropped
Terminal errors indicate a problem that cannot be resolved by retrying. Handle them explicitly in your application logic.
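The two lists above can be summarized as a small status-code classifier. This is only a sketch for application-side logging or metrics: `Disposition` and `classify_status` are hypothetical names, not SDK types, and treating every unlisted error status as terminal is a conservative assumption rather than documented SDK behavior.

```rust
/// Whether an error status is worth retrying, following the lists above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Disposition {
    Retryable,
    Terminal,
}

/// Hypothetical helper: maps an HTTP status code to a retry decision.
/// Returns `None` for non-error statuses (below 400).
pub fn classify_status(status: u16) -> Option<Disposition> {
    match status {
        // Not an error at all.
        0..=399 => None,
        // Server unavailability and gateway timeout are listed as retryable.
        503 | 504 => Some(Disposition::Retryable),
        // 400, 403, 404, 409 are listed as terminal; anything else is
        // treated as terminal here as a conservative default (assumption).
        _ => Some(Disposition::Terminal),
    }
}
```

For example, `classify_status(404)` yields `Some(Disposition::Terminal)`, which an application might route to alerting instead of waiting for a retry that will not help.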
Automatic Retry with Backoff
Sessions implement exponential backoff with jitter:
// Retry configuration
use s2_sdk::types::{RetryConfig, S2Config};
use std::time::Duration;
let retry_config = RetryConfig::default()
    .with_min_base_delay(Duration::from_millis(100))
    .with_max_base_delay(Duration::from_secs(1))
    .with_max_retries(3);
let config = S2Config::new(access_token)
    .with_retry(retry_config);
let s2 = S2::new(config)?;
Backoff Behavior
From retry.rs:79-97:
// Base delay doubles with each retry:
// Retry 1: 100ms + jitter (0-100ms)
// Retry 2: 200ms + jitter (0-200ms)
// Retry 3: 400ms + jitter (0-400ms)
// Retry 4: 800ms + jitter (0-800ms)
// Retry 5+: 1000ms + jitter (0-1000ms) [capped at max_base_delay]
Jitter is randomized (0-100% of base delay) to prevent thundering herd issues when multiple clients retry simultaneously.
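The schedule above can be reproduced with a few lines of arithmetic. The sketch below is not the SDK's `retry.rs`; `base_delay` and `with_jitter` are hypothetical helpers, and the random fraction is passed in as a parameter so that the example stays dependency-free and deterministic.

```rust
use std::time::Duration;

/// Base delay before retry `attempt` (1-based): `min` doubled once per
/// previous retry, capped at `max`.
pub fn base_delay(attempt: u32, min: Duration, max: Duration) -> Duration {
    // Clamp the shift so the multiplier cannot overflow a u32.
    let shift = attempt.saturating_sub(1).min(31);
    let factor = 1u32 << shift;
    // If the multiplication overflows, fall back to the cap.
    min.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Adds jitter of 0-100% of the base delay. `unit` is a fraction in [0, 1]
/// that a real client would draw from a random number generator.
pub fn with_jitter(base: Duration, unit: f64) -> Duration {
    let unit = if unit.is_finite() { unit.clamp(0.0, 1.0) } else { 0.0 };
    base + base.mul_f64(unit)
}
```

With `min` = 100ms and `max` = 1s, `base_delay` returns 100ms, 200ms, 400ms, 800ms for retries 1 through 4 and 1s from retry 5 onward, matching the comments above.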
Retry Reset
Backoff resets on successful operations:
// From session/append.rs:424-427
if prev_total_acked_records < state.total_acked_records {
    prev_total_acked_records = state.total_acked_records;
    retry_backoff.reset(); // Reset on progress
}
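To make the effect of the reset concrete, here is a minimal stand-alone model of a backoff counter that starts over whenever progress is observed. `Backoff` and `note_progress` are hypothetical names written for this illustration; the SDK's internal types may look different, and jitter is left out for brevity.

```rust
use std::time::Duration;

/// Toy backoff state: remembers how many retries have happened since the
/// last reset.
pub struct Backoff {
    min: Duration,
    max: Duration,
    attempt: u32,
}

impl Backoff {
    pub fn new(min: Duration, max: Duration) -> Self {
        Self { min, max, attempt: 0 }
    }

    /// Returns the next base delay and advances the retry counter.
    pub fn next_base_delay(&mut self) -> Duration {
        let shift = self.attempt.min(31);
        self.attempt = self.attempt.saturating_add(1);
        self.min
            .checked_mul(1u32 << shift)
            .map_or(self.max, |d| d.min(self.max))
    }

    /// Starts the schedule over from the minimum delay.
    pub fn reset(&mut self) {
        self.attempt = 0;
    }
}

/// Mirrors the check above: reset only when the acked-record count grew.
/// Returns true if progress was observed.
pub fn note_progress(prev_acked: &mut u64, total_acked: u64, backoff: &mut Backoff) -> bool {
    if *prev_acked < total_acked {
        *prev_acked = total_acked;
        backoff.reset();
        true
    } else {
        false
    }
}
```

The point of resetting on progress rather than on reconnect is that a session which keeps acknowledging records between failures never escalates to the maximum delay.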
Error Handling Patterns
Pattern 1: Basic Error Handling
use s2_sdk::types::AppendRecord;
match producer.submit(AppendRecord::new("data")?).await {
    Ok(ticket) => {
        match ticket.await {
            Ok(ack) => println!("Success: seq_num {}", ack.seq_num),
            Err(e) => eprintln!("Append failed: {}", e),
        }
    }
    Err(e) => eprintln!("Submit failed: {}", e),
}
Pattern 2: Graceful Degradation
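use s2_sdk::types::S2Error;
// `log_locally` is assumed to be an application-provided helper whose
// error type converts into S2Error; it is not shown here.
async fn append_with_fallback(
    producer: &Producer,
    record: AppendRecord,
) -> Result<u64, S2Error> {
    match producer.submit(record.clone()).await {
        Ok(ticket) => {
            match ticket.await {
                Ok(ack) => Ok(ack.seq_num),
                Err(e) => {
                    // Log to local file as fallback
                    log_locally(&record)?;
                    Err(e)
                }
            }
        }
        Err(e) => {
            // Submit itself failed: apply the same local fallback
            log_locally(&record)?;
            Err(e)
        }
    }
}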
Pattern 3: Circuit Breaker
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct CircuitBreaker {
    failure_count: Arc<AtomicU64>,
    threshold: u64,
}
impl CircuitBreaker {
    fn should_attempt(&self) -> bool {
        self.failure_count.load(Ordering::Relaxed) < self.threshold
    }
    fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
    }
    fn record_failure(&self) {
        self.failure_count.fetch_add(1, Ordering::Relaxed);
    }
}
async fn append_with_circuit_breaker(
    producer: &Producer,
    record: AppendRecord,
    breaker: &CircuitBreaker,
) -> Result<u64, S2Error> {
    if !breaker.should_attempt() {
        return Err(S2Error::Client("Circuit breaker open".to_string()));
    }
    match producer.submit(record).await {
        Ok(ticket) => {
            match ticket.await {
                Ok(ack) => {
                    breaker.record_success();
                    Ok(ack.seq_num)
                }
                Err(e) => {
                    breaker.record_failure();
                    Err(e)
                }
            }
        }
        Err(e) => {
            breaker.record_failure();
            Err(e)
        }
    }
}
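One property of the breaker above is worth noting: once the failure count reaches the threshold, no further attempts are made, so `record_success` can never run and the breaker stays open until something else resets it. A common remedy is a cooldown after which attempts are allowed again. The following is a sketch of that variant under stated assumptions; `CooldownBreaker` is a hypothetical type, not part of the SDK, and it uses a `Mutex` instead of atomics to keep the state transitions easy to follow.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct State {
    failures: u64,
    /// Set when the breaker opens; cleared on success.
    opened_at: Option<Instant>,
}

/// Circuit breaker that lets attempts through again after `cooldown`.
pub struct CooldownBreaker {
    threshold: u64,
    cooldown: Duration,
    state: Mutex<State>,
}

impl CooldownBreaker {
    pub fn new(threshold: u64, cooldown: Duration) -> Self {
        Self {
            threshold,
            cooldown,
            state: Mutex::new(State { failures: 0, opened_at: None }),
        }
    }

    /// True while closed, or once the cooldown has elapsed since opening.
    pub fn should_attempt(&self) -> bool {
        let state = self.state.lock().unwrap();
        match state.opened_at {
            None => true,
            Some(at) => at.elapsed() >= self.cooldown,
        }
    }

    /// Closes the breaker and clears the failure count.
    pub fn record_success(&self) {
        let mut state = self.state.lock().unwrap();
        state.failures = 0;
        state.opened_at = None;
    }

    /// Counts a failure; at or past the threshold, (re)opens the breaker
    /// and restarts the cooldown.
    pub fn record_failure(&self) {
        let mut state = self.state.lock().unwrap();
        state.failures = state.failures.saturating_add(1);
        if state.failures >= self.threshold {
            state.opened_at = Some(Instant::now());
        }
    }
}
```

It exposes the same three methods as `CircuitBreaker`, so it could be swapped into `append_with_circuit_breaker` without changing the call sites. A failed probe after the cooldown restarts the timer, and a successful one closes the breaker.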
Pattern 4: Error Aggregation
use futures::future::join_all;
async fn append_batch_with_errors(
    producer: &Producer,
    records: Vec<AppendRecord>,
) -> (Vec<u64>, Vec<S2Error>) {
    let futures: Vec<_> = records
        .into_iter()
        .map(|r| producer.submit(r))
        .collect();
    let results = join_all(futures).await;
    let mut successes = Vec::new();
    let mut errors = Vec::new();
    for result in results {
        match result {
            Ok(ticket) => {
                match ticket.await {
                    Ok(ack) => successes.push(ack.seq_num),
                    Err(e) => errors.push(e),
                }
            }
            Err(e) => errors.push(e),
        }
    }
    (successes, errors)
}
Append-Specific Errors
Append Condition Failed
When using fencing tokens or sequence number matching:
use s2_sdk::types::{AppendInput, FencingToken};
let input = AppendInput::new(records)
    .with_match_seq_num(100); // Expect stream at seq 100
match session.submit(input).await {
    Ok(ticket) => {
        match ticket.await {
            Ok(ack) => println!("Appended at {}", ack.start.seq_num),
            Err(S2Error::Api { code, .. }) if code == "transaction_conflict" => {
                // Sequence number mismatch or fencing token conflict
                eprintln!("Condition failed - stream state changed");
            },
            Err(e) => eprintln!("Other error: {}", e),
        }
    }
    Err(e) => eprintln!("Submit error: {}", e),
}
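Sequence number matching is a form of optimistic concurrency control, and a toy in-memory model may help show why the condition fails and why blind retries do not fix it. Nothing below talks to S2: `MemStream` and `SeqNumMismatch` are hypothetical types, and the sketch assumes the expected value is compared against the next sequence number the stream would assign.

```rust
/// Returned when the caller's expectation does not match the stream tail.
#[derive(Debug, PartialEq, Eq)]
pub struct SeqNumMismatch {
    pub expected: u64,
    pub actual: u64,
}

/// Toy append-only stream held in memory.
#[derive(Default)]
pub struct MemStream {
    records: Vec<String>,
}

impl MemStream {
    /// Sequence number the next appended record would receive.
    pub fn next_seq_num(&self) -> u64 {
        self.records.len() as u64
    }

    /// Appends `body`. If `match_seq_num` is set, the append succeeds only
    /// when it equals the next sequence number; otherwise nothing is written.
    pub fn append(
        &mut self,
        body: &str,
        match_seq_num: Option<u64>,
    ) -> Result<u64, SeqNumMismatch> {
        let actual = self.next_seq_num();
        if let Some(expected) = match_seq_num {
            if expected != actual {
                return Err(SeqNumMismatch { expected, actual });
            }
        }
        self.records.push(body.to_string());
        Ok(actual)
    }
}
```

In this model, a writer that loses the race has to re-read the tail and decide whether its record still makes sense at the new position. Resubmitting with the same expected value fails again, which is consistent with conflicts being listed as terminal above.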
Read-Specific Errors
Heartbeat Timeout
use futures::StreamExt;
let mut batches = stream.read_session(input).await?;
while let Some(result) = batches.next().await {
    match result {
        Ok(batch) => {
            // Process batch
        }
        Err(e) => {
            eprintln!("Read error: {}", e);
            // Session will auto-retry heartbeat timeouts
            // Fatal errors will terminate the stream
        }
    }
}
Validation Errors
Catch validation errors early:
use s2_sdk::types::{AppendRecord, BasinName, StreamName, ValidationError};
// Basin name validation
let basin = match "invalid_basin".parse::<BasinName>() {
    Ok(name) => name,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid basin name: {}", msg);
        return;
    }
};
// Record body validation
let record = match AppendRecord::new(data) {
    Ok(r) => r,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid record: {}", msg);
        return;
    }
};
// Batching config validation
let config = match BatchingConfig::new()
    .with_max_batch_bytes(10) // Too small
{
    Ok(c) => c,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid config: {}", msg);
        return;
    }
};
Best Practices
Let sessions retry: Don’t implement your own retry logic for transient errors. Sessions handle this automatically.
Handle terminal errors explicitly: Check for specific error codes like permission_denied or quota_exhausted and handle them appropriately.
Log context with errors: Include relevant context (basin, stream, operation) when logging errors for easier debugging.
Use structured error handling: Match on error types and codes rather than string parsing.
Always call close() on sessions, even after errors, to ensure proper cleanup and resource release.
Monitor error rates: Track error types and frequencies to detect systemic issues early.
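As one possible way to follow the "log context with errors" advice, an application could wrap any error in a small struct that records the basin, stream, and operation. This is a sketch using only the standard library; `WithContext` and `with_context` are hypothetical names and are not provided by the SDK.

```rust
use std::error::Error;
use std::fmt;

/// An error paired with the context it occurred in.
#[derive(Debug)]
pub struct WithContext<E> {
    pub basin: String,
    pub stream: String,
    pub operation: &'static str,
    pub source: E,
}

impl<E: fmt::Display> fmt::Display for WithContext<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{} failed (basin={}, stream={}): {}",
            self.operation, self.basin, self.stream, self.source
        )
    }
}

impl<E: Error + 'static> Error for WithContext<E> {
    /// Exposes the wrapped error so callers can still match on it.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

/// Attaches context to the error side of a `Result`, leaving `Ok` untouched.
pub fn with_context<T, E>(
    result: Result<T, E>,
    basin: &str,
    stream: &str,
    operation: &'static str,
) -> Result<T, WithContext<E>> {
    result.map_err(|source| WithContext {
        basin: basin.to_string(),
        stream: stream.to_string(),
        operation,
        source,
    })
}
```

Because the original error is kept in the `source` field, structured matching on error types and codes remains possible after wrapping, so this does not push callers toward string parsing.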