The Go SDK provides an idiomatic Go interface for interacting with S2, with support for goroutines, context, and channels.
The Go SDK is compatible with the S2 v1 API (v0.11+) and is ready for production use.
Installation
Install via go get:
go get github.com/s2-streamstore/s2-sdk-go
Quick Start
Initialize the Client
import (
" context "
" github.com/s2-streamstore/s2-sdk-go "
)
func main () {
ctx := context . Background ()
client , err := s2 . New ( s2 . Config {
AccessToken : "your_access_token" ,
})
if err != nil {
panic ( err )
}
}
Basic Operations
// List basins
basins, err := client.ListBasins(ctx, s2.ListBasinsInput{})
if err != nil {
	panic(err)
}

// Create a basin
basin, err := client.CreateBasin(ctx, s2.CreateBasinInput{
	Name: "my-basin",
})
if err != nil {
	panic(err)
}

// Get a stream
stream := client.Basin("my-basin").Stream("my-stream")

// Append records
producer := stream.Producer()
ack, err := producer.Append(ctx, s2.Record{
	Body: []byte("Hello, S2!"),
})
if err != nil {
	panic(err)
}

// Read records; check batch.Err before touching batch.Records
reader := stream.Reader()
for batch := range reader.Read(ctx) {
	if batch.Err != nil {
		panic(batch.Err)
	}
	for _, record := range batch.Records {
		// One record per output line.
		fmt.Printf("Record: %s\n", record.Body)
	}
}
Working with Basins
Create a Basin
basin , err := client . CreateBasin ( ctx , s2 . CreateBasinInput {
Name : "my-basin" ,
Config : & s2 . BasinConfig {
DefaultStreamConfig : & s2 . StreamConfig {
RetentionPolicy : & s2 . RetentionPolicy {
Type : s2 . RetentionTypeAge ,
Seconds : 7 * 24 * 60 * 60 , // 7 days
},
},
},
})
if err != nil {
return err
}
List Basins
// Single page
page , err := client . ListBasins ( ctx , s2 . ListBasinsInput {
Limit : 10 ,
})
if err != nil {
return err
}
// Iterate through all basins
for basin := range client . ListAllBasins ( ctx ) {
if basin . Err != nil {
return basin . Err
}
fmt . Println ( basin . Name )
}
Get Basin Configuration
// Name of the basin whose configuration is fetched.
const basinName = "my-basin"

config, err := client.GetBasinConfig(ctx, basinName)
if err != nil {
	return err
}
Delete a Basin
err := client . DeleteBasin ( ctx , s2 . DeleteBasinInput {
Name : "my-basin" ,
})
Working with Streams
Create a Stream
basin := client.Basin("my-basin")

// Stream configuration with the timestamping mode set explicitly.
streamConfig := &s2.StreamConfig{
	Timestamping: &s2.TimestampingConfig{
		Mode: s2.TimestampModeClientRequire,
	},
}

stream, err := basin.CreateStream(ctx, s2.CreateStreamInput{
	Name:   "my-stream",
	Config: streamConfig,
})
if err != nil {
	return err
}
List Streams
// Single call with a zero-value input
var listInput s2.ListStreamsInput
streams, err := basin.ListStreams(ctx, listInput)
if err != nil {
	return err
}

// Iterate through all streams; every item exposes Err and Name
for item := range basin.ListAllStreams(ctx) {
	if item.Err != nil {
		return item.Err
	}
	fmt.Println(item.Name)
}
Delete a Stream
err := basin . DeleteStream ( ctx , s2 . DeleteStreamInput {
Name : "my-stream" ,
})
Writing Records
Using the Producer
stream := client.Basin("my-basin").Stream("my-stream")
producer := stream.Producer()

// Append a single record
ack, err := producer.Append(ctx, s2.Record{
	Body: []byte("my data"),
})
if err != nil {
	return err
}
fmt.Printf("Sequence number: %d\n", ack.SeqNum)

// Append with headers
ack, err = producer.Append(ctx, s2.Record{
	Body: []byte("data"),
	Headers: map[string]string{
		"content-type": "application/json",
	},
})
// The second append can fail independently of the first, so check it too.
if err != nil {
	return err
}
Batch Append
// Records to send in a single AppendBatch call.
records := []s2.Record{
	{Body: []byte("record 1")},
	{Body: []byte("record 2")},
	{Body: []byte("record 3")},
}

ack, err := stream.AppendBatch(ctx, s2.AppendInput{
	Records: records,
})
if err != nil {
	return err
}
fmt.Printf("First seq num: %d\n", ack.FirstSeqNum)
Reading Records
Using the Reader
stream := client.Basin("my-basin").Stream("my-stream")
reader := stream.Reader()

