Skip to main content
Batching records is essential for achieving high throughput in S2. The SDK provides automatic batching with configurable parameters for linger time, batch size, and byte limits.

Why Batching Matters

Batching reduces overhead by:
  • Minimizing network round trips
  • Reducing per-request costs
  • Improving compression efficiency
  • Enabling higher throughput
S2 enforces maximum batch limits: 1000 records or 1 MiB metered bytes per batch.

BatchingConfig

The BatchingConfig struct controls how records are batched:
use s2_sdk::batching::BatchingConfig;
use std::time::Duration;

let config = BatchingConfig::new()
    .with_linger(Duration::from_millis(10))
    .with_max_batch_records(500)?
    .with_max_batch_bytes(512 * 1024)?; // 512 KiB

Configuration Parameters

1

Linger Duration

How long to wait for more records before flushing a batch.
let config = BatchingConfig::new()
    .with_linger(Duration::from_millis(5)); // Default
Lower linger = lower latency, higher linger = better batching efficiency.
2

Max Batch Records

Maximum number of records per batch (1-1000).
let config = BatchingConfig::new()
    .with_max_batch_records(1000)?; // Default: 1000
Must be at least 1 and cannot exceed 1000.
3

Max Batch Bytes

Maximum metered bytes per batch (8 bytes - 1 MiB).
let config = BatchingConfig::new()
    .with_max_batch_bytes(1024 * 1024)?; // Default: 1 MiB
Must be at least 8 bytes and cannot exceed 1 MiB (1048576 bytes).

Automatic Batching with Producer

The Producer provides the simplest way to leverage batching:
use s2_sdk::producer::ProducerConfig;
use s2_sdk::types::AppendRecord;

let producer_config = ProducerConfig::new()
    .with_batching(
        BatchingConfig::new()
            .with_linger(Duration::from_millis(10))
            .with_max_batch_records(500)?
    );

let producer = stream.producer(producer_config);

// Submit individual records - batching happens automatically
let ticket1 = producer.submit(AppendRecord::new("data1")?).await?;
let ticket2 = producer.submit(AppendRecord::new("data2")?).await?;
let ticket3 = producer.submit(AppendRecord::new("data3")?).await?;

// All three may be batched together if submitted within linger window
let ack1 = ticket1.await?;
println!("Record seq_num: {}", ack1.seq_num);
The producer handles batching, backpressure, and retries automatically.

Manual Batching

For more control, use AppendInputs to batch a stream of records:
use s2_sdk::batching::AppendInputs;
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

// Create a channel for records
let (tx, rx) = tokio::sync::mpsc::channel(100);

// Create batching stream
let config = BatchingConfig::new()
    .with_linger(Duration::from_millis(5));

let mut inputs = AppendInputs::new(
    ReceiverStream::new(rx),
    config,
);

// Send records
tokio::spawn(async move {
    for i in 0..100 {
        tx.send(AppendRecord::new(format!("record{}", i)).unwrap())
            .await
            .unwrap();
    }
});

// Consume batched inputs
while let Some(result) = inputs.next().await {
    let input = result?;
    println!("Batch of {} records", input.records.len());
    // Submit to append session
}

Batching Behavior

Flush Triggers

A batch is flushed when:
  1. Record count limit reached: Batch contains max_batch_records
  2. Byte limit reached: Batch size reaches max_batch_bytes
  3. Linger timeout: linger duration expires since first record
  4. Input stream ends: No more records available

Overflow Handling

When a record would overflow the current batch:
// From batching.rs:189-196
if would_overflow_batch(&config, batch.len(), batch.metered_bytes(), &record) {
    // Record is stashed for next batch
    overflowed_record = Some(record);
} else {
    // Record fits in current batch
    batch.push(record);
}
Records are never split across batches. If a record would overflow, it becomes the first record of the next batch.

Oversized Records

Records larger than max_batch_bytes are rejected:
let record = AppendRecord::new(vec![0u8; 2 * 1024 * 1024])?; // 2 MiB

let config = BatchingConfig::new()
    .with_max_batch_bytes(1024 * 1024)?; // 1 MiB

let mut batches = AppendRecordBatches::new(
    futures::stream::iter(vec![record]),
    config,
);

match batches.next().await {
    Some(Err(e)) => {
        // Error: "record size in metered bytes (2097152) 
        //         exceeds max_batch_bytes (1048576)"
    }
    _ => {}
}

Fencing and Sequence Numbers

Batching can be combined with fencing tokens and sequence number matching:
use s2_sdk::types::FencingToken;

let fencing_token = FencingToken::generate();

let inputs = AppendInputs::new(record_stream, config)
    .with_fencing_token(fencing_token)
    .with_match_seq_num(100); // Start from seq_num 100

// Sequence numbers auto-increment across batches
while let Some(result) = inputs.next().await {
    let input = result?;
    // First batch: match_seq_num = 100
    // Second batch: match_seq_num = 100 + first_batch_len
    // And so on...
}
The match_seq_num is automatically incremented by the batch size, ensuring continuity across batches.

Performance Tuning

Latency vs Throughput

Low latency (minimize delay):
let config = BatchingConfig::new()
    .with_linger(Duration::from_millis(1))
    .with_max_batch_records(100)?;
High throughput (maximize batching):
let config = BatchingConfig::new()
    .with_linger(Duration::from_millis(50))
    .with_max_batch_records(1000)?;

Record Size Considerations

Small records (< 1 KiB):
// Optimize for count
let config = BatchingConfig::new()
    .with_max_batch_records(1000)?
    .with_max_batch_bytes(1024 * 1024)?;
Large records (> 100 KiB):
// Bytes limit will be hit first
let config = BatchingConfig::new()
    .with_max_batch_records(1000)?
    .with_max_batch_bytes(1024 * 1024)?; // May only fit ~10 records

Best Practices

Start with defaults: The default configuration (5ms linger, 1000 records, 1 MiB) works well for most use cases.
Measure metered bytes: Use record.metered_bytes() to understand your record sizes and tune max_batch_bytes accordingly.
Use Producer for simplicity: Unless you need fine-grained control, the Producer API handles batching optimally.
Batching adds a small amount of latency (up to linger duration) but dramatically improves throughput for high-volume workloads.

Build docs developers (and LLMs) love