Overview
This guide covers best practices for integrating S2 into your applications, whether using the cloud service or self-hosting S2 Lite.
Connection Management
Reuse SDK Clients
Create SDK clients once and reuse them across your application. Clients maintain connection pools and internal state.
// Good: Create once, reuse everywhere
let s2 = S2::new(S2Config::new(access_token))?;
let basin = s2.basin(basin_name);
let stream = basin.stream(stream_name);
// Use the same client for multiple operations
stream.append(input1).await?;
stream.append(input2).await?;
// Bad: Creating new clients for each operation
for _ in 0..100 {
let s2 = S2::new(S2Config::new(access_token))?; // Don't do this!
s2.basin(basin_name).stream(stream_name).append(input).await?;
}
Set appropriate timeouts for your use case:
let config = S2Config::new(access_token)
.with_timeout(Duration::from_secs(30));
let s2 = S2::new(config)?;
For long-running streaming sessions, use higher timeouts or configure the timeout specifically for read operations.
Basin and Stream Configuration
Enable Auto-Creation
For development and testing, enable automatic stream creation:
s2 create-basin my-basin \
--create-stream-on-append \
--create-stream-on-read
In production, disable auto-creation and explicitly create streams. This prevents typos from creating unwanted streams.
Use Descriptive Names
Choose clear, descriptive names for basins and streams:
// Good: Clear purpose
let basin = s2.basin("user-events");
let stream = basin.stream("signup-events");
// Bad: Unclear purpose
let basin = s2.basin("data");
let stream = basin.stream("stream1");
Name Validation
Basin and stream names must:
- Be 1-63 characters long
- Contain only lowercase letters, numbers, and hyphens
- Start and end with a letter or number
- Not contain consecutive hyphens
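These rules can be pre-checked client-side before creating resources, which surfaces bad names with a clear local error instead of a failed API call. A minimal sketch (`is_valid_name` is our own helper, not part of the SDK):

```rust
/// Check a basin/stream name against the documented rules:
/// 1-63 characters, lowercase letters/digits/hyphens only,
/// alphanumeric first and last characters, no consecutive hyphens.
fn is_valid_name(name: &str) -> bool {
    let bytes = name.as_bytes();
    if bytes.is_empty() || bytes.len() > 63 {
        return false;
    }
    let alnum = |b: u8| b.is_ascii_lowercase() || b.is_ascii_digit();
    if !alnum(bytes[0]) || !alnum(bytes[bytes.len() - 1]) {
        return false;
    }
    if !bytes.iter().all(|&b| alnum(b) || b == b'-') {
        return false;
    }
    !name.contains("--")
}

fn main() {
    assert!(is_valid_name("user-events"));
    assert!(!is_valid_name("User-Events")); // uppercase not allowed
    assert!(!is_valid_name("-events"));     // must start alphanumeric
    assert!(!is_valid_name("a--b"));        // consecutive hyphens
}
```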
Writing Data
Batch Records
Batch multiple records in a single append for better throughput:
// Good: Batch multiple records
let records = vec![
AppendRecord::new("event1")?,
AppendRecord::new("event2")?,
AppendRecord::new("event3")?,
];
let input = AppendInput::new(AppendRecordBatch::try_from_iter(records)?);
stream.append(input).await?;
// Bad: Individual appends
for event in events {
let input = AppendInput::new(AppendRecordBatch::try_from_iter([
AppendRecord::new(event)?
])?);
stream.append(input).await?; // Multiple round trips
}
S2 processes batches atomically: all records in a batch succeed or fail together.
Use the Producer API
For high-throughput scenarios, use the Producer API which handles batching and pipelining automatically:
use s2_sdk::producer::ProducerConfig;
let producer = stream.producer(ProducerConfig::new());
// Submit records and get tickets
let ticket1 = producer.submit(AppendRecord::new("event1")?).await?;
let ticket2 = producer.submit(AppendRecord::new("event2")?).await?;
// Wait for acknowledgments
let ack1 = ticket1.await?;
let ack2 = ticket2.await?;
// Always close the producer when done
producer.close().await?;
The Producer API automatically batches records and uses pipelining for optimal performance.
Handle Errors
Always handle append errors appropriately:
match stream.append(input).await {
Ok(output) => {
println!("Appended {} records", output.records.len());
}
Err(S2Error::Server(err)) => {
// Server error - may be retryable
eprintln!("Server error: {}", err.message);
}
Err(S2Error::Client(err)) => {
// Client error - likely not retryable
eprintln!("Client error: {}", err);
}
Err(err) => {
eprintln!("Unexpected error: {}", err);
}
}
Reading Data
Use Streaming Sessions
For continuous reading, use streaming sessions instead of polling:
use futures::StreamExt;
let input = ReadInput::new();
let mut batches = stream.read_session(input).await?;
while let Some(result) = batches.next().await {
let batch = result?;
for record in batch.records {
// Process record
println!("seq_num: {}, body: {:?}", record.seq_num, record.body);
}
}
Set Appropriate Read Limits
Control how much data you read:
use s2_sdk::types::{ReadLimits, ReadStop};
let input = ReadInput::new()
.with_stop(ReadStop::new()
.with_limits(ReadLimits::new()
.with_count(100) // Max 100 records
.with_bytes(1024 * 1024) // Max 1MB
)
);
let output = stream.read(input).await?;
Use Checkpoints
Store the last processed sequence number to resume reading:
// Read from a specific sequence number
let input = ReadInput::new()
.with_start_seq_num(last_processed_seq_num + 1);
let output = stream.read(input).await?;
// Process records and update checkpoint
for record in output.records {
process_record(&record)?;
last_processed_seq_num = record.seq_num;
save_checkpoint(last_processed_seq_num)?;
}
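A checkpoint only helps if it survives restarts. One minimal file-based sketch (the path, plain-text format, and helper names are our own choices, not anything S2 prescribes; production systems often store checkpoints in a database alongside processed results):

```rust
use std::fs;
use std::path::Path;

/// Persist the last processed sequence number so reads can resume
/// after a restart.
fn save_checkpoint(path: &Path, seq_num: u64) -> std::io::Result<()> {
    // Write to a temp file, then rename: a crash mid-write cannot
    // leave a truncated checkpoint behind.
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, seq_num.to_string())?;
    fs::rename(&tmp, path)
}

/// A missing or unreadable checkpoint means "start from 0".
fn load_checkpoint(path: &Path) -> u64 {
    fs::read_to_string(path)
        .ok()
        .and_then(|s| s.trim().parse().ok())
        .unwrap_or(0)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("s2-checkpoint");
    save_checkpoint(&path, 42)?;
    assert_eq!(load_checkpoint(&path), 42);
    Ok(())
}
```

Note that checkpointing after every record (as above) trades throughput for a smaller replay window; checkpointing every N records is a common middle ground.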
S2 Lite Specific
Storage Backend Selection
In-Memory (Development/Testing)
# No persistence - data lost on restart
s2 lite --port 8080
Perfect for integration tests and local development.
Local Disk (Single-Node Development)
# Persists to local disk
s2 lite --port 8080 --local-root /var/lib/s2lite
Local disk mode is not suitable for production as it lacks durability guarantees.
Object Storage (Production)
# AWS S3
s2 lite --bucket my-s3-bucket --path s2lite
# Tigris or other S3-compatible
AWS_ACCESS_KEY_ID=xxx \
AWS_SECRET_ACCESS_KEY=yyy \
AWS_ENDPOINT_URL_S3=https://fly.storage.tigris.dev \
s2 lite --bucket my-bucket --path s2lite
Always use object storage for production deployments to ensure data durability.
Resource Limits
When deploying S2 Lite, set appropriate resource limits:
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
Environment Variables
SlateDB Configuration
# Flush interval (lower = better latency, more API calls)
SL8_FLUSH_INTERVAL=10ms
The default differs by backend: 50ms with object storage and 5ms in-memory. The defaults are a sensible starting point; lower values reduce latency at the cost of more backend API calls.
# Enable pipelining (experimental)
S2LITE_PIPELINE=true
Pipelining is currently disabled by default; enable it only in non-critical environments.
Logging
# Production: info or warn level
RUST_LOG=info
# Development: debug level
RUST_LOG=debug
# Specific modules
RUST_LOG=s2_lite=debug,slatedb=info
Deployment Patterns
Single Instance
Simplest deployment for small workloads:
docker run -p 8080:80 \
-e AWS_PROFILE=${AWS_PROFILE} \
-v ~/.aws:/home/nonroot/.aws:ro \
ghcr.io/s2-streamstore/s2 lite \
--bucket my-bucket \
--path s2lite
S2 Lite is a single-node binary. For high availability, use the S2 cloud service.
Kubernetes with Helm
helm install s2-lite s2/s2-lite-helm \
--set objectStorage.enabled=true \
--set objectStorage.bucket=my-bucket \
--set metrics.serviceMonitor.enabled=true
Security
Access Tokens
Cloud Service:
# Never commit tokens to version control
export S2_ACCESS_TOKEN=$(cat ~/.s2/token)
S2 Lite:
# Token value is ignored but required
export S2_ACCESS_TOKEN="any-value"
S2 Lite does not validate access tokens. Use network-level security (VPC, firewall rules) to protect your S2 Lite instances.
TLS/HTTPS
For S2 Lite in production, enable TLS:
helm install s2-lite s2/s2-lite-helm \
--set tls.enabled=true \
--set tls.cert=/etc/tls/tls.crt \
--set tls.key=/etc/tls/tls.key
Network Isolation
Deploy S2 Lite in a private network:
# Kubernetes: ClusterIP service (internal only)
service:
type: ClusterIP
port: 80
SDK Compatibility
Ensure you’re using compatible SDK versions:
| SDK | Minimum Version | S2 Cloud | S2 Lite |
|---|---|---|---|
| CLI | v0.26+ | ✅ | ✅ |
| TypeScript | v0.22+ | ✅ | ✅ |
| Go | v0.11+ | ✅ | ✅ |
| Rust | v0.22+ | ✅ | ✅ |
| Python | Migration needed | ✅ | 🚧 |
| Java | Migration needed | ✅ | 🚧 |
S2 Lite requires the S2-Basin header for stream operations. SDKs handle this automatically.
Benchmark Your Workload
Use the built-in benchmark tool:
# Test write performance
s2 bench my-basin/my-stream \
--target-mibps 10 \
--duration 30s \
--catchup-delay 0s
Optimize Batch Sizes
Find the optimal batch size for your workload:
- Too small: Higher overhead from API calls
- Too large: Higher latency, more memory usage
- Recommended: 10-100 records or 10KB-1MB per batch
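One way to respect both a record-count and a byte cap is to chunk client-side before building each batch. A sketch over raw payloads (the helper name and limits are illustrative, not SDK API):

```rust
/// Split payloads into batches capped at `max_records` per batch and
/// roughly `max_bytes` of payload per batch. An oversized single
/// record still gets its own batch rather than being dropped.
fn chunk_batches(records: &[Vec<u8>], max_records: usize, max_bytes: usize) -> Vec<Vec<Vec<u8>>> {
    let mut batches = Vec::new();
    let mut current: Vec<Vec<u8>> = Vec::new();
    let mut current_bytes = 0;
    for rec in records {
        let over_count = current.len() >= max_records;
        let over_bytes = !current.is_empty() && current_bytes + rec.len() > max_bytes;
        if over_count || over_bytes {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += rec.len();
        current.push(rec.clone());
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    // Five 400-byte records with a 1000-byte cap: batches of 2, 2, 1.
    let records: Vec<Vec<u8>> = (0..5).map(|_| vec![0u8; 400]).collect();
    let batches = chunk_batches(&records, 100, 1000);
    assert_eq!(batches.iter().map(Vec::len).collect::<Vec<_>>(), vec![2, 2, 1]);
}
```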
Monitor Metrics
Track these key metrics:
- Append latency (P95, P99)
- Batch sizes (records and bytes)
- Error rates
- Throughput (records/sec, bytes/sec)
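P95/P99 latency can be computed from raw samples with the nearest-rank method. A sketch for ad-hoc measurement (real deployments typically export histograms to a metrics system rather than sorting samples in-process):

```rust
/// Nearest-rank percentile over collected latency samples.
/// `p` is in [0, 100]; sorts the slice in place.
fn percentile(samples: &mut [u64], p: f64) -> u64 {
    assert!(!samples.is_empty() && (0.0..=100.0).contains(&p));
    samples.sort_unstable();
    // Nearest-rank: ceil(p/100 * n) gives a 1-based rank.
    let rank = ((p / 100.0) * samples.len() as f64).ceil().max(1.0) as usize;
    samples[rank - 1]
}

fn main() {
    let mut latencies_ms: Vec<u64> = (1..=100).collect();
    assert_eq!(percentile(&mut latencies_ms, 95.0), 95);
    assert_eq!(percentile(&mut latencies_ms, 99.0), 99);
}
```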
Error Handling
Retry Logic
Implement exponential backoff for transient errors:
use tokio::time::{sleep, Duration};
let mut retries = 0;
let max_retries = 3;
loop {
match stream.append(input.clone()).await {
Ok(output) => break,
Err(err) if retries < max_retries => {
retries += 1;
let delay = Duration::from_millis(100 * 2u64.pow(retries));
eprintln!("Retry {} after {:?}: {}", retries, delay, err);
sleep(delay).await;
}
Err(err) => return Err(err.into()),
}
}
Circuit Breaker Pattern
For high-availability systems, implement circuit breakers to prevent cascading failures.
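The core idea: after N consecutive failures, fail fast for a cooldown period instead of hammering a struggling server. A minimal sketch (the thresholds are illustrative, not S2 recommendations; crates such as a dedicated circuit-breaker library add half-open trial logic and metrics):

```rust
use std::time::{Duration, Instant};

/// Minimal circuit breaker: opens after `max_failures` consecutive
/// failures and rejects calls until `cooldown` has elapsed, after
/// which a trial request is allowed through.
struct CircuitBreaker {
    failures: u32,
    max_failures: u32,
    opened_at: Option<Instant>,
    cooldown: Duration,
}

impl CircuitBreaker {
    fn new(max_failures: u32, cooldown: Duration) -> Self {
        Self { failures: 0, max_failures, opened_at: None, cooldown }
    }

    /// Should we attempt the request at all?
    fn allow(&mut self) -> bool {
        match self.opened_at {
            Some(t) if t.elapsed() < self.cooldown => false,
            _ => true,
        }
    }

    fn record_success(&mut self) {
        self.failures = 0;
        self.opened_at = None;
    }

    fn record_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.max_failures {
            self.opened_at = Some(Instant::now());
        }
    }
}

fn main() {
    let mut cb = CircuitBreaker::new(3, Duration::from_secs(30));
    assert!(cb.allow());
    for _ in 0..3 {
        cb.record_failure();
    }
    assert!(!cb.allow()); // open: fail fast instead of retrying
    cb.record_success();
    assert!(cb.allow()); // closed again
}
```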
Testing
Use In-Memory S2 Lite
Always test against in-memory S2 Lite:
# In CI/CD
docker run -d -p 8080:80 ghcr.io/s2-streamstore/s2 lite
Separate Test Basins
Use separate basins for different environments:
let basin_name = match env::var("ENVIRONMENT") {
Ok(env) if env == "production" => "prod-events",
Ok(env) if env == "staging" => "staging-events",
_ => "dev-events",
};
let basin = s2.basin(basin_name);
Clean Up Test Data
#[cfg(test)]
mod tests {
use super::*;
async fn cleanup_basin(s2: &S2, basin_name: BasinName) {
let _ = s2.delete_basin(basin_name).await;
}
#[tokio::test]
async fn test_with_cleanup() {
let s2 = S2::new(S2Config::new("test-token")).unwrap();
let basin_name = "test-basin".parse().unwrap();
// Test code...
cleanup_basin(&s2, basin_name).await;
}
}
Migration and Compatibility
Migrating from Development to Production
- Switch from in-memory to object storage
- Enable TLS/HTTPS
- Disable auto-creation of streams
- Configure monitoring and alerting
- Set up automated backups (object storage handles this)
- Review resource limits
Cloud to Self-Hosted (or vice versa)
Both use the same API, so only configuration changes are needed:
// Cloud
let config = S2Config::new(access_token);
// Self-hosted S2 Lite
let config = S2Config::new("any-token")
.with_account_endpoint("http://localhost:8080")
.with_basin_endpoint("http://localhost:8080");