Skip to main content
The Go SDK provides an idiomatic Go interface for interacting with S2, with support for goroutines, context, and channels.
The Go SDK is compatible with S2 v1 API (v0.11+) and ready for production use.

Installation

Install via go get:
go get github.com/s2-streamstore/s2-sdk-go

Quick Start

Initialize the Client

import (
    "context"
    "github.com/s2-streamstore/s2-sdk-go"
)

func main() {
    ctx := context.Background()
    
    client, err := s2.New(s2.Config{
        AccessToken: "your_access_token",
    })
    if err != nil {
        panic(err)
    }
}

Basic Operations

// List basins
basins, err := client.ListBasins(ctx, s2.ListBasinsInput{})
if err != nil {
    panic(err)
}

// Create a basin
basin, err := client.CreateBasin(ctx, s2.CreateBasinInput{
    Name: "my-basin",
})
if err != nil {
    panic(err)
}

// Get a stream
stream := client.Basin("my-basin").Stream("my-stream")

// Append records
producer := stream.Producer()
ack, err := producer.Append(ctx, s2.Record{
    Body: []byte("Hello, S2!"),
})
if err != nil {
    panic(err)
}

// Read records
reader := stream.Reader()
for batch := range reader.Read(ctx) {
    if batch.Err != nil {
        panic(batch.Err)
    }
    for _, record := range batch.Records {
        fmt.Printf("Record: %s\n", record.Body)
    }
}

Working with Basins

Create a Basin

basin, err := client.CreateBasin(ctx, s2.CreateBasinInput{
    Name: "my-basin",
    Config: &s2.BasinConfig{
        DefaultStreamConfig: &s2.StreamConfig{
            RetentionPolicy: &s2.RetentionPolicy{
                Type:    s2.RetentionTypeAge,
                Seconds: 7 * 24 * 60 * 60, // 7 days
            },
        },
    },
})
if err != nil {
    return err
}

List Basins

// Single page
page, err := client.ListBasins(ctx, s2.ListBasinsInput{
    Limit: 10,
})
if err != nil {
    return err
}

// Iterate through all basins
for basin := range client.ListAllBasins(ctx) {
    if basin.Err != nil {
        return basin.Err
    }
    fmt.Println(basin.Name)
}

Get Basin Configuration

config, err := client.GetBasinConfig(ctx, "my-basin")
if err != nil {
    return err
}

Delete a Basin

err := client.DeleteBasin(ctx, s2.DeleteBasinInput{
    Name: "my-basin",
})

Working with Streams

Create a Stream

basin := client.Basin("my-basin")

stream, err := basin.CreateStream(ctx, s2.CreateStreamInput{
    Name: "my-stream",
    Config: &s2.StreamConfig{
        Timestamping: &s2.TimestampingConfig{
            Mode: s2.TimestampModeClientRequire,
        },
    },
})
if err != nil {
    return err
}

List Streams

streams, err := basin.ListStreams(ctx, s2.ListStreamsInput{})
if err != nil {
    return err
}

// Iterate through all streams
for stream := range basin.ListAllStreams(ctx) {
    if stream.Err != nil {
        return stream.Err
    }
    fmt.Println(stream.Name)
}

Delete a Stream

err := basin.DeleteStream(ctx, s2.DeleteStreamInput{
    Name: "my-stream",
})

Writing Records

Using the Producer

stream := client.Basin("my-basin").Stream("my-stream")
producer := stream.Producer()

// Append a single record
ack, err := producer.Append(ctx, s2.Record{
    Body: []byte("my data"),
})
if err != nil {
    return err
}
fmt.Printf("Sequence number: %d\n", ack.SeqNum)

// Append with headers
ack, err = producer.Append(ctx, s2.Record{
    Body: []byte("data"),
    Headers: map[string]string{
        "content-type": "application/json",
    },
})

Batch Append

records := []s2.Record{
    {Body: []byte("record 1")},
    {Body: []byte("record 2")},
    {Body: []byte("record 3")},
}

ack, err := stream.AppendBatch(ctx, s2.AppendInput{
    Records: records,
})
if err != nil {
    return err
}
fmt.Printf("First seq num: %d\n", ack.FirstSeqNum)

Reading Records

Using the Reader

stream := client.Basin("my-basin").Stream("my-stream")
reader := stream.Reader()

