
GET /streams/{stream}/records (Streaming)

Establish a long-lived streaming session to continuously read records from a stream. This endpoint supports two streaming protocols: Server-Sent Events (SSE) and S2S.

When to Use Streaming

Use streaming reads when you need to:
  • Tail a stream in real-time
  • Process records continuously as they arrive
  • Read large volumes of data without pagination
  • Maintain a live connection with automatic reconnection

Authentication

Requires a valid access token with read permissions to the stream.

Path Parameters

stream
string
required
Stream name to read records from.

Headers

S2-Basin
string
required
Basin name where the stream resides.
S2-Format
string
default:"base64"
Encoding format for SSE streaming:
  • base64 - Base64-encoded binary data (default)
  • utf8 - UTF-8 text
Not applicable for S2S protocol (always uses binary protobuf).
Accept
string
required
Streaming protocol:
  • text/event-stream - Server-Sent Events
  • s2s/proto - S2S binary protocol
Last-Event-ID
string
For SSE reconnection. Automatically set by browsers when reconnecting.
Format: {seq_num}:{count}:{bytes}
Example: "42:10:8192" resumes from seq_num 43 with 10 records and 8192 bytes already consumed.
Accept-Encoding
string
For S2S protocol, request compression:
  • zstd - Zstandard compression (preferred)
  • gzip - Gzip compression
The server selects the best supported algorithm.

Query Parameters

Same as Read Records with these differences:
  • count and bytes have no default limits (can stream indefinitely)
  • wait defaults to infinite (connection stays open)
  • wait acts as an idle timeout - connection closes if no records arrive within this duration

Server-Sent Events (SSE)

Event Types

batch
event
Contains a batch of records.
  • id: {seq_num}:{count}:{bytes} - Resume token for reconnection
  • data: JSON object with records array and optional tail
ping
event
Heartbeat event sent when switching to real-time tail following or during idle periods (every 5-15 seconds).
  • id: Last known position
  • data: Empty
done
event
End of stream event. Sent when all requested records have been delivered.
  • data: Empty
error
event
Error event terminates the stream.
  • data: JSON error details
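The event types above can be handled with a minimal SSE line parser. This is a sketch of the generic SSE wire format (id/event/data fields separated by blank lines), not an official client; production code should prefer an SSE library:

```python
import json

def parse_sse(lines):
    """Parse raw SSE lines into (event_id, event_type, data) tuples.

    A blank line ends each event; the last seen id persists across events.
    """
    event_id, event_type, data = None, "message", []
    for line in lines:
        if line == "":                      # blank line: dispatch the event
            if data or event_type != "message":
                yield event_id, event_type, "\n".join(data)
            event_type, data = "message", []
        elif line.startswith("id:"):
            event_id = line[3:].strip()     # resume token, e.g. "42:10:8192"
        elif line.startswith("event:"):
            event_type = line[6:].strip()   # batch | ping | done | error
        elif line.startswith("data:"):
            data.append(line[5:].strip())

raw = [
    "id: 100:1:256",
    "event: batch",
    'data: {"records":[{"seq_num":100,"body":"Event data"}]}',
    "",
]
for eid, etype, payload in parse_sse(raw):
    if etype == "batch":
        batch = json.loads(payload)
        print(eid, [r["seq_num"] for r in batch["records"]])
```

For a `batch` event, the `id` field is the resume token to persist before acknowledging the records.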

Examples

Stream from tail with SSE

curl -N -X GET "https://mybasin.b.aws.s2.dev/v1/streams/events/records?tail_offset=0" \
  -H "Authorization: Bearer $TOKEN" \
  -H "S2-Basin: mybasin" \
  -H "Accept: text/event-stream" \
  -H "S2-Format: utf8"
Response stream:
id: 100:1:256
event: batch
data: {"records":[{"seq_num":100,"timestamp":1709481234567,"headers":[],"body":"Event data"}],"tail":{"seq_num":101,"timestamp":1709481234567}}

id: 101:2:512
event: batch
data: {"records":[{"seq_num":101,"timestamp":1709481234890,"headers":[],"body":"Another event"}],"tail":{"seq_num":102,"timestamp":1709481234890}}

id: 101:2:512
event: ping
data:

Stream historical records then tail

curl -N -X GET "https://mybasin.b.aws.s2.dev/v1/streams/logs/records?seq_num=0" \
  -H "Authorization: Bearer $TOKEN" \
  -H "S2-Basin: mybasin" \
  -H "Accept: text/event-stream"
This reads all historical records, then continues streaming new records as they arrive.

Reconnection with Last-Event-ID

// Note: the browser-native EventSource API does not support custom
// headers; this example assumes a polyfill or the Node `eventsource`
// package (alternatively, pass credentials via cookie).
const eventSource = new EventSource(
  'https://mybasin.b.aws.s2.dev/v1/streams/events/records?tail_offset=0',
  {
    headers: {
      'Authorization': 'Bearer ' + token,
      'S2-Basin': 'mybasin'
    }
  }
);

eventSource.addEventListener('batch', (e) => {
  const batch = JSON.parse(e.data);
  console.log('Received records:', batch.records);
  // Browser automatically sends Last-Event-ID on reconnection
});

eventSource.addEventListener('done', () => {
  console.log('Stream complete');
  eventSource.close();
});

eventSource.addEventListener('error', (e) => {
  console.error('Stream error:', e);
});

Stream with time bound

curl -N -X GET "https://mybasin.b.aws.s2.dev/v1/streams/events/records?timestamp=1709481234000&until=1709481235000" \
  -H "Authorization: Bearer $TOKEN" \
  -H "S2-Basin: mybasin" \
  -H "Accept: text/event-stream"
Streams all records in the time range, then sends a done event.

S2S Protocol

The S2S (S2 Streaming) protocol is a high-performance binary streaming format using length-prefixed protobuf frames.

Frame Format

┌─────────────┬────────────┬─────────────────────────────┐
│   LENGTH    │   FLAGS    │        PAYLOAD DATA         │
│  (3 bytes)  │  (1 byte)  │     (variable length)       │
├─────────────┼────────────┼─────────────────────────────┤
│ 0x00 00 XX  │ 0 CA XXXXX │  Compressed proto message   │
└─────────────┴────────────┴─────────────────────────────┘
LENGTH: Size of FLAGS + PAYLOAD (excludes the 3-byte length header itself). Maximum 2 MiB. FLAGS:
  • Bit 7: Terminal flag (1 = terminal message, 0 = regular)
  • Bits 6-5: Compression algorithm (00 = none, 01 = zstd, 10 = gzip)
  • Bits 4-0: Reserved (must be 0)
PAYLOAD:
  • Regular frame: Protobuf ReadBatch message (optionally compressed)
  • Terminal frame: 2-byte status code + JSON error body
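The length/flags layout above can be sketched as follows (`encode_frame` and `decode_header` are illustrative helpers, not part of any SDK):

```python
import struct

TERMINAL = 0x80      # bit 7: terminal flag
COMP_SHIFT = 5       # bits 6-5: compression (00 = none, 01 = zstd, 10 = gzip)

def encode_frame(payload: bytes, terminal: bool = False, compression: int = 0) -> bytes:
    """Prefix FLAGS + PAYLOAD with the 3-byte big-endian LENGTH header."""
    flags = (TERMINAL if terminal else 0) | (compression << COMP_SHIFT)
    body = bytes([flags]) + payload
    assert len(body) <= 2 * 1024 * 1024     # LENGTH covers FLAGS + PAYLOAD, max 2 MiB
    return struct.pack(">I", len(body))[1:] + body  # drop high byte -> 3 bytes

def decode_header(frame: bytes):
    """Return (length, terminal, compression) from a frame's first 4 bytes."""
    length = struct.unpack(">I", b"\x00" + frame[:3])[0]
    flags = frame[3]
    return length, bool(flags & TERMINAL), (flags >> COMP_SHIFT) & 0b11

frame = encode_frame(b"hello", compression=1)   # zstd-flagged payload
print(decode_header(frame))                     # -> (6, False, 1)
```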

Regular Frame

Contains a ReadBatch protobuf message:
message ReadBatch {
  repeated SequencedRecord records = 1;
  optional StreamPosition tail = 2;
}

message SequencedRecord {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
  repeated Header headers = 3;
  bytes body = 4;
}

message Header {
  bytes name = 1;
  bytes value = 2;
}

message StreamPosition {
  uint64 seq_num = 1;
  uint64 timestamp = 2;
}

Heartbeat Frames

Empty records array with tail set indicates a heartbeat:
ReadBatch {
  records: [],
  tail: { seq_num: 100, timestamp: 1709481234567 }
}

Terminal Frame

Terminates the stream with an error or normal completion:
┌─────────────┬────────────┬─────────────┬───────────────┐
│   LENGTH    │   FLAGS    │ STATUS CODE │   JSON BODY   │
│  (3 bytes)  │  (1 byte)  │  (2 bytes)  │  (variable)   │
├─────────────┼────────────┼─────────────┼───────────────┤
│ 0x00 00 XX  │ 1 CA XXXXX │   HTTP Code │   JSON data   │
└─────────────┴────────────┴─────────────┴───────────────┘

Compression

Payloads over 1 KiB are automatically compressed if Accept-Encoding is specified:
  • zstd - Zstandard (preferred, better compression ratio)
  • gzip - Gzip (widely supported)
Clients must decompress based on the compression bits in FLAGS.

Examples

S2S stream from tail

curl -N -X GET "https://mybasin.b.aws.s2.dev/v1/streams/events/records?tail_offset=0" \
  -H "Authorization: Bearer $TOKEN" \
  -H "S2-Basin: mybasin" \
  -H "Accept: s2s/proto" \
  -H "Accept-Encoding: zstd" \
  --output - | xxd

Python S2S client example

import gzip
import struct

import requests
import zstandard
import s2_pb2  # Generated from the protobuf schema

url = "https://mybasin.b.aws.s2.dev/v1/streams/events/records"
headers = {
    "Authorization": f"Bearer {token}",
    "S2-Basin": "mybasin",
    "Accept": "s2s/proto",
    "Accept-Encoding": "zstd",
}
params = {"tail_offset": 0}

with requests.get(url, headers=headers, params=params, stream=True) as r:
    r.raise_for_status()
    buffer = b""
    terminated = False

    for chunk in r.iter_content(chunk_size=8192):
        if terminated:
            break
        buffer += chunk

        while len(buffer) >= 3:
            # Read length prefix (3 bytes, big-endian); LENGTH covers FLAGS + PAYLOAD
            length = struct.unpack(">I", b"\x00" + buffer[:3])[0]

            if len(buffer) < 3 + length:
                break  # Wait for more data

            # Extract frame
            flags = buffer[3]
            payload = buffer[4:3 + length]
            buffer = buffer[3 + length:]

            # Terminal frame: 2-byte status code + JSON body, then stop reading
            if flags & 0x80:
                status = struct.unpack(">H", payload[:2])[0]
                body = payload[2:].decode("utf-8")
                print(f"Terminal: {status} - {body}")
                terminated = True
                break

            # Decompress according to the compression bits (01 = zstd, 10 = gzip)
            compression = (flags & 0x60) >> 5
            if compression == 1:
                payload = zstandard.ZstdDecompressor().decompress(payload)
            elif compression == 2:
                payload = gzip.decompress(payload)

            # Parse the protobuf ReadBatch
            batch = s2_pb2.ReadBatch()
            batch.ParseFromString(payload)

            if batch.records:
                for record in batch.records:
                    print(f"Record {record.seq_num}: {record.body}")
            else:
                print(f"Heartbeat: tail at {batch.tail.seq_num}")

Session Behavior

Historical Catch-up

When starting before the tail, the session reads historical records in batches:
  1. Batches contain up to 1000 records or 1 MiB (whichever is reached first)
  2. Batches are delivered as fast as the client can consume them
  3. No heartbeats during historical catch-up

Real-time Tail Following

Once caught up to the tail:
  1. A heartbeat is sent to signal the transition
  2. New records are streamed as they arrive
  3. Heartbeats sent every 5-15 seconds if idle
  4. Connection remains open until client closes or wait timeout expires

Bounds and Termination

The session terminates with a done event when:
  • count limit is reached
  • bytes limit is reached
  • until timestamp is encountered
  • wait timeout expires with no new records
For unbounded streaming (no count, bytes, or until), the session continues indefinitely.

Error Handling

SSE Reconnection

Browsers automatically reconnect on connection loss. Set the Last-Event-ID header to the last received event ID to resume:
Last-Event-ID: 42:10:8192
The server resumes from seq_num 43 and adjusts count/bytes limits.
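The resume arithmetic can be sketched as follows (`resume_params` and the `count_limit`/`bytes_limit` names are illustrative, not part of the API; they stand for the original request's limits):

```python
def resume_params(last_event_id: str, count_limit: int, bytes_limit: int):
    """Derive the resume position and remaining limits from a Last-Event-ID.

    Token format: "{seq_num}:{count}:{bytes}", where count/bytes are the
    amounts already consumed before the disconnect.
    """
    seq_num, consumed_count, consumed_bytes = map(int, last_event_id.split(":"))
    return {
        "seq_num": seq_num + 1,                 # resume after the last delivered record
        "count": count_limit - consumed_count,  # remaining record budget
        "bytes": bytes_limit - consumed_bytes,  # remaining byte budget
    }

print(resume_params("42:10:8192", count_limit=100, bytes_limit=1_000_000))
# -> {'seq_num': 43, 'count': 90, 'bytes': 991808}
```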

S2S Reconnection

Clients must implement reconnection logic:
  1. Track the last successfully processed seq_num
  2. On reconnection, set seq_num={last_seq_num + 1} in query params
  3. Adjust count and bytes limits to account for already-received records
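The steps above can be sketched as a reconnection loop. `read_s2s` is a hypothetical generator standing in for an S2S frame reader (like the Python example above) that yields decoded ReadBatch messages and raises ConnectionError when the link drops:

```python
import time

def stream_with_reconnect(read_s2s, start_seq=0, max_retries=5, base_delay=1.0):
    """Yield records across reconnects, resuming after the last processed seq_num."""
    last_seq = start_seq - 1
    retries = 0
    while retries <= max_retries:
        try:
            for batch in read_s2s(last_seq + 1):    # step 2: resume after last seq
                for record in batch.records:
                    yield record
                    last_seq = record.seq_num       # step 1: track progress
                retries = 0                         # healthy batch: reset backoff
            return                                  # stream completed normally
        except ConnectionError:
            retries += 1
            time.sleep(min(base_delay * 2 ** retries, 30))  # exponential backoff
```

Adjusting `count`/`bytes` (step 3) would follow the same pattern, subtracting what was already yielded before passing limits to `read_s2s`.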

Common Errors

  • 408 Request Timeout: wait timeout expired, reconnect to continue
  • 416 Range Not Satisfiable: Starting position beyond tail (check tail in error response)
  • 503 Service Unavailable: Temporary backend issue, retry with backoff

Performance Considerations

  • SSE: Lower throughput due to JSON overhead, but simpler integration and automatic reconnection
  • S2S: Maximum throughput with binary protobuf and compression, requires custom client
  • Historical catch-up is bandwidth-limited by storage backend (~100-500 MB/s typical)
  • Real-time streaming latency is typically < 10ms from append to delivery
  • Use compression for payloads > 1 KiB to reduce bandwidth

Notes

  • SSE connections have a default max age of 45 seconds before requiring reconnection (configurable)
  • S2S streams continue until explicitly terminated or error occurs
  • Heartbeats prevent connection timeout and confirm session health
  • Empty batches (heartbeats) update the tail position but contain no records
  • For fan-out scenarios, consider using multiple streams instead of multiple readers on one stream
