Overview
The TurbineWSClient class provides real-time streaming of market data via WebSocket. It supports:
- Real-time orderbook updates
- Trade executions
- Quick market updates
- Automatic reconnection on disconnect
- Market-level subscriptions
All WebSocket connections are to wss://api.turbinefi.com/api/v1/ws.
Constructor
from turbine_client import TurbineWSClient
ws_client = TurbineWSClient(
host="https://api.turbinefi.com",
reconnect=True,
reconnect_delay=1.0,
max_reconnect_delay=60.0
)
The WebSocket host URL. Can be http://, https://, ws://, or wss://. HTTP(S) URLs are automatically converted to WS(S).
Whether to automatically reconnect on disconnect.
Initial reconnect delay in seconds.
Maximum reconnect delay in seconds (uses exponential backoff).
Properties
url
ws_client.url # Returns: str
Get the full WebSocket URL.
Example:
print(ws_client.url) # wss://api.turbinefi.com/api/v1/ws
Connection Management
connect
async with ws_client.connect() as stream:
# Use stream to subscribe and receive messages
Connect to the WebSocket server as an async context manager. Automatically closes the connection when the context exits.
A WSStream object for subscribing and receiving messages
Example:
import asyncio
from turbine_client import TurbineWSClient
async def main():
ws_client = TurbineWSClient("https://api.turbinefi.com")
async with ws_client.connect() as stream:
await stream.subscribe("0x1234...market_id")
async for message in stream:
if message.type == "trade":
print(f"Trade: {message.data}")
elif message.type == "orderbook":
print(f"Orderbook update")
asyncio.run(main())
connect_with_retry
stream = await ws_client.connect_with_retry()
Connect with automatic reconnection. This method will keep trying to connect indefinitely.
A WSStream for sending/receiving messages
Note: This method returns a stream directly, not via context manager. You must manually close it.
Example:
async def main():
ws_client = TurbineWSClient("https://api.turbinefi.com")
stream = await ws_client.connect_with_retry()
try:
await stream.subscribe(market_id)
async for message in stream:
print(message)
finally:
await stream.close()
asyncio.run(main())
close
Manually close the WebSocket connection.
WSStream
The WSStream class represents an active WebSocket connection and handles subscriptions and message receiving.
subscribe
await stream.subscribe(market_id)
Subscribe to a market to receive all updates (orderbook, trades, order cancellations).
The market ID to subscribe to (hex string like "0x1234...")
Important: The Turbine WebSocket API uses market-level subscriptions. Once subscribed to a market, you will receive ALL updates for that market including:
- Orderbook updates (
type="orderbook")
- Trade updates (
type="trade")
- Order cancelled updates (
type="order_cancelled")
Example:
from turbine_client import TurbineClient, TurbineWSClient
import asyncio
async def main():
# Get current market
client = TurbineClient("https://api.turbinefi.com", chain_id=137)
market = client.get_quick_market("BTC")
# Subscribe to WebSocket
ws_client = TurbineWSClient("https://api.turbinefi.com")
async with ws_client.connect() as stream:
await stream.subscribe(market.market_id)
print(f"Subscribed to market {market.market_id}")
async for message in stream:
print(f"Received: {message.type}")
asyncio.run(main())
unsubscribe
await stream.unsubscribe(market_id)
Unsubscribe from a market.
The market ID to unsubscribe from
Example:
await stream.unsubscribe("0x1234...")
Receiving Messages
There are two ways to receive messages from the stream:
1. Async Iterator (Recommended)
async for message in stream:
# Process message
Iterate over messages as they arrive. This is the cleanest and most Pythonic approach.
Example:
async with ws_client.connect() as stream:
await stream.subscribe(market_id)
async for message in stream:
if message.type == "trade":
trade_data = message.data
print(f"Price: {trade_data['price']}, Size: {trade_data['size']}")
elif message.type == "orderbook":
print("Orderbook updated")
2. Manual recv()
messages = await stream.recv()
Receive messages from a single WebSocket frame. Returns a list because the server may batch multiple messages.
List of messages (may contain multiple if server batches them)
Example:
while True:
messages = await stream.recv()
for message in messages:
print(message.type)
close
Close the stream.
Message Types
The WebSocket API sends different message types:
OrderBookUpdate
Received when the orderbook changes.
if message.type == "orderbook":
market_id = message.market_id
data = message.data # Orderbook data
Fields:
type (str): "orderbook"
market_id (str): The market identifier
data (dict): Orderbook update data with bids and asks
Example:
async for message in stream:
if message.type == "orderbook":
print(f"Orderbook update for {message.market_id}")
print(f"Bids: {message.data.get('bids', [])}")
print(f"Asks: {message.data.get('asks', [])}")
TradeUpdate
Received when a trade executes.
if message.type == "trade":
market_id = message.market_id
trade_data = message.data
Fields:
type (str): "trade"
market_id (str): The market identifier
data (dict): Trade data including:
price (int): Trade price (6 decimals)
size (int): Trade size (6 decimals)
buyer (str): Buyer address
seller (str): Seller address
outcome (int): Outcome (0=YES, 1=NO)
timestamp (int): Unix timestamp
Example:
async for message in stream:
if message.type == "trade":
trade = message.data
price = trade['price'] / 1e6
size = trade['size'] / 1e6
outcome = "YES" if trade['outcome'] == 0 else "NO"
print(f"Trade: {outcome} @ {price:.2%} for {size:.2f} shares")
QuickMarketUpdate
Received when a quick market changes (e.g., new 15-minute market opens).
if message.type == "quick_market":
market_id = message.market_id
market_data = message.data
Fields:
type (str): "quick_market"
market_id (str): The new market identifier
data (dict): Market data including:
asset (str): Asset symbol (e.g., “BTC”)
start_price (int): Strike price (6 decimals)
start_time (int): Market start timestamp
end_time (int): Market end timestamp
Example:
async for message in stream:
if message.type == "quick_market":
market = message.data
strike = market['start_price'] / 1e6
print(f"New BTC market: Strike ${strike:,.2f}")
Complete Trading Bot Example
Here’s a complete example of a trading bot that uses both HTTP and WebSocket clients:
import asyncio
from turbine_client import TurbineClient, TurbineWSClient, Outcome
async def main():
# Initialize HTTP client for API calls
client = TurbineClient(
host="https://api.turbinefi.com",
chain_id=137,
private_key="0x...",
api_key_id="your_key_id",
api_private_key="your_api_key"
)
# Get current BTC market
market = client.get_quick_market("BTC")
market_id = market.market_id
strike_price = market.start_price / 1e6
print(f"Trading BTC market with strike ${strike_price:,.2f}")
# Initialize WebSocket client for real-time data
ws_client = TurbineWSClient("https://api.turbinefi.com")
async with ws_client.connect() as stream:
# Subscribe to the current market
await stream.subscribe(market_id)
print(f"Subscribed to market {market_id}")
# Track state
last_trade_price = None
async for message in stream:
if message.type == "trade":
# Extract trade data
trade = message.data
price = trade['price'] / 1e6
size = trade['size'] / 1e6
outcome = "YES" if trade['outcome'] == 0 else "NO"
print(f"Trade: {outcome} @ {price:.2%} for {size:.2f} shares")
last_trade_price = price
elif message.type == "orderbook":
# Orderbook updated - could analyze spread here
bids = message.data.get('bids', [])
asks = message.data.get('asks', [])
if bids and asks:
best_bid = bids[0]['price'] / 1e6
best_ask = asks[0]['price'] / 1e6
spread = best_ask - best_bid
print(f"Spread: {spread:.2%}")
elif message.type == "quick_market":
# New market opened - cancel old orders and switch
print("New market detected - cancelling old orders")
client.cancel_market_orders(market_id)
# Update to new market
market_id = message.market_id
strike_price = message.data['start_price'] / 1e6
print(f"Switched to new market: Strike ${strike_price:,.2f}")
# Resubscribe
await stream.unsubscribe(market_id)
await stream.subscribe(market_id)
asyncio.run(main())
Reconnection Handling
The WebSocket client handles reconnection automatically when reconnect=True:
ws_client = TurbineWSClient(
host="https://api.turbinefi.com",
reconnect=True,
reconnect_delay=1.0,
max_reconnect_delay=60.0
)
Reconnection behavior:
- Initial delay: 1 second
- Exponential backoff: Doubles on each retry
- Maximum delay: 60 seconds
- Continues indefinitely until connected
Note: After reconnection, you must resubscribe to markets. Subscriptions are not automatically restored.
Example with subscription restoration:
async def stream_with_reconnect():
ws_client = TurbineWSClient("https://api.turbinefi.com")
subscribed_markets = set()
while True:
try:
stream = await ws_client.connect_with_retry()
# Restore subscriptions after reconnect
for market_id in subscribed_markets:
await stream.subscribe(market_id)
print(f"Resubscribed to {market_id}")
# Add initial subscription if none
if not subscribed_markets:
client = TurbineClient("https://api.turbinefi.com", chain_id=137)
market = client.get_quick_market("BTC")
await stream.subscribe(market.market_id)
subscribed_markets.add(market.market_id)
# Process messages
async for message in stream:
print(message.type)
except Exception as e:
print(f"Connection error: {e}")
await asyncio.sleep(1)
asyncio.run(stream_with_reconnect())
Error Handling
The WebSocket client raises WebSocketError for connection issues:
from turbine_client.exceptions import WebSocketError
import asyncio
async def main():
ws_client = TurbineWSClient("https://api.turbinefi.com")
try:
async with ws_client.connect() as stream:
await stream.subscribe(market_id)
async for message in stream:
print(message.type)
except WebSocketError as e:
print(f"WebSocket error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
asyncio.run(main())
Convenience Methods
For backwards compatibility, WSStream provides convenience aliases:
subscribe_orderbook
await stream.subscribe_orderbook(market_id)
Alias for subscribe(). The Turbine API subscribes to markets, not individual channels.
subscribe_trades
await stream.subscribe_trades(market_id)
Alias for subscribe(). The Turbine API subscribes to markets, not individual channels.
Note: Both methods subscribe to the same market feed. There is no separate orderbook vs trades subscription - subscribing to a market gives you both.
Best Practices
1. Use Async Context Manager
Always use the async with pattern to ensure proper cleanup:
async with ws_client.connect() as stream:
# Use stream
pass
# Connection automatically closed
2. Handle Market Transitions
When a new quick market opens, unsubscribe from the old market and subscribe to the new one:
if message.type == "quick_market":
await stream.unsubscribe(old_market_id)
new_market_id = message.market_id
await stream.subscribe(new_market_id)
3. Combine with HTTP Client
Use the HTTP client for initial setup and the WebSocket for real-time updates:
# HTTP: Get initial state
client = TurbineClient(host, chain_id)
market = client.get_quick_market("BTC")
book = client.get_orderbook(market.market_id)
# WebSocket: Stream updates
ws_client = TurbineWSClient(host)
async with ws_client.connect() as stream:
await stream.subscribe(market.market_id)
async for message in stream:
# Process updates
pass
4. Track Subscriptions
Keep track of which markets you’re subscribed to for reconnection:
subscribed_markets = set()
async with ws_client.connect() as stream:
await stream.subscribe(market_id)
subscribed_markets.add(market_id)
# After reconnect, restore subscriptions
for market_id in subscribed_markets:
await stream.subscribe(market_id)
5. Handle Message Batching
The server may send multiple messages in one frame. Always handle lists:
messages = await stream.recv()
for message in messages: # messages is a list
print(message.type)
- Connection pooling: Create one WebSocket client per application, not per market
- Subscription limits: There’s no documented limit, but avoid subscribing to hundreds of markets simultaneously
- Message processing: Process messages quickly to avoid backpressure. For heavy processing, offload to a background task
- Reconnection: The exponential backoff prevents overwhelming the server during outages