Overview
S2S (S2 Streaming) is a binary framing protocol optimized for streaming records to and from S2. It provides:
Bi-directional streaming - Continuous reads and writes in both directions
Compression - Built-in support for gzip and zstd compression
Efficiency - Protocol buffer encoding with minimal overhead
Error handling - Graceful termination with error details
When to Use S2S
Choose S2S over REST or SSE when:
You need bi-directional streaming (streaming appends)
You want lower latency than HTTP request/response
You need higher throughput with compression
You’re building real-time data pipelines
For simple use cases, consider:
REST API - Single reads or writes, simpler integration
Server-Sent Events (SSE) - Read-only streaming with standard tools
Protocol Specification
Frame Structure
S2S uses length-prefixed frames with the following structure:
┌─────────────┬────────────┬─────────────────────────────┐
│ LENGTH │ FLAGS │ PAYLOAD DATA │
│ (3 bytes) │ (1 byte) │ (variable length) │
└─────────────┴────────────┴─────────────────────────────┘
Length (3 bytes)
Big-endian unsigned integer
Specifies size of FLAGS + PAYLOAD
Does NOT include the 3-byte length prefix itself
Maximum: 2 MiB (enforced limit)
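The length rules above can be sketched as a pair of helpers. This is a minimal illustration rather than reference code: the names `encode_length` and `decode_length` are hypothetical, and treating 2 MiB as an inclusive upper bound and 1 (the flags byte alone) as the lower bound are assumptions.

```python
MAX_FRAME_LEN = 2 * 1024 * 1024  # 2 MiB limit on FLAGS + PAYLOAD (assumed inclusive)


def encode_length(n: int) -> bytes:
    """Encode the size of FLAGS + PAYLOAD as a 3-byte big-endian integer."""
    # The flags byte is always present, so the smallest valid length is 1.
    if not 1 <= n <= MAX_FRAME_LEN:
        raise ValueError(f"frame length {n} outside 1..{MAX_FRAME_LEN}")
    return n.to_bytes(3, "big")


def decode_length(prefix: bytes) -> int:
    """Decode a 3-byte length prefix and apply the same bounds check."""
    if len(prefix) != 3:
        raise ValueError("length prefix must be exactly 3 bytes")
    n = int.from_bytes(prefix, "big")
    if not 1 <= n <= MAX_FRAME_LEN:
        raise ValueError(f"frame length {n} outside 1..{MAX_FRAME_LEN}")
    return n
```

Validating the length before reading the rest of the frame lets a receiver reject oversized frames without buffering them.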
Flags (1 byte)
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ 7 │ 6 │ 5 │ 4 │ 3 │ 2 │ 1 │ 0 │ Bit positions
├───┼───┴───┼───┴───┴───┴───┴───┤
│ T │ C C │ Reserved (0s) │ Purpose
└───┴───────┴───────────────────┘
T = Terminal flag (1 bit)
C = Compression (2 bits, values 0-3)
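As an illustration of the bit layout, the flags byte can be packed and unpacked with two masks. The helper names and the `Compression` enum are hypothetical; the numeric values follow the table in the Compression section.

```python
from enum import IntEnum


class Compression(IntEnum):
    NONE = 0
    ZSTD = 1
    GZIP = 2


TERMINAL_BIT = 0x80      # bit 7
COMPRESSION_MASK = 0x60  # bits 6-5
COMPRESSION_SHIFT = 5


def pack_flags(terminal: bool, compression: Compression) -> int:
    """Build the flags byte; reserved bits 4-0 are left as zero."""
    flags = TERMINAL_BIT if terminal else 0
    flags |= (int(compression) << COMPRESSION_SHIFT) & COMPRESSION_MASK
    return flags


def unpack_flags(flags: int) -> tuple[bool, Compression]:
    """Split a flags byte into (terminal, compression)."""
    terminal = bool(flags & TERMINAL_BIT)
    code = (flags & COMPRESSION_MASK) >> COMPRESSION_SHIFT
    # Compression(3) raises ValueError because value 3 is reserved.
    return terminal, Compression(code)
```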
Payload
Protocol buffer message (regular frames)
Status code + JSON body (terminal frames)
Regular Frame
Carries a Protocol Buffer message:
┌─────────────┬────────────┬─────────────────────────────┐
│ LENGTH │ FLAGS │ Compressed Proto Message │
│ (3 bytes) │ (1 byte) │ (variable length) │
├─────────────┼────────────┼─────────────────────────────┤
│ 0x00 00 XX │ 0 CA XXXXX │ AppendInput or ReadBatch │
└─────────────┴────────────┴─────────────────────────────┘
Terminal bit = 0
Compression bits indicate algorithm
Payload is a Protocol Buffer message
Terminal Frame
Signals end of stream with error details:
┌─────────────┬────────────┬─────────────┬───────────────┐
│ LENGTH │ FLAGS │ STATUS CODE │ JSON BODY │
│ (3 bytes) │ (1 byte) │ (2 bytes) │ (variable) │
├─────────────┼────────────┼─────────────┼───────────────┤
│ 0x00 00 XX │ 1 CA XXXXX │ HTTP Code │ {"error"..} │
└─────────────┴────────────┴─────────────┴───────────────┘
Terminal bit = 1
Status code is HTTP status (2 bytes, big-endian)
Body is JSON-encoded error message
After receiving or sending a terminal frame, the session must be closed. No further frames should be sent or expected.
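A terminal frame can be sketched as follows. The function names are hypothetical, and the sketch assumes the JSON body is sent uncompressed (compression bits 0), which the layout above does not state explicitly.

```python
import json
import struct


def build_terminal_frame(status: int, body: dict) -> bytes:
    """Assemble LENGTH + FLAGS + STATUS CODE + JSON BODY."""
    payload = struct.pack(">H", status) + json.dumps(body).encode("utf-8")
    flags = 0x80                 # terminal bit set, no compression (assumption)
    length = 1 + len(payload)    # flags byte + payload
    return length.to_bytes(3, "big") + bytes([flags]) + payload


def parse_terminal_payload(payload: bytes) -> tuple[int, dict]:
    """Split a terminal payload into (HTTP status, decoded JSON body)."""
    if len(payload) < 2:
        raise ValueError("terminal payload is shorter than the 2-byte status code")
    status = struct.unpack(">H", payload[:2])[0]
    return status, json.loads(payload[2:].decode("utf-8"))
```

For example, `build_terminal_frame(400, {"error": "validation_error"})` yields a frame whose fifth and sixth bytes are `0x01 0x90`.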
Compression
S2S defines three compression modes (none, zstd, and gzip); a fourth value is reserved:
Value  Algorithm  Description
0      None       No compression (small messages)
1      Zstd       Zstandard compression (preferred)
2      Gzip       Gzip compression (widely supported)
3      Reserved   Future use
Compression Selection
The server selects compression based on the Accept-Encoding header:
Accept-Encoding: zstd, gzip
Zstd is preferred for best compression and speed
Gzip is used as fallback if zstd is not supported
None is used if no encoding is specified
Compression Threshold
Messages smaller than 1 KiB are not compressed (compression flag = 0) even if an algorithm is selected. This avoids compression overhead for small payloads.
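The selection and threshold rules can be approximated in a few lines. This is a simplified sketch, not the server's actual logic: `negotiate` and `frame_compression` are hypothetical names, and the header parsing ignores `q` weights.

```python
from typing import Optional

NONE, ZSTD, GZIP = 0, 1, 2
COMPRESSION_THRESHOLD = 1024  # 1 KiB


def negotiate(accept_encoding: Optional[str]) -> int:
    """Pick zstd if offered, else gzip, else none."""
    if not accept_encoding:
        return NONE
    # Drop any ";q=..." parameter and normalise case (q weights are ignored here).
    offered = {tok.split(";")[0].strip().lower() for tok in accept_encoding.split(",")}
    if "zstd" in offered:
        return ZSTD
    if "gzip" in offered:
        return GZIP
    return NONE


def frame_compression(negotiated: int, message_len: int) -> int:
    """Messages below the threshold are sent uncompressed regardless of negotiation."""
    return negotiated if message_len >= COMPRESSION_THRESHOLD else NONE
```

Because the flag is set per frame, a receiver should always read the compression bits rather than assume the negotiated algorithm.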
Message Types
Read Session Messages
Server sends ReadBatch messages:
message ReadBatch {
  repeated SequencedRecord records = 1;
  optional StreamPosition tail = 2;
}
message SequencedRecord {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
  repeated Header headers = 3;
  bytes body = 4;
}
Example flow:
Client opens S2S read session
Server sends frames containing ReadBatch messages
Each batch contains one or more records
Server sends heartbeats (empty batches) every 5-15 seconds
Session continues until error or client closes
Append Session Messages
Client sends AppendInput, server responds with AppendAck:
message AppendInput {
  repeated AppendRecord records = 1;
  optional uint64 match_seq_num = 2;
  optional string fencing_token = 3;
}
message AppendAck {
  optional StreamPosition start = 1;
  optional StreamPosition end = 2;
  optional StreamPosition tail = 3;
}
Example flow:
Client opens S2S append session
Client sends frames containing AppendInput messages
Server responds with frames containing AppendAck messages
Process continues for streaming writes
Either side can send terminal frame to end session
Initiating a Session
Read Session
Open a streaming read:
GET /v1/basins/{basin}/streams/{stream}/read?wait=300
Authorization: Bearer YOUR_TOKEN
Accept: s2s/proto
Accept-Encoding: zstd, gzip
Response:
HTTP/1.1 200 OK
Content-Type: s2s/proto
Content-Encoding: zstd
[S2S frames containing ReadBatch messages...]
Append Session
Open a streaming append:
POST /v1/basins/{basin}/streams/{stream}/append
Authorization: Bearer YOUR_TOKEN
Content-Type: s2s/proto
Accept: s2s/proto
Accept-Encoding: zstd
[S2S frames containing AppendInput messages...]
Response:
HTTP/1.1 200 OK
Content-Type: s2s/proto
Content-Encoding: zstd
[S2S frames containing AppendAck messages...]
Error Handling
Terminal Messages
When an error occurs, the server sends a terminal frame:
Example terminal frame payload:
Flags: 0x80 (terminal bit set)
Status: 0x01 0x90 (400 Bad Request)
Body: {"error":"validation_error","message":"Invalid record format"}
Common Error Codes
Status  Meaning                Handling
400     Bad Request            Fix request parameters
401     Unauthorized           Check access token
403     Forbidden              Insufficient permissions
404     Not Found              Stream does not exist
409     Conflict               Condition failed (seq_num mismatch, etc.)
416     Range Not Satisfiable  Read position beyond tail
500     Internal Error         Retry with backoff
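One way to act on these codes is a small lookup plus a retry predicate. The names `HANDLING`, `is_retryable`, and `handling_for` are hypothetical, and the fallback text for unlisted 4xx codes is an assumption rather than documented behaviour.

```python
# Handling guidance copied from the table of common error codes.
HANDLING = {
    400: "Fix request parameters",
    401: "Check access token",
    403: "Insufficient permissions",
    404: "Stream does not exist",
    409: "Condition failed (seq_num mismatch, etc.)",
    416: "Read position beyond tail",
    500: "Retry with backoff",
}


def is_retryable(status: int) -> bool:
    """Treat every 5xx status as transient; 4xx errors need a changed request."""
    return 500 <= status <= 599


def handling_for(status: int) -> str:
    """Return the documented handling, or a generic fallback for unlisted codes."""
    if status in HANDLING:
        return HANDLING[status]
    # Fallbacks below are assumptions for codes the table does not list.
    return "Retry with backoff" if is_retryable(status) else "Do not retry; inspect the error body"
```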
Retry Strategy
For transient errors (5xx), implement exponential backoff:
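import time
max_retries = 5
base_delay = 1.0  # seconds
# open_s2s_session() and ServerError are placeholders for your own client code
for attempt in range(max_retries):
    try:
        session = open_s2s_session()
        # process frames...
        break
    except ServerError:
        if attempt < max_retries - 1:
            # Delay doubles on each attempt: 1s, 2s, 4s, 8s
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
        else:
            # Out of attempts: surface the last error to the caller
            raise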
Frame Encoding Example
Encoding a ReadBatch with one record:
import struct
import zstandard as zstd
# ReadBatch and SequencedRecord are the classes generated from the S2 .proto files;
# sock is an already-connected socket object.
# Create protobuf message
read_batch = ReadBatch()
read_batch.records.append(SequencedRecord(
    seq_num=100,
    timestamp=1234567890,
    body=b"Hello, S2!",
))
# Serialize to bytes
proto_bytes = read_batch.SerializeToString()
# Compress if >= 1 KiB
if len(proto_bytes) >= 1024:
    compressor = zstd.ZstdCompressor()
    payload = compressor.compress(proto_bytes)
    compression = 1  # Zstd
else:
    payload = proto_bytes
    compression = 0  # None
# Build flags byte (terminal bit stays 0 for a regular frame)
flags = (compression << 5) & 0x60  # Compression bits 6-5
# Calculate length
length = 1 + len(payload)  # flags + payload
# Encode frame
frame = struct.pack('>I', length)[1:]  # 3-byte big-endian length
frame += struct.pack('B', flags)  # 1-byte flags
frame += payload  # Variable payload
# Send the whole frame (sendall keeps writing until every byte is sent)
sock.sendall(frame)
Frame Decoding Example
import gzip
import json
import struct
import zstandard as zstd
# sock is an already-connected socket object; SessionError is a placeholder exception type.
def recv_exact(sock, n):
    """Read exactly n bytes; a single recv() call may return fewer."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed mid-frame")
        buf += chunk
    return bytes(buf)
# Read 3-byte length prefix
length_bytes = recv_exact(sock, 3)
length = struct.unpack('>I', b'\x00' + length_bytes)[0]
# Read flags + payload
frame_data = recv_exact(sock, length)
flags = frame_data[0]
payload = frame_data[1:]
# Check if terminal
is_terminal = (flags & 0x80) != 0
if is_terminal:
    # Parse terminal frame: 2-byte big-endian status, then JSON body
    status = struct.unpack('>H', payload[:2])[0]
    body = payload[2:].decode('utf-8')
    error = json.loads(body)
    raise SessionError(status, error)
# Extract compression
compression = (flags & 0x60) >> 5
# Decompress if needed
if compression == 1:  # Zstd
    decompressor = zstd.ZstdDecompressor()
    proto_bytes = decompressor.decompress(payload)
elif compression == 2:  # Gzip
    proto_bytes = gzip.decompress(payload)
else:  # None
    proto_bytes = payload
# Parse protobuf
read_batch = ReadBatch()
read_batch.ParseFromString(proto_bytes)
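Because a session runs over an HTTP body, frames usually arrive in arbitrary chunks rather than one frame per read. The sketch below buffers incoming bytes and yields complete frames; `FrameBuffer` is a hypothetical helper, and decompression and protobuf parsing are left to the caller.

```python
MAX_FRAME_LEN = 2 * 1024 * 1024  # 2 MiB limit on FLAGS + PAYLOAD


class FrameBuffer:
    """Accumulates bytes and splits them into (flags, payload) frames."""

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, data: bytes) -> list[tuple[int, bytes]]:
        """Add a chunk and return every frame that is now complete."""
        self._buf += data
        frames = []
        while len(self._buf) >= 3:
            length = int.from_bytes(self._buf[:3], "big")
            if not 1 <= length <= MAX_FRAME_LEN:
                raise ValueError(f"invalid frame length {length}")
            if len(self._buf) < 3 + length:
                break  # wait for the rest of this frame
            flags = self._buf[3]
            payload = bytes(self._buf[4:3 + length])
            del self._buf[:3 + length]
            frames.append((flags, payload))
        return frames
```

Each chunk read from the response body is passed to `feed`, and the returned frames are then handled with the terminal check and decompression steps shown in the decoding example.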
Protocol Buffer Definitions
S2S uses Protocol Buffers v3. Full definitions:
StreamPosition
message StreamPosition {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
}
Header
message Header {
  bytes name = 1;
  bytes value = 2;
}
AppendRecord
message AppendRecord {
  optional uint64 timestamp = 1;
  repeated Header headers = 2;
  bytes body = 3;
}
SequencedRecord
message SequencedRecord {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
  repeated Header headers = 3;
  bytes body = 4;
}
See the full protocol buffer definitions in the S2 repository.
SDK Support
S2 SDKs provide automatic S2S protocol handling:
Rust SDK
use s2::{Client, types::StreamName};
let client = Client::from_env()?;
let basin = client.basin("my-basin");
// Streaming read with S2S
let mut stream = basin
    .stream(StreamName::new("my-stream")?)
    .read()
    .start_seq_num(0)
    .into_stream()
    .await?;
while let Some(batch) = stream.next().await {
    for record in batch?.records {
        println!("seq={} data={:?}", record.seq_num, record.body);
    }
}
// Streaming append with S2S
let mut appender = basin
    .stream(StreamName::new("my-stream")?)
    .append_session()
    .await?;
for i in 0..1000 {
    let record = AppendRecordBuilder::new()
        .body(format!("record {}", i))
        .build()?;
    let ack = appender.append(vec![record]).await?;
    println!("Appended at seq_num={}", ack.start.seq_num);
}
The SDK handles frame encoding/decoding, compression negotiation, and error handling automatically.
Compression Ratios
Typical compression ratios for log data:
Zstd: 5-10x compression
Gzip: 4-8x compression
None: 1x (no compression)
Latency
Frame overhead: 4 bytes per frame (3-byte length + 1-byte flags)
Network RTT: Depends on client location
Compression: 1-5 ms for typical messages
Throughput
S2S can sustain:
Reads: 100+ MB/s per session with compression
Writes: 50+ MB/s per session with compression
Actual performance depends on record size, compression ratio, and network conditions.
Best Practices
1. Enable Compression
Always request compression for production workloads:
Accept-Encoding: zstd, gzip
2. Batch Records
Batch multiple records per frame to reduce overhead:
# Good: Batch 100 records
append_input = AppendInput(records=records_batch)
# Less efficient: 100 separate frames
for record in records:
    append_input = AppendInput(records=[record])
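To make the saving concrete, the framing cost alone can be computed from the 4-byte frame header. The function name is hypothetical, and the figure counts only S2S framing bytes, not protobuf encoding or transport overhead.

```python
FRAME_HEADER_BYTES = 4  # 3-byte length + 1-byte flags


def framing_overhead(num_records: int, records_per_frame: int) -> int:
    """Total S2S header bytes needed to send num_records records."""
    if records_per_frame < 1:
        raise ValueError("records_per_frame must be at least 1")
    frames = -(-num_records // records_per_frame)  # ceiling division
    return frames * FRAME_HEADER_BYTES
```

Sending 100 records in one frame costs 4 header bytes, while sending them one per frame costs 400. Larger batches also give the compressor more data to work with and are more likely to cross the 1 KiB compression threshold.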
3. Handle Terminal Frames
Always check for terminal frames and handle gracefully:
if is_terminal:
    logger.error(f"Session ended: {status} {error_body}")
    # Clean up resources (sock is the connected socket object)
    sock.close()
    return
4. Implement Heartbeat Detection
For read sessions, detect heartbeats (empty batches) to confirm liveness:
if len(read_batch.records) == 0:
    logger.debug("Received heartbeat")
    last_heartbeat = time.time()
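Recording the last heartbeat is only useful if something checks it. A minimal watchdog sketch follows: `HeartbeatMonitor` is a hypothetical helper, and the 30-second default (twice the longest documented heartbeat interval) is an assumption, not a value from S2.

```python
import time


class HeartbeatMonitor:
    """Tracks when the last batch arrived and flags a silent session."""

    def __init__(self, timeout: float = 30.0, clock=time.monotonic) -> None:
        self._timeout = timeout
        self._clock = clock
        self._last_seen = clock()

    def on_batch(self, record_count: int) -> bool:
        """Record activity; returns True when the batch was a heartbeat."""
        # Any batch, empty or not, proves the session is alive.
        self._last_seen = self._clock()
        return record_count == 0

    def is_stale(self) -> bool:
        """True when nothing has arrived for longer than the timeout."""
        return self._clock() - self._last_seen > self._timeout
```

A read loop would call `on_batch(len(read_batch.records))` for every decoded batch and reopen the session when `is_stale()` returns True. Using a monotonic clock avoids false alarms when the wall clock is adjusted.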
5. Use Connection Pooling
Reuse connections when possible, but open new sessions for each streaming operation.
Limitations
Maximum frame size: 2 MiB (enforced)
Maximum batch size: 1000 records or 1 MiB of metered data
Heartbeat interval: 5-15 seconds (server-controlled)
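A client can stay within the batch limits by splitting records before building each AppendInput. In this sketch `split_batches` is a hypothetical helper, and metered size is approximated by body length alone, which is an assumption: the actual metering formula may also count headers and per-record overhead.

```python
from typing import Iterable, Iterator

MAX_BATCH_RECORDS = 1000
MAX_BATCH_BYTES = 1024 * 1024  # 1 MiB


def split_batches(
    bodies: Iterable[bytes],
    max_records: int = MAX_BATCH_RECORDS,
    max_bytes: int = MAX_BATCH_BYTES,
) -> Iterator[list[bytes]]:
    """Group record bodies into batches that respect both limits."""
    batch: list[bytes] = []
    batch_bytes = 0
    for body in bodies:
        # Approximation: only the body length is counted as metered size.
        if len(body) > max_bytes:
            raise ValueError("single record exceeds the batch byte limit")
        if batch and (len(batch) == max_records or batch_bytes + len(body) > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(body)
        batch_bytes += len(body)
    if batch:
        yield batch
```

Each yielded list would then become the records of one AppendInput frame.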
Next Steps
API Overview: Learn about S2 API structure and endpoints
SDK Reference: Use the Rust SDK for automatic S2S handling