Skip to main content
The Rust SDK provides an ergonomic, type-safe interface for interacting with S2. It’s built on top of tokio for async I/O and offers first-class support for streams, batching, and backpressure.

Installation

Add the SDK to your Cargo.toml:
cargo add s2-sdk
cargo add tokio --features full
cargo add futures
Or manually add to Cargo.toml:
[dependencies]
s2-sdk = "0.24"
tokio = { version = "1", features = ["full"] }
futures = "0.3"

Quick Start

Initialize the Client

Create an S2 client with your access token:
use s2_sdk::{S2, types::S2Config};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let s2 = S2::new(S2Config::new("your_access_token"))?;
    Ok(())
}
Generate an access token from the S2 dashboard.

List Basins

Retrieve all basins in your account:
use s2_sdk::types::ListBasinsInput;

let page = s2.list_basins(ListBasinsInput::new()).await?;
println!("My basins: {:?}", page.values);
For automatic pagination:
use futures::{StreamExt, TryStreamExt};
use s2_sdk::types::ListAllBasinsInput;

let basins: Vec<_> = s2
    .list_all_basins(ListAllBasinsInput::new())
    .take(10)
    .try_collect()
    .await?;

Working with Basins

Create a Basin

use s2_sdk::types::{
    CreateBasinInput, BasinName, BasinConfig,
    StreamConfig, RetentionPolicy
};

let basin_name: BasinName = "my-basin".parse()?;
let input = CreateBasinInput::new(basin_name.clone())
    .with_config(
        BasinConfig::new().with_default_stream_config(
            StreamConfig::new()
                .with_retention_policy(RetentionPolicy::Age(10 * 24 * 60 * 60))
        )
    );

let basin_info = s2.create_basin(input).await?;
println!("Basin created: {:#?}", basin_info);

Get Basin Configuration

let config = s2.get_basin_config(basin_name).await?;
println!("Basin config: {:#?}", config);

Reconfigure a Basin

use s2_sdk::types::ReconfigureBasinInput;

let new_config = BasinConfig::new()
    .with_default_stream_config(
        StreamConfig::new()
            .with_retention_policy(RetentionPolicy::Age(30 * 24 * 60 * 60))
    );

let updated = s2.reconfigure_basin(
    ReconfigureBasinInput::new(basin_name, new_config)
).await?;

Delete a Basin

use s2_sdk::types::DeleteBasinInput;

s2.delete_basin(DeleteBasinInput::new(basin_name)).await?;

Working with Streams

Get a Stream Handle

use s2_sdk::types::{BasinName, StreamName};

let basin_name: BasinName = "my-basin".parse()?;
let stream_name: StreamName = "my-stream".parse()?;

let basin = s2.basin(basin_name);
let stream = basin.stream(stream_name);

Create a Stream

use s2_sdk::types::{
    CreateStreamInput, StreamConfig,
    TimestampingConfig, TimestampingMode
};

let input = CreateStreamInput::new(stream_name.clone())
    .with_config(
        StreamConfig::new().with_timestamping(
            TimestampingConfig::new()
                .with_mode(TimestampingMode::ClientRequire)
        )
    );

let stream_info = basin.create_stream(input).await?;

List Streams

use s2_sdk::types::ListStreamsInput;

let page = basin.list_streams(ListStreamsInput::new()).await?;

// Or with automatic pagination
use s2_sdk::types::ListAllStreamsInput;
let streams: Vec<_> = basin
    .list_all_streams(ListAllStreamsInput::new())
    .try_collect()
    .await?;

Writing Records

Using the Producer

The producer provides high-level batching and backpressure:
src/ops.rs:427-429
use s2_sdk::producer::ProducerConfig;
use s2_sdk::types::AppendRecord;

let producer = stream.producer(ProducerConfig::new());

// Submit records and get tickets
let ticket1 = producer.submit(AppendRecord::new("hello")?).await?;
let ticket2 = producer.submit(AppendRecord::new("world")?).await?;

// Await acknowledgments
let ack1 = ticket1.await?;
let ack2 = ticket2.await?;

println!("Record 1 seq_num: {}", ack1.seq_num);
println!("Record 2 seq_num: {}", ack2.seq_num);

// Close the producer to flush remaining records
producer.close().await?;

Direct Append

For more control, use the append API directly:
use s2_sdk::types::{AppendInput, AppendRecordBatch};

let batch = AppendRecordBatch::try_from_iter([
    AppendRecord::new("record1")?,
    AppendRecord::new("record2")?,
])?;

let ack = stream.append(AppendInput::new(batch)).await?;
println!("First seq_num: {}", ack.first_seq_num);

Append Session

For batch-level control with backpressure:
use s2_sdk::append_session::AppendSessionConfig;

let session = stream.append_session(AppendSessionConfig::new());
let ticket = session.submit(AppendInput::new(batch)).await?;
let ack = ticket.await?;

Reading Records

Read Session (Streaming)

For continuous reading with automatic pagination:
src/ops.rs:432-445
use futures::StreamExt;
use s2_sdk::types::ReadInput;

let mut batches = stream.read_session(ReadInput::new()).await?;

while let Some(batch) = batches.next().await {
    let batch = batch?;
    for record in batch.records {
        println!("seq_num: {}, body: {:?}", record.seq_num, record.body);
    }
}

Single Read

For one-time reads:
use s2_sdk::types::{ReadInput, SeqNum};

let batch = stream.read(ReadInput::new()).await?;
println!("Read {} records", batch.records.len());

// Read from specific position
let batch = stream.read(
    ReadInput::new().with_start_seq_num(SeqNum::from(100))
).await?;

Check Tail Position

src/ops.rs:391-395
let tail = stream.check_tail().await?;
println!("Tail seq_num: {}", tail.seq_num);

Stream Management

Trim a Stream

use s2_sdk::types::{AppendInput, AppendRecordBatch, CommandRecord};

let tail = stream.check_tail().await?;

if tail.seq_num > 0 {
    let input = AppendInput::new(
        AppendRecordBatch::try_from_iter([
            CommandRecord::trim(tail.seq_num - 1).into()
        ])?
    );
    stream.append(input).await?;
    println!("Trim requested");
}

Delete a Stream

use s2_sdk::types::DeleteStreamInput;

basin.delete_stream(DeleteStreamInput::new(stream_name)).await?;

Access Tokens

Issue a Token

use s2_sdk::types::{IssueAccessTokenInput, AccessTokenDescription};

let description = AccessTokenDescription::try_from("my-token")?;
let token = s2.issue_access_token(
    IssueAccessTokenInput::new().with_description(description)
).await?;

println!("Token: {}", token);

List Tokens

use s2_sdk::types::ListAccessTokensInput;

let page = s2.list_access_tokens(ListAccessTokensInput::new()).await?;
for info in page.values {
    println!("Token ID: {}", info.id);
}

Revoke a Token

use s2_sdk::types::AccessTokenId;

let token_id: AccessTokenId = "token-id".parse()?;
s2.revoke_access_token(token_id).await?;

Metrics

Account Metrics

use s2_sdk::types::{GetAccountMetricsInput, MetricName};

let metrics = s2.get_account_metrics(
    GetAccountMetricsInput::new()
        .with_name(MetricName::RecordBytes)
).await?;

Basin Metrics

use s2_sdk::types::GetBasinMetricsInput;

let metrics = s2.get_basin_metrics(
    GetBasinMetricsInput::new(basin_name)
        .with_name(MetricName::AppendCount)
).await?;

Stream Metrics

use s2_sdk::types::GetStreamMetricsInput;

let metrics = s2.get_stream_metrics(
    GetStreamMetricsInput::new(basin_name, stream_name)
        .with_name(MetricName::ReadCount)
).await?;

Error Handling

All SDK operations return Result<T, S2Error>:
use s2_sdk::types::S2Error;

match s2.list_basins(ListBasinsInput::new()).await {
    Ok(page) => println!("Basins: {:?}", page.values),
    Err(S2Error::Unauthorized { .. }) => {
        eprintln!("Invalid access token");
    }
    Err(S2Error::NotFound { .. }) => {
        eprintln!("Resource not found");
    }
    Err(e) => {
        eprintln!("Error: {}", e);
    }
}

Configuration

Custom Endpoint

let config = S2Config::new("token")
    .with_base_url("https://custom.s2.dev".parse()?);
let s2 = S2::new(config)?;

Retry Configuration

use s2_sdk::types::{RetryConfig, AppendRetryPolicy};

let config = S2Config::new("token")
    .with_retry(RetryConfig::new().with_append_retry_policy(
        AppendRetryPolicy::SafeToRetry
    ));

API Structure

The SDK is organized into three main types:
  • S2 - Account-level operations (basins, tokens, metrics)
  • S2Basin - Basin-level operations (streams, configuration)
  • S2Stream - Stream-level operations (append, read, trim)
// Account level
let s2 = S2::new(config)?;

// Basin level
let basin = s2.basin(basin_name);

// Stream level
let stream = basin.stream(stream_name);

Examples

The SDK includes comprehensive examples in the repository:
  • producer.rs - Producer pattern
  • consumer.rs - Consumer pattern
  • create_basin.rs - Basin creation
  • create_stream.rs - Stream creation
  • explicit_trim.rs - Stream trimming
  • list_all_basins.rs - Pagination example
  • And many more…
Run examples:
export S2_ACCESS_TOKEN="your_token"
export S2_BASIN="your_basin"
export S2_STREAM="your_stream"
cargo run --example producer

Resources

Crates.io

View package on Crates.io

Docs.rs

API reference documentation

Examples

Browse code examples

GitHub

Source code and issues

Next Steps

Build docs developers (and LLMs) love