// Read from the beginning
for batch := range reader.Read(ctx) {
    if batch.Err != nil {
        return batch.Err
    }
    for _, record := range batch.Records {
        fmt.Printf("SeqNum: %d, Body: %s\n", record.SeqNum, record.Body)
    }
}

Read with Options

// Read from a specific position
for batch := range reader.ReadFrom(ctx, s2.ReadOptions{
    StartSeqNum: 100,
    Limit:       1000,
}) {
    if batch.Err != nil {
        return batch.Err
    }
    // Process batch
}

Single Read

batch, err := stream.Read(ctx, s2.ReadInput{
    StartSeqNum: 0,
    Limit:       100,
})
if err != nil {
    return err
}

for _, record := range batch.Records {
    fmt.Println(string(record.Body))
}

Check Tail

tail, err := stream.CheckTail(ctx)
if err != nil {
    return err
}
fmt.Printf("Latest sequence number: %d\n", tail.SeqNum)

Context and Cancellation

All SDK methods accept a context.Context for timeout and cancellation:
// With timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

basin, err := client.CreateBasin(ctx, s2.CreateBasinInput{
    Name: "my-basin",
})

// With cancellation
ctx, cancel := context.WithCancel(context.Background())

go func() {
    // Cancel after some condition
    cancel()
}()

reader := stream.Reader()
for batch := range reader.Read(ctx) {
    // Will stop when context is cancelled
}

Access Tokens

Issue a Token

token, err := client.IssueAccessToken(ctx, s2.IssueAccessTokenInput{
    Description: "My application token",
})
if err != nil {
    return err
}

List Tokens

tokens, err := client.ListAccessTokens(ctx, s2.ListAccessTokensInput{})
if err != nil {
    return err
}

for _, token := range tokens.Values {
    fmt.Printf("%s: %s\n", token.ID, token.Description)
}

Revoke a Token

err := client.RevokeAccessToken(ctx, "token-id")

Metrics

// Account metrics
metrics, err := client.GetAccountMetrics(ctx, s2.GetMetricsInput{
    Name: s2.MetricRecordBytes,
})
if err != nil {
    return err
}

// Basin metrics
metrics, err = client.GetBasinMetrics(ctx, s2.GetBasinMetricsInput{
    Basin: "my-basin",
    Name:  s2.MetricAppendCount,
})

// Stream metrics
metrics, err = client.GetStreamMetrics(ctx, s2.GetStreamMetricsInput{
    Basin:  "my-basin",
    Stream: "my-stream",
    Name:   s2.MetricReadCount,
})

Error Handling

import "github.com/s2-streamstore/s2-sdk-go/errors"

basin, err := client.CreateBasin(ctx, s2.CreateBasinInput{
    Name: "my-basin",
})
if err != nil {
    switch {
    case errors.IsUnauthorized(err):
        fmt.Println("Invalid access token")
    case errors.IsNotFound(err):
        fmt.Println("Resource not found")
    case errors.IsConflict(err):
        fmt.Println("Resource already exists")
    default:
        fmt.Printf("Error: %v\n", err)
    }
}

Configuration

Custom Endpoint

client, err := s2.New(s2.Config{
    AccessToken: "your_token",
    BaseURL:     "https://custom.s2.dev",
})

HTTP Client Customization

import "net/http"

customClient := &http.Client{
    Timeout: 30 * time.Second,
}

client, err := s2.New(s2.Config{
    AccessToken: "your_token",
    HTTPClient:  customClient,
})

Concurrency

The SDK is safe for concurrent use:
var wg sync.WaitGroup
stream := client.Basin("my-basin").Stream("my-stream")
producer := stream.Producer()

// Multiple goroutines can use the same producer
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(n int) {
        defer wg.Done()
        ack, err := producer.Append(ctx, s2.Record{
            Body: []byte(fmt.Sprintf("message %d", n)),
        })
        if err != nil {
            fmt.Printf("Error: %v\n", err)
            return
        }
        fmt.Printf("Appended with seq num: %d\n", ack.SeqNum)
    }(i)
}

wg.Wait()

Resources

GitHub

Source code and documentation

Go.dev

API reference documentation

REST API

Underlying REST API reference

Discord

Join our community

Next Steps

Build docs developers (and LLMs) love