// Read from the beginning
for batch := range reader.Read(ctx) {
	// Check batch.Err before using batch.Records.
	if batch.Err != nil {
		return batch.Err
	}
	for _, record := range batch.Records {
		fmt.Printf("SeqNum: %d, Body: %s\n", record.SeqNum, record.Body)
	}
}
Read with Options
// Read from a specific position
for batch := range reader . ReadFrom ( ctx , s2 . ReadOptions {
StartSeqNum : 100 ,
Limit : 1000 ,
}) {
if batch . Err != nil {
return batch . Err
}
// Process batch
}
Single Read
batch , err := stream . Read ( ctx , s2 . ReadInput {
StartSeqNum : 0 ,
Limit : 100 ,
})
if err != nil {
return err
}
for _ , record := range batch . Records {
fmt . Println ( string ( record . Body ))
}
Check Tail
// Fetch the stream's tail and print its sequence number.
tail, err := stream.CheckTail(ctx)
if err != nil {
	return err
}
fmt.Printf("Latest sequence number: %d\n", tail.SeqNum)
Context and Cancellation
All SDK methods accept a context.Context for timeout and cancellation:
// With timeout
ctx , cancel := context . WithTimeout ( context . Background (), 10 * time . Second )
defer cancel ()
basin , err := client . CreateBasin ( ctx , s2 . CreateBasinInput {
Name : "my-basin" ,
})
// With cancellation
ctx , cancel := context . WithCancel ( context . Background ())
go func () {
// Cancel after some condition
cancel ()
}()
reader := stream . Reader ()
for batch := range reader . Read ( ctx ) {
// Will stop when context is cancelled
}
Access Tokens
Issue a Token
token , err := client . IssueAccessToken ( ctx , s2 . IssueAccessTokenInput {
Description : "My application token" ,
})
if err != nil {
return err
}
List Tokens
tokens, err := client.ListAccessTokens(ctx, s2.ListAccessTokensInput{})
if err != nil {
	return err
}

// Print one "ID: description" line per token.
for _, token := range tokens.Values {
	fmt.Printf("%s: %s\n", token.ID, token.Description)
}
Revoke a Token
err := client . RevokeAccessToken ( ctx , "token-id" )
Metrics
// Account metrics
metrics, err := client.GetAccountMetrics(ctx, s2.GetMetricsInput{
	Name: s2.MetricRecordBytes,
})
if err != nil {
	return err
}

// Basin metrics
metrics, err = client.GetBasinMetrics(ctx, s2.GetBasinMetricsInput{
	Basin: "my-basin",
	Name:  s2.MetricAppendCount,
})
// err is reused by the next call, so it has to be checked before moving on.
if err != nil {
	return err
}

// Stream metrics
metrics, err = client.GetStreamMetrics(ctx, s2.GetStreamMetricsInput{
	Basin:  "my-basin",
	Stream: "my-stream",
	Name:   s2.MetricReadCount,
})
if err != nil {
	return err
}
Error Handling
import " github.com/s2-streamstore/s2-sdk-go/errors "
basin , err := client . CreateBasin ( ctx , s2 . CreateBasinInput {
Name : "my-basin" ,
})
if err != nil {
switch {
case errors . IsUnauthorized ( err ):
fmt . Println ( "Invalid access token" )
case errors . IsNotFound ( err ):
fmt . Println ( "Resource not found" )
case errors . IsConflict ( err ):
fmt . Println ( "Resource already exists" )
default :
fmt . Printf ( "Error: %v \n " , err )
}
}
Configuration
Custom Endpoint
// Point the client at a non-default endpoint via BaseURL.
client, err := s2.New(s2.Config{
	AccessToken: "your_token",
	BaseURL:     "https://custom.s2.dev",
})
// s2.New can fail; do not use client without checking err.
if err != nil {
	return err
}
HTTP Client Customization
import " net/http "
customClient := & http . Client {
Timeout : 30 * time . Second ,
}
client , err := s2 . New ( s2 . Config {
AccessToken : "your_token" ,
HTTPClient : customClient ,
})
Concurrency
The SDK is safe for concurrent use:
var wg sync.WaitGroup

stream := client.Basin("my-basin").Stream("my-stream")
producer := stream.Producer()

// Multiple goroutines can use the same producer
for i := 0; i < 10; i++ {
	wg.Add(1)
	// Pass i as an argument so each goroutine gets its own copy.
	go func(n int) {
		defer wg.Done()

		ack, err := producer.Append(ctx, s2.Record{
			Body: []byte(fmt.Sprintf("message %d", n)),
		})
		if err != nil {
			fmt.Printf("Error: %v\n", err)
			return
		}
		fmt.Printf("Appended with seq num: %d\n", ack.SeqNum)
	}(i)
}

// Block until all ten appends have finished.
wg.Wait()
Resources
GitHub: Source code and documentation
Go.dev: API reference documentation
REST API: Underlying REST API reference
Discord: Join our community
Next Steps