
Overview

S2S (S2 Streaming) is a binary framing protocol optimized for streaming records to and from S2. It provides:
  • Bi-directional streaming - Continuous reads and writes in both directions
  • Compression - Built-in support for gzip and zstd compression
  • Efficiency - Protocol buffer encoding with minimal overhead
  • Error handling - Graceful termination with error details

When to Use S2S

Choose S2S over REST or SSE when:
  • You need bi-directional streaming (streaming appends)
  • You want lower latency than HTTP request/response
  • You need higher throughput with compression
  • You’re building real-time data pipelines
For simple use cases, consider:
  • REST API - Single reads or writes, simpler integration
  • Server-Sent Events (SSE) - Read-only streaming with standard tools

Protocol Specification

Frame Structure

S2S uses length-prefixed frames with the following structure:
┌─────────────┬────────────┬─────────────────────────────┐
│   LENGTH    │   FLAGS    │        PAYLOAD DATA         │
│  (3 bytes)  │  (1 byte)  │     (variable length)       │
└─────────────┴────────────┴─────────────────────────────┘
Length (3 bytes)
  • Big-endian unsigned integer
  • Specifies size of FLAGS + PAYLOAD
  • Does NOT include the 3-byte length prefix itself
  • Maximum: 2 MiB (enforced limit)
Flags (1 byte)
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ 7 │ 6 │ 5 │ 4 │ 3 │ 2 │ 1 │ 0 │  Bit positions
├───┼───┴───┼───┴───┴───┴───┴───┤
│ T │  C C  │   Reserved (0s)   │  Purpose
└───┴───────┴───────────────────┘

T = Terminal flag (1 bit)
C = Compression (2 bits, values 0-3)
Payload
  • Protocol buffer message (regular frames)
  • Status code + JSON body (terminal frames)
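The header layout above can be sketched as a pair of helpers. This is a minimal illustration, not an official implementation: the names `pack_header` and `unpack_header` are hypothetical, and the sketch assumes the 2 MiB limit applies to the value of the LENGTH field.

```python
from typing import Tuple

# Assumption: the 2 MiB limit applies to the LENGTH value (FLAGS + PAYLOAD).
MAX_FRAME_LENGTH = 2 * 1024 * 1024


def pack_header(payload_len: int, terminal: bool, compression: int) -> bytes:
    """Return the 4 header bytes: 3-byte big-endian length, then flags."""
    if not 0 <= compression <= 3:
        raise ValueError("compression must fit in 2 bits")
    length = 1 + payload_len  # LENGTH covers FLAGS + PAYLOAD, not itself
    if length > MAX_FRAME_LENGTH:
        raise ValueError("frame exceeds the 2 MiB limit")
    # Bit 7 is the terminal flag, bits 6-5 the compression value,
    # bits 4-0 are reserved and left as zero.
    flags = (0x80 if terminal else 0x00) | (compression << 5)
    return length.to_bytes(3, "big") + bytes([flags])


def unpack_header(header: bytes) -> Tuple[int, bool, int]:
    """Return (payload_len, terminal, compression) from 4 header bytes."""
    if len(header) != 4:
        raise ValueError("header must be exactly 4 bytes")
    length = int.from_bytes(header[:3], "big")
    if length < 1 or length > MAX_FRAME_LENGTH:
        raise ValueError("invalid frame length")
    flags = header[3]
    return length - 1, bool(flags & 0x80), (flags >> 5) & 0x03
```

For example, a regular frame with a 10-byte zstd-compressed payload has the header bytes `00 00 0B 20`.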

Regular Frame

Carries a Protocol Buffer message:
┌─────────────┬────────────┬─────────────────────────────┐
│   LENGTH    │   FLAGS    │  Compressed Proto Message  │
│  (3 bytes)  │  (1 byte)  │     (variable length)       │
├─────────────┼────────────┼─────────────────────────────┤
│ 0x00 00 XX  │ 0 CA XXXXX │  AppendInput or ReadBatch   │
└─────────────┴────────────┴─────────────────────────────┘
  • Terminal bit = 0
  • Compression bits indicate algorithm
  • Payload is a Protocol Buffer message

Terminal Frame

Signals end of stream with error details:
┌─────────────┬────────────┬─────────────┬───────────────┐
│   LENGTH    │   FLAGS    │ STATUS CODE │   JSON BODY   │
│  (3 bytes)  │  (1 byte)  │  (2 bytes)  │  (variable)   │
├─────────────┼────────────┼─────────────┼───────────────┤
│ 0x00 00 XX  │ 1 CA XXXXX │   HTTP Code │   {"error"..} │
└─────────────┴────────────┴─────────────┴───────────────┘
  • Terminal bit = 1
  • Status code is HTTP status (2 bytes, big-endian)
  • Body is JSON-encoded error message
After receiving or sending a terminal frame, the session must be closed. No further frames should be sent or expected.
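As a rough sketch of the terminal frame layout described above, the helpers below build and parse one. The function names are hypothetical, and the sketch assumes terminal frames are sent uncompressed (compression bits 0), which matches the `0x80` flags value shown in the error handling example but is not stated as a rule on this page.

```python
import json
from typing import Tuple


def encode_terminal_frame(status: int, error: dict) -> bytes:
    """Build a terminal frame: length, flags, 2-byte status, JSON body."""
    body = json.dumps(error, separators=(",", ":")).encode("utf-8")
    payload = status.to_bytes(2, "big") + body  # status is big-endian
    length = 1 + len(payload)                   # flags byte + payload
    flags = 0x80                                # terminal bit set, no compression
    return length.to_bytes(3, "big") + bytes([flags]) + payload


def decode_terminal_payload(payload: bytes) -> Tuple[int, dict]:
    """Split a terminal payload into (HTTP status, parsed JSON body)."""
    status = int.from_bytes(payload[:2], "big")
    return status, json.loads(payload[2:].decode("utf-8"))
```

A 400 status encodes to the two bytes `01 90`, since 400 is `0x0190`.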

Compression

The 2-bit compression field takes one of four values:
Value  Algorithm  Description
0      None       No compression (small messages)
1      Zstd       Zstandard compression (preferred)
2      Gzip       Gzip compression (widely supported)
3      Reserved   Future use

Compression Selection

The server selects compression based on the Accept-Encoding header:
Accept-Encoding: zstd, gzip
  • Zstd is preferred for best compression and speed
  • Gzip is used as fallback if zstd is not supported
  • None is used if no encoding is specified
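The preference order above could be expressed roughly as follows. This is only an illustration of the stated rules, not the server's actual negotiation logic: `select_compression` is a hypothetical name, and the sketch ignores `q` weights in the header for simplicity.

```python
from typing import Optional

NONE, ZSTD, GZIP = 0, 1, 2  # values of the 2-bit compression field


def select_compression(accept_encoding: Optional[str]) -> int:
    """Pick zstd if offered, else gzip, else no compression."""
    if not accept_encoding:
        return NONE
    # Split "zstd, gzip;q=0.5" into bare, lower-cased encoding names.
    offered = {
        token.split(";")[0].strip().lower()
        for token in accept_encoding.split(",")
    }
    if "zstd" in offered:
        return ZSTD
    if "gzip" in offered:
        return GZIP
    return NONE
```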

Compression Threshold

Messages smaller than 1 KiB are not compressed (compression flag = 0) even if an algorithm is selected. This avoids compression overhead for small payloads.
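A minimal sketch of the threshold rule, using only gzip from the standard library so it runs without extra packages. The helper name `maybe_compress` is an assumption; a zstd branch would have the same shape using the third-party `zstandard` package.

```python
import gzip
from typing import Tuple

COMPRESSION_THRESHOLD = 1024  # 1 KiB, per the rule above
NONE, GZIP = 0, 2             # values of the 2-bit compression field


def maybe_compress(proto_bytes: bytes, algorithm: int) -> Tuple[int, bytes]:
    """Return (compression_flag, payload) for a serialized message."""
    # Messages below the threshold are sent as-is with flag 0,
    # even when an algorithm was negotiated.
    if algorithm == NONE or len(proto_bytes) < COMPRESSION_THRESHOLD:
        return NONE, proto_bytes
    if algorithm == GZIP:
        return GZIP, gzip.compress(proto_bytes)
    raise ValueError("this sketch only handles none and gzip")
```

The returned flag, not the negotiated algorithm, is what goes into the frame's compression bits, so a receiver can always tell whether a given payload needs decompressing.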

Message Types

Read Session Messages

Server sends ReadBatch messages:
message ReadBatch {
  repeated SequencedRecord records = 1;
  optional StreamPosition tail = 2;
}

message SequencedRecord {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
  repeated Header headers = 3;
  bytes body = 4;
}
Example flow:
  1. Client opens S2S read session
  2. Server sends frames containing ReadBatch messages
  3. Each batch contains one or more records
  4. Server sends heartbeats (empty batches) every 5-15 seconds
  5. Session continues until error or client closes

Append Session Messages

Client sends AppendInput, server responds with AppendAck:
message AppendInput {
  repeated AppendRecord records = 1;
  optional uint64 match_seq_num = 2;
  optional string fencing_token = 3;
}

message AppendAck {
  optional StreamPosition start = 1;
  optional StreamPosition end = 2;
  optional StreamPosition tail = 3;
}
Example flow:
  1. Client opens S2S append session
  2. Client sends frames containing AppendInput messages
  3. Server responds with frames containing AppendAck messages
  4. Process continues for streaming writes
  5. Either side can send a terminal frame to end the session

Initiating a Session

Read Session

Open a streaming read:
GET /v1/basins/{basin}/streams/{stream}/read?wait=300
Authorization: Bearer YOUR_TOKEN
Accept: s2s/proto
Accept-Encoding: zstd, gzip
Response:
HTTP/1.1 200 OK
Content-Type: s2s/proto
Content-Encoding: zstd

[S2S frames containing ReadBatch messages...]
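For illustration, the read request above could be assembled in Python as below. The helper `build_read_request` is hypothetical, and the API host is deliberately left out because this page does not state it.

```python
from typing import Dict, Tuple
from urllib.parse import quote, urlencode


def build_read_request(
    basin: str, stream: str, token: str, wait: int = 300
) -> Tuple[str, Dict[str, str]]:
    """Return (path, headers) for opening an S2S read session."""
    path = (
        f"/v1/basins/{quote(basin, safe='')}"
        f"/streams/{quote(stream, safe='')}/read?"
        + urlencode({"wait": wait})
    )
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "s2s/proto",            # ask for S2S framing
        "Accept-Encoding": "zstd, gzip",  # zstd preferred, gzip fallback
    }
    return path, headers
```

The result can be passed to any HTTP client that supports streaming response bodies, for example `http.client.HTTPSConnection(host).request("GET", path, headers=headers)`, where `host` is your S2 endpoint.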

Append Session

Open a streaming append:
POST /v1/basins/{basin}/streams/{stream}/append
Authorization: Bearer YOUR_TOKEN  
Content-Type: s2s/proto
Accept: s2s/proto
Accept-Encoding: zstd

[S2S frames containing AppendInput messages...]
Response:
HTTP/1.1 200 OK
Content-Type: s2s/proto
Content-Encoding: zstd

[S2S frames containing AppendAck messages...]

Error Handling

Terminal Messages

When an error occurs, the server sends a terminal frame. Example terminal frame contents:
Flags: 0x80 (terminal bit set)
Status: 0x01 0x90 (400 Bad Request)
Body: {"error":"validation_error","message":"Invalid record format"}

Common Error Codes

Status  Meaning                Handling
400     Bad Request            Fix request parameters
401     Unauthorized           Check access token
403     Forbidden              Insufficient permissions
404     Not Found              Stream does not exist
409     Conflict               Condition failed (seq_num mismatch, etc.)
416     Range Not Satisfiable  Read position beyond tail
500     Internal Error         Retry with backoff

Retry Strategy

For transient errors (5xx), implement exponential backoff:
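import time

max_retries = 5
base_delay = 1.0  # seconds

# open_s2s_session and ServerError are placeholders for your own
# session-opening function and the exception you raise on 5xx statuses.
for attempt in range(max_retries):
    try:
        session = open_s2s_session()
        # process frames...
        break
    except ServerError:
        if attempt == max_retries - 1:
            raise  # out of retries
        # Delays between attempts: 1s, 2s, 4s, 8s
        time.sleep(base_delay * (2 ** attempt))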
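The retry policy can also be factored into small, testable helpers. This is a sketch under assumptions: the helper names are hypothetical, and the delay cap and random jitter are common additions that this page does not prescribe.

```python
import random
from typing import Callable, List


def is_retryable(status: int) -> bool:
    """Only 5xx statuses are treated as transient; 4xx need a fix first."""
    return 500 <= status < 600


def backoff_delays(
    max_retries: int = 5, base_delay: float = 1.0, max_delay: float = 30.0
) -> List[float]:
    """Delays slept between attempts (one fewer than the attempt count)."""
    return [
        min(max_delay, base_delay * 2 ** attempt)
        for attempt in range(max_retries - 1)
    ]


def with_jitter(delay: float, rng: Callable[[], float] = random.random) -> float:
    """Full jitter (an assumption): sleep a random time in [0, delay)."""
    return delay * rng()
```

With the defaults, the schedule is 1, 2, 4 and 8 seconds; jitter spreads out clients that would otherwise all reconnect at the same instant.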

Frame Encoding Example

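Encoding a ReadBatch with a single record:
import struct
import zstandard as zstd

# Create protobuf message (ReadBatch and SequencedRecord are the Python
# classes generated from the S2 .proto definitions)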
read_batch = ReadBatch()
read_batch.records.append(SequencedRecord(
    seq_num=100,
    timestamp=1234567890,
    body=b"Hello, S2!"
))

# Serialize to bytes
proto_bytes = read_batch.SerializeToString()

# Compress if at least 1 KiB (the compression threshold)
if len(proto_bytes) >= 1024:
    compressor = zstd.ZstdCompressor()
    payload = compressor.compress(proto_bytes)
    compression = 1  # Zstd
else:
    payload = proto_bytes
    compression = 0  # None

# Build flags byte
flags = (compression << 5) & 0x60  # Compression bits

# Calculate length
length = 1 + len(payload)  # flags + payload

# Encode frame
frame = struct.pack('>I', length)[1:]  # 3-byte length
frame += struct.pack('B', flags)       # 1-byte flags  
frame += payload                        # Variable payload

# Send frame over an already-connected socket object `sock`;
# sendall keeps writing until every byte has been sent
sock.sendall(frame)

Frame Decoding Example

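import gzip
import json
import struct
import zstandard as zstd

def recv_exact(sock, n):
    """Read exactly n bytes; a single recv may return fewer."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed mid-frame")
        buf += chunk
    return buf

# Read 3-byte length prefix (sock is an already-connected socket object)
length_bytes = recv_exact(sock, 3)
length = struct.unpack('>I', b'\x00' + length_bytes)[0]

# Read flags + payload
frame_data = recv_exact(sock, length)
flags = frame_data[0]
payload = frame_data[1:]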

# Check if terminal
is_terminal = (flags & 0x80) != 0

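if is_terminal:
    # Parse terminal frame: 2-byte big-endian status, then a JSON body
    status = struct.unpack('>H', payload[:2])[0]
    body = payload[2:].decode('utf-8')
    error = json.loads(body)
    # SessionError is a placeholder for your own exception type
    raise SessionError(status, error)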

# Extract compression
compression = (flags & 0x60) >> 5

# Decompress if needed
if compression == 1:  # Zstd
    decompressor = zstd.ZstdDecompressor()
    proto_bytes = decompressor.decompress(payload)
elif compression == 2:  # Gzip
    proto_bytes = gzip.decompress(payload)
else:  # None
    proto_bytes = payload

# Parse protobuf
read_batch = ReadBatch()
read_batch.ParseFromString(proto_bytes)
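When frames arrive as arbitrary chunks of an HTTP response body, a frame can be split across chunks. The sketch below buffers input and yields only complete frames; the class name `FrameBuffer` is hypothetical, and decompression and protobuf parsing are left to the caller.

```python
from typing import Iterator, Tuple

MAX_FRAME_LENGTH = 2 * 1024 * 1024  # 2 MiB limit on the LENGTH field


class FrameBuffer:
    """Accumulates bytes and yields (flags, payload) per complete frame."""

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, chunk: bytes) -> None:
        """Append newly received bytes to the internal buffer."""
        self._buf += chunk

    def frames(self) -> Iterator[Tuple[int, bytes]]:
        """Yield every frame that is fully buffered, consuming it."""
        while len(self._buf) >= 3:
            length = int.from_bytes(self._buf[:3], "big")
            if length < 1 or length > MAX_FRAME_LENGTH:
                raise ValueError("invalid frame length")
            if len(self._buf) < 3 + length:
                break  # wait for the rest of this frame
            flags = self._buf[3]
            payload = bytes(self._buf[4:3 + length])
            del self._buf[:3 + length]
            yield flags, payload
```

A caller would `feed()` each chunk read from the response and then loop over `frames()`, checking the terminal bit of each frame's flags before decoding the payload.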

Protocol Buffer Definitions

S2S uses Protocol Buffers v3. Full definitions:

StreamPosition

message StreamPosition {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
}

Header

message Header {
  bytes name = 1;
  bytes value = 2;
}

AppendRecord

message AppendRecord {
  optional uint64 timestamp = 1;
  repeated Header headers = 2;
  bytes body = 3;
}

SequencedRecord

message SequencedRecord {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
  repeated Header headers = 3;
  bytes body = 4;
}
See the full protocol buffer definitions in the S2 repository.

SDK Support

S2 SDKs provide automatic S2S protocol handling:

Rust SDK

use s2::{Client, types::StreamName};

let client = Client::from_env()?;
let basin = client.basin("my-basin");

// Streaming read with S2S
let mut stream = basin
    .stream(StreamName::new("my-stream")?)
    .read()
    .start_seq_num(0)
    .into_stream()
    .await?;

while let Some(batch) = stream.next().await {
    for record in batch?.records {
        println!("seq={} data={:?}", record.seq_num, record.body);
    }
}

// Streaming append with S2S
let mut appender = basin
    .stream(StreamName::new("my-stream")?)
    .append_session()
    .await?;

for i in 0..1000 {
    let record = AppendRecordBuilder::new()
        .body(format!("record {}", i))
        .build()?;
    
    let ack = appender.append(vec![record]).await?;
    println!("Appended at seq_num={}", ack.start.seq_num);
}
The SDK handles frame encoding/decoding, compression negotiation, and error handling automatically.

Performance Characteristics

Compression Ratios

Typical compression ratios for log data:
  • Zstd: 5-10x compression
  • Gzip: 4-8x compression
  • None: 1x (no compression)

Latency

  • Frame overhead: 4 bytes per frame (3-byte length + 1-byte flags)
  • Network RTT: Depends on client location
  • Compression: 1-5ms for typical messages

Throughput

S2S can sustain:
  • Reads: 100+ MB/s per session with compression
  • Writes: 50+ MB/s per session with compression
Actual performance depends on record size, compression ratio, and network conditions.

Best Practices

1. Enable Compression

Always request compression for production workloads:
Accept-Encoding: zstd, gzip

2. Batch Records

Batch multiple records per frame to reduce overhead:
# Good: Batch 100 records
append_input = AppendInput(records=records_batch)

# Less efficient: 100 separate frames
for record in records:
    append_input = AppendInput(records=[record])

3. Handle Terminal Frames

Always check for terminal frames and handle gracefully:
if is_terminal:
    logger.error(f"Session ended: {status} {error_body}")
    # Clean up resources
    socket.close()
    return

4. Implement Heartbeat Detection

For read sessions, detect heartbeats (empty batches) to confirm liveness:
if len(read_batch.records) == 0:
    logger.debug("Received heartbeat")
    last_heartbeat = time.time()
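Recording the heartbeat time is only useful if something checks it. A minimal sketch of such a check follows; `LivenessTracker` is a hypothetical name, and the 30 second timeout is an assumption chosen as twice the longest documented heartbeat interval.

```python
import time
from typing import Callable


class LivenessTracker:
    """Flags a read session as stale when no frame arrives in time."""

    def __init__(
        self, timeout: float = 30.0, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._timeout = timeout
        self._clock = clock
        self._last_seen = clock()

    def on_frame(self) -> None:
        """Call for every frame received, heartbeat or data."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True when nothing has arrived for longer than the timeout."""
        return self._clock() - self._last_seen > self._timeout
```

A monotonic clock is used so that wall-clock adjustments cannot trigger a false timeout. When `is_stale()` returns true, the client can close the session and reopen it from the last sequence number it processed.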

5. Use Connection Pooling

Reuse connections when possible, but open new sessions for each streaming operation.

Limitations

  • Maximum frame size: 2 MiB (enforced)
  • Maximum batch size: 1000 records or 1 MiB of metered data
  • Heartbeat interval: 5-15 seconds (server-controlled)

Next Steps

API Overview

Learn about S2 API structure and endpoints

SDK Reference

Use the Rust SDK for automatic S2S handling
