Proper error handling is crucial for building reliable applications with S2. The SDK provides structured error types, automatic retry mechanisms, and clear patterns for handling different failure scenarios.

Error Types

S2Error

The top-level error type for SDK operations:
use s2_sdk::types::S2Error;

match operation.await {
    Ok(result) => { /* success */ },
    Err(S2Error::Api { code, message, .. }) => {
        eprintln!("API error [{}]: {}", code, message);
    },
    Err(S2Error::Client(msg)) => {
        eprintln!("Client error: {}", msg);
    },
    Err(S2Error::Network(msg)) => {
        eprintln!("Network error: {}", msg);
    },
}

Error Categories

1. API Errors

Server-side errors with specific error codes:
// Common API error codes
match error_code.as_str() {
    "stream_not_found" => {
        // Stream doesn't exist
    },
    "basin_not_found" => {
        // Basin doesn't exist
    },
    "permission_denied" => {
        // Insufficient permissions
    },
    "rate_limited" => {
        // Too many requests
    },
    "quota_exhausted" => {
        // Account quota exceeded
    },
    _ => {},
}
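
One way to keep this kind of code matching out of call sites is to translate each code into a small decision type once. The sketch below is illustrative only: `Action` and `action_for` are hypothetical names, not SDK items, and the chosen reaction for each code (for example, slowing down on `rate_limited`) is an assumption about what an application might want.

```rust
/// What the application should do next. Hypothetical type, not part of the SDK.
#[derive(Debug, PartialEq, Eq)]
enum Action {
    /// The named basin or stream is missing: create it or fix the name.
    FixResource,
    /// The access token lacks permission: surface this to an operator.
    CheckPermissions,
    /// Too many requests: reduce the request rate before continuing.
    SlowDown,
    /// Account quota reached: stop and raise an alert.
    Alert,
    /// Unrecognized code: log it and propagate the error.
    Propagate,
}

/// Maps an API error code string to a next step (assumed policy, adjust to taste).
fn action_for(code: &str) -> Action {
    match code {
        "stream_not_found" | "basin_not_found" => Action::FixResource,
        "permission_denied" => Action::CheckPermissions,
        "rate_limited" => Action::SlowDown,
        "quota_exhausted" => Action::Alert,
        _ => Action::Propagate,
    }
}
```

Callers can then match on `Action` rather than on raw strings, so a renamed or newly added code only needs a change in one place.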

2. Client Errors

SDK-side errors from session management:
  • SessionClosed: the session has already terminated
  • SessionClosing: the session is in the process of closing
  • SessionDropped: the session was dropped without cleanup

3. Network Errors

Transport and connectivity issues:
  • Connection timeout
  • DNS resolution failure
  • TLS handshake failure

4. Validation Errors

Input validation failures:
use s2_sdk::types::ValidationError;

match AppendRecord::new(invalid_data) {
    Ok(record) => { /* valid */ },
    Err(ValidationError(msg)) => {
        eprintln!("Validation failed: {}", msg);
    },
}

Retryable vs Terminal Errors

Retryable Errors

These errors are automatically retried by sessions:
// From session/append.rs:50-59
impl AppendSessionError {
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Api(err) => err.is_retryable(),
            Self::AckTimeout => true,
            Self::ServerDisconnected => true,
            _ => false,
        }
    }
}
Retryable conditions:
  • Acknowledgement timeout
  • Server disconnection
  • Heartbeat timeout (read sessions)
  • Transient network errors
  • Server unavailability (503)
  • Gateway timeout (504)

Terminal Errors

These errors end the operation immediately:
  • Invalid input (400)
  • Permission denied (403)
  • Resource not found (404)
  • Conflict (409)
  • Quota exhausted (403)
  • Session closed
  • Session dropped
Terminal errors indicate a problem that cannot be resolved by retrying. Handle them explicitly in your application logic.
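
For code that sees only an HTTP status, the two lists above can be condensed into a small classifier. This is a sketch under stated assumptions: `Disposition` and `classify_status` are hypothetical names, and treating every unlisted status as terminal is a conservative choice made here, not documented SDK behavior.

```rust
/// Whether a failure is worth retrying. Hypothetical type, not part of the SDK.
#[derive(Debug, PartialEq, Eq)]
enum Disposition {
    Retryable,
    Terminal,
}

/// Classifies an HTTP status using the lists in this section.
fn classify_status(status: u16) -> Disposition {
    match status {
        // Server unavailability and gateway timeout are listed as retryable.
        503 | 504 => Disposition::Retryable,
        // 400, 403, 404 and 409 are listed as terminal. Any status not
        // mentioned in this guide is also treated as terminal (assumption).
        _ => Disposition::Terminal,
    }
}
```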

Automatic Retry with Backoff

Sessions implement exponential backoff with jitter:
// Retry configuration
use s2_sdk::types::{RetryConfig, S2Config};
use std::time::Duration;

let retry_config = RetryConfig::default()
    .with_min_base_delay(Duration::from_millis(100))
    .with_max_base_delay(Duration::from_secs(1))
    .with_max_retries(3);

let config = S2Config::new(access_token)
    .with_retry(retry_config);

let s2 = S2::new(config)?;

Backoff Behavior

From retry.rs:79-97:
// Base delay doubles with each retry (shown for min_base_delay = 100ms, max_base_delay = 1s):
// Retry 1: 100ms + jitter (0-100ms)
// Retry 2: 200ms + jitter (0-200ms)
// Retry 3: 400ms + jitter (0-400ms)
// Retry 4: 800ms + jitter (0-800ms)
// Retry 5+: 1000ms + jitter (0-1000ms) [capped at max_base_delay]
Jitter is randomized (0-100% of base delay) to prevent thundering herd issues when multiple clients retry simultaneously.
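
The schedule above can be reproduced with a few lines of arithmetic. The following is a minimal sketch, not the contents of retry.rs: `base_delay` and `with_jitter` are hypothetical helpers, and the random fraction is passed in by the caller (for instance from the `rand` crate) so that the functions stay deterministic.

```rust
use std::time::Duration;

/// Base delay before the `retry`-th retry (1-based): `min` doubled once per
/// earlier retry, capped at `max`.
fn base_delay(retry: u32, min: Duration, max: Duration) -> Duration {
    // Cap the exponent so the shift cannot overflow for large retry counts.
    let exp = retry.saturating_sub(1).min(31);
    let factor = 1u32 << exp;
    min.saturating_mul(factor).min(max)
}

/// Adds jitter equal to `fraction` of the base delay, where `fraction` is a
/// random value in 0.0..=1.0 supplied by the caller.
fn with_jitter(base: Duration, fraction: f64) -> Duration {
    // Out-of-range values are clamped; NaN or infinite values add no jitter.
    let f = if fraction.is_finite() {
        fraction.clamp(0.0, 1.0)
    } else {
        0.0
    };
    base + base.mul_f64(f)
}
```

With `min = 100ms` and `max = 1s`, `base_delay` yields 100ms, 200ms, 400ms, 800ms and then 1000ms for every later retry, matching the table above. The total wait for a retry therefore lies between the base delay and twice the base delay.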

Retry Reset

Backoff resets whenever the session makes progress, that is, whenever additional records have been acknowledged:
// From session/append.rs:424-427
if prev_total_acked_records < state.total_acked_records {
    prev_total_acked_records = state.total_acked_records;
    retry_backoff.reset(); // Reset on progress
}
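
To make the effect of `reset()` concrete, here is a small stand-alone model of a backoff tracker. It is a sketch only: the `Backoff` type below is hypothetical and is not the SDK's internal `retry_backoff`, and jitter is left out to keep the state machine visible.

```rust
use std::time::Duration;

/// Hypothetical backoff tracker with a retry budget.
struct Backoff {
    min: Duration,
    max: Duration,
    max_retries: u32,
    attempts: u32,
}

impl Backoff {
    fn new(min: Duration, max: Duration, max_retries: u32) -> Self {
        Self { min, max, max_retries, attempts: 0 }
    }

    /// Returns the next base delay, or `None` once the retry budget is spent.
    fn next_delay(&mut self) -> Option<Duration> {
        if self.attempts >= self.max_retries {
            return None;
        }
        // Double the delay for each attempt already made, capped at `max`.
        let factor = 1u32 << self.attempts.min(31);
        self.attempts += 1;
        Some(self.min.saturating_mul(factor).min(self.max))
    }

    /// Called when progress is observed, so the next failure starts from
    /// `min` again with a full retry budget.
    fn reset(&mut self) {
        self.attempts = 0;
    }
}
```

In this model, resetting on progress means a long-lived session that recovers between failures is not penalized for failures that happened much earlier.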

Error Handling Patterns

Pattern 1: Basic Error Handling

use s2_sdk::types::AppendRecord;

match producer.submit(AppendRecord::new("data")?).await {
    Ok(ticket) => {
        match ticket.await {
            Ok(ack) => println!("Success: seq_num {}", ack.seq_num),
            Err(e) => eprintln!("Append failed: {}", e),
        }
    }
    Err(e) => eprintln!("Submit failed: {}", e),
}

Pattern 2: Graceful Degradation

use s2_sdk::types::S2Error;

async fn append_with_fallback(
    producer: &Producer,
    record: AppendRecord,
) -> Result<u64, S2Error> {
    match producer.submit(record.clone()).await {
        Ok(ticket) => {
            match ticket.await {
                Ok(ack) => Ok(ack.seq_num),
                Err(e) => {
                    // Log to a local file as a fallback (`log_locally` is an
                    // application-defined helper, not an SDK function)
                    log_locally(&record)?;
                    Err(e)
                }
            }
        }
        Err(e) => {
            log_locally(&record)?;
            Err(e)
        }
    }
}
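
The `log_locally` helper is left to the application. One possible shape for it, sketched with the standard library only, is shown below. Note the assumptions: this version takes a file path and raw bytes rather than an `AppendRecord`, because the record's accessors are not covered in this guide, and it returns `std::io::Result`, so a caller returning `S2Error` would need to map the error itself.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Appends one record body as a line to a local fallback file.
/// Hypothetical helper: the signature differs from the call shown above.
fn log_locally(path: &Path, body: &[u8]) -> io::Result<()> {
    // Create the file on first use and always write at the end.
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    file.write_all(body)?;
    file.write_all(b"\n")?;
    // Flush to disk so the record survives a crash right after the failed append.
    file.sync_all()
}
```

Newline framing only works if record bodies never contain a newline; a length prefix or an encoding such as base64 would be needed for arbitrary binary data.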

Pattern 3: Circuit Breaker

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct CircuitBreaker {
    failure_count: Arc<AtomicU64>,
    threshold: u64,
}

impl CircuitBreaker {
    fn should_attempt(&self) -> bool {
        self.failure_count.load(Ordering::Relaxed) < self.threshold
    }

    fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
    }

    fn record_failure(&self) {
        self.failure_count.fetch_add(1, Ordering::Relaxed);
    }
}

async fn append_with_circuit_breaker(
    producer: &Producer,
    record: AppendRecord,
    breaker: &CircuitBreaker,
) -> Result<u64, S2Error> {
    if !breaker.should_attempt() {
        return Err(S2Error::Client("Circuit breaker open".to_string()));
    }

    match producer.submit(record).await {
        Ok(ticket) => {
            match ticket.await {
                Ok(ack) => {
                    breaker.record_success();
                    Ok(ack.seq_num)
                }
                Err(e) => {
                    breaker.record_failure();
                    Err(e)
                }
            }
        }
        Err(e) => {
            breaker.record_failure();
            Err(e)
        }
    }
}
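
As written, the breaker above stays open once the threshold is reached: `append_with_circuit_breaker` returns early, so `record_success` is never reached unless some other code path resets the counter. A common remedy is to allow a trial call after a cooldown. The sketch below shows that idea with hypothetical names (`CooldownBreaker`, `cooldown`); the current time is passed in explicitly so the behavior is easy to test.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct BreakerState {
    failures: u64,
    opened_at: Option<Instant>,
}

/// Hypothetical breaker that permits a trial call once `cooldown` has passed.
struct CooldownBreaker {
    threshold: u64,
    cooldown: Duration,
    state: Mutex<BreakerState>,
}

impl CooldownBreaker {
    fn new(threshold: u64, cooldown: Duration) -> Self {
        let state = Mutex::new(BreakerState { failures: 0, opened_at: None });
        Self { threshold, cooldown, state }
    }

    /// Allows the call while closed, or once `cooldown` has elapsed since opening.
    fn should_attempt(&self, now: Instant) -> bool {
        let state = self.state.lock().unwrap();
        match state.opened_at {
            None => true,
            Some(opened) => now.saturating_duration_since(opened) >= self.cooldown,
        }
    }

    /// A success closes the breaker and clears the failure count.
    fn record_success(&self) {
        let mut state = self.state.lock().unwrap();
        state.failures = 0;
        state.opened_at = None;
    }

    /// A failure at or past the threshold opens the breaker and restarts the cooldown.
    fn record_failure(&self, now: Instant) {
        let mut state = self.state.lock().unwrap();
        state.failures += 1;
        if state.failures >= self.threshold {
            state.opened_at = Some(now);
        }
    }
}
```

This simple version lets every caller through once the cooldown expires; a stricter half-open state would admit a single probe at a time.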

Pattern 4: Error Aggregation

use futures::future::join_all;

async fn append_batch_with_errors(
    producer: &Producer,
    records: Vec<AppendRecord>,
) -> (Vec<u64>, Vec<S2Error>) {
    let futures: Vec<_> = records
        .into_iter()
        .map(|r| producer.submit(r))
        .collect();

    let results = join_all(futures).await;

    let mut successes = Vec::new();
    let mut errors = Vec::new();

    for result in results {
        match result {
            Ok(ticket) => {
                match ticket.await {
                    Ok(ack) => successes.push(ack.seq_num),
                    Err(e) => errors.push(e),
                }
            }
            Err(e) => errors.push(e),
        }
    }

    (successes, errors)
}

Append-Specific Errors

Append Condition Failed

When using fencing tokens or sequence number matching:
use s2_sdk::types::{AppendInput, FencingToken};

let input = AppendInput::new(records)
    .with_match_seq_num(100); // Expect stream at seq 100

match session.submit(input).await {
    Ok(ticket) => {
        match ticket.await {
            Ok(ack) => println!("Appended at {}", ack.start.seq_num),
            Err(S2Error::Api { code, .. }) if code == "transaction_conflict" => {
                // Sequence number mismatch or fencing token conflict
                eprintln!("Condition failed - stream state changed");
            },
            Err(e) => eprintln!("Other error: {}", e),
        }
    }
    Err(e) => eprintln!("Submit error: {}", e),
}

Read-Specific Errors

Heartbeat Timeout

use futures::StreamExt;
use std::time::Duration;

let mut batches = stream.read_session(input).await?;

while let Some(result) = batches.next().await {
    match result {
        Ok(batch) => {
            // Process batch
        }
        Err(e) => {
            eprintln!("Read error: {}", e);
            // Session will auto-retry heartbeat timeouts
            // Fatal errors will terminate the stream
        }
    }
}

Validation Errors

Catch validation errors early:
use s2_sdk::types::{BasinName, StreamName, ValidationError};

// Basin name validation
let basin = match "invalid_basin".parse::<BasinName>() {
    Ok(name) => name,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid basin name: {}", msg);
        return;
    }
};

// Record body validation
let record = match AppendRecord::new(data) {
    Ok(r) => r,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid record: {}", msg);
        return;
    }
};

// Batching config validation
let config = match BatchingConfig::new()
    .with_max_batch_bytes(10) // Too small
{
    Ok(c) => c,
    Err(ValidationError(msg)) => {
        eprintln!("Invalid config: {}", msg);
        return;
    }
};
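
The same parse-then-use pattern can be applied to application-level types, so that invalid values are rejected at the boundary instead of deep inside an append path. The sketch below is self-contained and makes several assumptions: `ValidationError` is redefined locally as a stand-in for the SDK type, `ResourceName` is a hypothetical type, and its rules are illustrative rather than S2's actual naming rules.

```rust
use std::str::FromStr;

/// Local stand-in for the SDK's `ValidationError`, so the sketch compiles alone.
#[derive(Debug, PartialEq, Eq)]
struct ValidationError(String);

/// Hypothetical name type. The rules below are illustrative only.
#[derive(Debug, PartialEq, Eq)]
struct ResourceName(String);

impl FromStr for ResourceName {
    type Err = ValidationError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Illustrative rule 1: between 1 and 48 bytes long.
        if s.is_empty() || s.len() > 48 {
            return Err(ValidationError(format!(
                "length {} is outside 1..=48",
                s.len()
            )));
        }
        // Illustrative rule 2: lowercase ASCII letters, digits and hyphens only.
        let bad = s
            .chars()
            .find(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-'));
        if let Some(c) = bad {
            return Err(ValidationError(format!("unexpected character {:?}", c)));
        }
        Ok(ResourceName(s.to_owned()))
    }
}
```

Once a `ResourceName` exists, downstream code can rely on it being well formed and does not need to re-check it.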

Best Practices

Let sessions retry: Don’t implement your own retry logic for transient errors. Sessions handle this automatically.
Handle terminal errors explicitly: Check for specific error codes like permission_denied or quota_exhausted and handle them appropriately.
Log context with errors: Include relevant context (basin, stream, operation) when logging errors for easier debugging.
Use structured error handling: Match on error types and codes rather than string parsing.
Always call close() on sessions, even after errors, to ensure proper cleanup and resource release.
Monitor error rates: Track error types and frequencies to detect systemic issues early.
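
For the monitoring advice, a simple in-process tally is often enough to start with. The sketch below uses a hypothetical `ErrorStats` type keyed by error code; a production setup would more likely export these counts to a metrics system, which is outside the scope of this guide.

```rust
use std::collections::HashMap;

/// Hypothetical in-process tally of operations and errors by code.
#[derive(Default)]
struct ErrorStats {
    total_ops: u64,
    by_code: HashMap<String, u64>,
}

impl ErrorStats {
    /// Records one successful operation.
    fn record_ok(&mut self) {
        self.total_ops += 1;
    }

    /// Records one failed operation under its error code.
    fn record_err(&mut self, code: &str) {
        self.total_ops += 1;
        *self.by_code.entry(code.to_owned()).or_insert(0) += 1;
    }

    /// Fraction of recorded operations that failed, or 0.0 if none were recorded.
    fn error_rate(&self) -> f64 {
        if self.total_ops == 0 {
            return 0.0;
        }
        let failed: u64 = self.by_code.values().sum();
        failed as f64 / self.total_ops as f64
    }
}
```

Keeping counts per code makes it possible to tell a burst of `rate_limited` responses apart from a steady stream of `permission_denied` errors, which call for different responses.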
