The Python SDK is currently being migrated to support the S2 v1 API. It is not yet recommended for production use with the current S2 platform.
The Python SDK will provide a Pythonic interface for interacting with S2, with support for async/await and type hints.
Current Status
The Python SDK is undergoing migration to support the S2 v1 API. Once complete, it will provide:
Full v1 API compatibility
Async/await support with asyncio
Type hints and mypy compatibility
Idiomatic Python patterns
Repository
The SDK is available at:
Python SDK Repository: View source code and track migration progress
Planned Features
Once migration is complete, the Python SDK will support:
Installation
Basic Usage (Planned)
import asyncio

from s2_sdk import S2


async def main():
    # Initialize client
    s2 = S2(access_token="your_access_token")

    # List basins
    basins = await s2.list_basins()

    # Create a stream
    stream = s2.basin("my-basin").stream("my-stream")

    # Append records
    producer = stream.producer()
    ack = await producer.append({"body": "Hello, S2!"})

    # Read records
    async for batch in stream.read():
        for record in batch.records:
            print(f"Record: {record.body}")


asyncio.run(main())
Working with Basins (Planned)
# Create a basin
basin = await s2.create_basin(
    name="my-basin",
    config={
        "default_stream_config": {
            "retention_policy": {
                "type": "age",
                "seconds": 7 * 24 * 60 * 60,  # 7 days
            }
        }
    },
)

# List basins with pagination
async for basin in s2.list_all_basins():
    print(basin.name)

# Delete a basin
await s2.delete_basin("my-basin")
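The retention period in the planned `create_basin` example is written as raw seconds arithmetic. A small helper can make durations easier to read. The following is a minimal sketch that only builds the dictionary shown above; it assumes the planned config shape stays as written, and `age_retention_config` is a hypothetical name, not part of the SDK.

```python
from datetime import timedelta


def age_retention_config(max_age: timedelta) -> dict:
    """Build a basin config dict with an age-based retention policy.

    Hypothetical helper: the dict layout mirrors the planned example in
    this section and may change once the SDK migration is complete.
    """
    seconds = int(max_age.total_seconds())
    if seconds <= 0:
        raise ValueError("retention age must be positive")
    return {
        "default_stream_config": {
            "retention_policy": {
                "type": "age",
                "seconds": seconds,
            }
        }
    }


config = age_retention_config(timedelta(days=7))
print(config["default_stream_config"]["retention_policy"]["seconds"])
```

The resulting dictionary could then be passed as the `config` argument in the planned `create_basin` call, if that signature is kept.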
Writing Records (Planned)
stream = s2.basin("my-basin").stream("my-stream")
producer = stream.producer()

# Append single record
ack = await producer.append({"body": b"my data"})
print(f"Sequence number: {ack.seq_num}")

# Batch append
acks = await producer.append_batch([
    {"body": b"record 1"},
    {"body": b"record 2"},
    {"body": b"record 3"},
])
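When there are many records to write, it can help to split them into bounded batches before calling the planned `append_batch` method. The sketch below shows one way to do that in plain Python. `chunk_records` is a hypothetical helper, and the default `max_batch_size` of 1000 is a placeholder for illustration, not a limit documented here.

```python
from typing import Iterable, Iterator


def chunk_records(
    records: Iterable[dict], max_batch_size: int = 1000
) -> Iterator[list[dict]]:
    """Yield lists of at most max_batch_size records, preserving order."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    batch: list[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch


records = [{"body": f"record {i}".encode()} for i in range(5)]
for batch in chunk_records(records, max_batch_size=2):
    # With the planned API this might become:
    #     acks = await producer.append_batch(batch)
    print(len(batch))
```

Keeping the chunking separate from the SDK call means it can be tested today, independently of the migration.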
Reading Records (Planned)
# Read with async iteration
stream = s2.basin("my-basin").stream("my-stream")
async for batch in stream.read():
    for record in batch.records:
        print(f"SeqNum: {record.seq_num}")
        print(f"Body: {record.body}")

# Read from specific position
async for batch in stream.read(start_seq_num=100):
    # Process records
    pass

# Single read
batch = await stream.read_once(limit=100)
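Reading from a specific position is typically paired with remembering where processing stopped. The sketch below illustrates that checkpointing pattern against an in-memory stand-in, since the real SDK is not yet available. `FakeRecord`, `FakeBatch`, `fake_read`, and `consume` are all hypothetical names; only the `start_seq_num` parameter and the `records` and `seq_num` attributes are taken from the planned examples above.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class FakeRecord:
    seq_num: int
    body: bytes


@dataclass
class FakeBatch:
    records: list[FakeRecord]


async def fake_read(start_seq_num: int = 0) -> AsyncIterator[FakeBatch]:
    """Stand-in for stream.read(): ten records, served in batches of four."""
    all_records = [FakeRecord(i, f"record {i}".encode()) for i in range(10)]
    pending = [r for r in all_records if r.seq_num >= start_seq_num]
    for i in range(0, len(pending), 4):
        yield FakeBatch(pending[i:i + 4])


async def consume(start_seq_num: int, max_records: int) -> int:
    """Process up to max_records and return the seq_num to resume from."""
    next_seq_num = start_seq_num
    processed = 0
    async for batch in fake_read(start_seq_num=start_seq_num):
        for record in batch.records:
            if processed == max_records:
                return next_seq_num
            # Real processing of record.body would go here.
            next_seq_num = record.seq_num + 1
            processed += 1
    return next_seq_num


checkpoint = asyncio.run(consume(start_seq_num=0, max_records=6))
print(checkpoint)  # 6
resumed = asyncio.run(consume(start_seq_num=checkpoint, max_records=100))
print(resumed)  # 10
```

Persisting the returned value and passing it back as `start_seq_num` lets a consumer pick up after a restart without reprocessing earlier records.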
Type Hints (Planned)
from typing import AsyncIterator

from s2_sdk import S2, Basin, Stream, Record, ReadBatch


async def process_stream(stream: Stream) -> None:
    batches: AsyncIterator[ReadBatch] = stream.read()
    async for batch in batches:
        record: Record
        for record in batch.records:
            # Process with full type safety
            pass
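Until the typed classes above ship, processing code can be written against structural types and exercised with a stub. The following sketch uses `typing.Protocol` for that purpose. Every class name here (`RecordLike`, `BatchLike`, `ReadableStream`, and the `Stub*` classes) is hypothetical; the attribute names `records`, `seq_num`, and `body` follow the planned examples and are assumptions about the final API.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Protocol, Sequence


class RecordLike(Protocol):
    @property
    def seq_num(self) -> int: ...
    @property
    def body(self) -> bytes: ...


class BatchLike(Protocol):
    @property
    def records(self) -> Sequence[RecordLike]: ...


class ReadableStream(Protocol):
    def read(self) -> AsyncIterator[BatchLike]: ...


@dataclass(frozen=True)
class StubRecord:
    seq_num: int
    body: bytes


@dataclass(frozen=True)
class StubBatch:
    records: Sequence[StubRecord]


class StubStream:
    """In-memory stand-in that yields two fixed batches."""

    async def read(self) -> AsyncIterator[StubBatch]:
        yield StubBatch([StubRecord(0, b"a"), StubRecord(1, b"bc")])
        yield StubBatch([StubRecord(2, b"def")])


async def total_body_bytes(stream: ReadableStream) -> int:
    """Sum the body sizes of every record the stream yields."""
    total = 0
    async for batch in stream.read():
        for record in batch.records:
            total += len(record.body)
    return total


print(asyncio.run(total_body_bytes(StubStream())))  # 6
```

Because the function depends only on the protocol, swapping the stub for a real `Stream` later should require no changes, provided the final SDK keeps these attribute names.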
Alternative Options
While the Python SDK is being migrated, you can:
Use the REST API directly with libraries like httpx or requests:
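import asyncio
import httpx

async def list_basins(access_token: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://s2.dev/api/basins",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        response.raise_for_status()  # Fail loudly on HTTP errors
        return response.json()

basins = asyncio.run(list_basins("your_access_token"))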
Use another SDK if you can integrate it into your stack:
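Returning to the REST API option: if adding a third-party HTTP client is not desirable, the same request can be sketched with the standard library's `urllib` instead of httpx or requests. The URL is copied from the httpx example in this section and is not independently verified; `build_list_basins_request` and `list_basins_sync` are hypothetical helper names.

```python
import json
import urllib.request

# URL taken from the httpx example in this section (unverified assumption).
BASINS_URL = "https://s2.dev/api/basins"


def build_list_basins_request(access_token: str) -> urllib.request.Request:
    """Build, but do not send, an authenticated GET request."""
    if not access_token:
        raise ValueError("access_token must not be empty")
    return urllib.request.Request(
        BASINS_URL,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        },
        method="GET",
    )


def list_basins_sync(access_token: str, timeout: float = 10.0):
    """Send the request and decode the JSON response body."""
    request = build_list_basins_request(access_token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


# Only the request is built here; no network call is made.
request = build_list_basins_request("your_access_token")
print(request.get_method(), request.full_url)
```

Separating request construction from sending keeps the authentication logic testable without network access.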
Contributing
We welcome contributions to help complete the Python SDK migration! Check the repository for:
Open issues and migration tasks
Contribution guidelines
Development setup instructions
GitHub Repository: Contribute to the migration
REST API Reference: Use the API directly
Discord Community: Discuss SDK development
GitHub Issues: Report issues or request features
Stay Updated
To get notified when the Python SDK migration is complete:
Next Steps
While waiting for the Python SDK: