This guide covers how to use WebSocket streams for real-time orderbook updates, trade notifications, and quick market events.
Overview
Turbine’s WebSocket API provides real-time market data with lower latency than polling REST endpoints. This is essential for:
Market making bots that need instant orderbook updates
Trade monitoring to track fills in real-time
Quick market tracking to detect new 15-minute markets immediately
Basic Connection
The WebSocket client requires Python’s async/await syntax:
import asyncio
from turbine_client import TurbineWSClient
async def main():
    """Connect to the Turbine WebSocket API and print orderbook messages.

    Opens one stream, subscribes to a single market's orderbook, and prints
    the type and payload of every message until the stream ends.
    """
    # Create WebSocket client
    ws = TurbineWSClient(host="https://api.turbinefi.com")

    async with ws.connect() as stream:
        # Subscribe to a market
        await stream.subscribe_orderbook(market_id="0x1234...")

        # Process incoming messages
        async for message in stream:
            print(f"Received: {message.type}")
            print(message.data)


if __name__ == "__main__":
    asyncio.run(main())
WebSocket connections automatically reconnect on disconnection. The client handles ping/pong keepalive messages internally.
Subscription Methods
Subscribe to Orderbook Updates
Get real-time orderbook snapshots whenever bids or asks change:
await stream.subscribe_orderbook( market_id = "0x1234..." )
Subscribe to Trade Updates
Receive notifications for every trade execution:
await stream.subscribe_trades( market_id = "0x1234..." )
Subscribe to Quick Markets
Get notified when new 15-minute markets are created:
# Subscribe to BTC quick markets
await stream.subscribe_quick_markets( "BTC" )
# Subscribe to ETH quick markets
await stream.subscribe_quick_markets( "ETH" )
# Subscribe to SOL quick markets
await stream.subscribe_quick_markets( "SOL" )
Multiple Subscriptions
You can subscribe to multiple channels on the same connection:
async with ws.connect() as stream:
# Subscribe to multiple channels
await stream.subscribe_orderbook(market_id)
await stream.subscribe_trades(market_id)
await stream.subscribe_quick_markets( "BTC" )
# Process all messages
async for message in stream:
handle_message(message)
Message Types
Orderbook Messages
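Orderbook updates contain the full bid and ask ladder. Prices are integers scaled so that 10,000 equals 1% (500000 is 50%), and sizes are integers with six decimal places (1000000 is 1.00 share):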
async for message in stream:
if message.type == "orderbook" :
ob = message.orderbook
print ( f "Last update: { ob.last_update } " )
# Best bid/ask
if ob.bids:
best_bid = ob.bids[ 0 ].price / 10000
bid_size = ob.bids[ 0 ].size / 1_000_000
print ( f "Best bid: { best_bid :.2f} % x { bid_size :.2f} " )
if ob.asks:
best_ask = ob.asks[ 0 ].price / 10000
ask_size = ob.asks[ 0 ].size / 1_000_000
print ( f "Best ask: { best_ask :.2f} % x { ask_size :.2f} " )
Message structure:
{
"type" : "orderbook" ,
"marketId" : "0x1234..." ,
"data" : {
"bids" : [
{ "price" : 500000 , "size" : 1000000 },
{ "price" : 490000 , "size" : 2000000 }
],
"asks" : [
{ "price" : 510000 , "size" : 1500000 },
{ "price" : 520000 , "size" : 3000000 }
],
"lastUpdate" : 1705000000
}
}
Trade Messages
Trade notifications include full execution details:
async for message in stream:
if message.type == "trade" :
trade = message.trade
price_pct = trade.price / 10000
shares = trade.size / 1_000_000
side = "BUY" if trade.side == 0 else "SELL"
outcome = "YES" if trade.outcome == 0 else "NO"
print ( f "Trade: { side } { shares :.2f} { outcome } @ { price_pct :.2f} %" )
print ( f " Maker: { trade.maker[: 10 ] } ..." )
print ( f " Taker: { trade.taker[: 10 ] } ..." )
Message structure:
{
"type" : "trade" ,
"marketId" : "0x1234..." ,
"data" : {
"price" : 505000 ,
"size" : 5000000 ,
"outcome" : 0 ,
"side" : 0 ,
"maker" : "0xabc..." ,
"taker" : "0xdef..." ,
"timestamp" : 1705000000 ,
"txHash" : "0x123..."
}
}
Quick Market Messages
Notifications when new 15-minute markets are created:
async for message in stream:
if message.type == "quick_market" :
qm = message.quick_market
price = qm.start_price / 1e6
status = "RESOLVED" if qm.resolved else "ACTIVE"
print ( f "Quick Market Update:" )
print ( f " Asset: { qm.asset } " )
print ( f " Strike: $ { price :,.2f} " )
print ( f " Status: { status } " )
print ( f " Market ID: { qm.market_id } " )
Message structure:
{
"type" : "quick_market" ,
"data" : {
"asset" : "BTC" ,
"marketId" : "0x5678..." ,
"startPrice" : 97250000000 ,
"endTime" : 1705001700 ,
"resolved" : false
}
}
Complete WebSocket Example
Here’s a complete example that subscribes to all channels and processes messages:
import asyncio
import time
from turbine_client import TurbineClient, TurbineWSClient
from turbine_client.exceptions import WebSocketError
async def monitor_market():
    """Monitor the current BTC quick market via WebSocket.

    Fetches the active BTC quick market over the REST API, then streams
    orderbook, trade and quick-market messages and prints a short summary
    of each one. When a quick-market message announces an unresolved market
    that is not yet being watched, its orderbook and trade channels are
    subscribed as well.

    WebSocketError raised by the stream is reported and ends the monitor.
    """
    # First, get the current market using REST API
    client = TurbineClient(
        host="https://api.turbinefi.com",
        chain_id=137,
    )
    try:
        market = client.get_quick_market("BTC")
        print(f"Monitoring: {market.question}")
        print(f"Market ID: {market.market_id}\n")
    finally:
        # Close the REST client even if the lookup raises.
        client.close()

    # Markets whose orderbook/trade channels are already subscribed, so a
    # repeated quick-market message does not subscribe the same market twice.
    subscribed = {market.market_id}

    # Connect to WebSocket
    ws = TurbineWSClient(host="https://api.turbinefi.com")

    try:
        async with ws.connect() as stream:
            # Subscribe to all channels for this market
            await stream.subscribe_orderbook(market.market_id)
            await stream.subscribe_trades(market.market_id)
            await stream.subscribe_quick_markets("BTC")

            print("Connected! Waiting for messages...\n")

            # Process messages
            async for message in stream:
                timestamp = time.strftime("%H:%M:%S")

                if message.type == "orderbook":
                    # The typed payload may be absent; skip the message then.
                    ob = getattr(message, "orderbook", None)
                    if ob and ob.bids and ob.asks:
                        best_bid = ob.bids[0].price / 10000
                        best_ask = ob.asks[0].price / 10000
                        spread = best_ask - best_bid

                        print(f"[{timestamp}] ORDERBOOK")
                        print(
                            f"  Bid: {best_bid:.2f}% | Ask: {best_ask:.2f}% "
                            f"| Spread: {spread:.2f}%"
                        )

                elif message.type == "trade":
                    trade = getattr(message, "trade", None)
                    if trade:
                        price_pct = trade.price / 10000
                        shares = trade.size / 1_000_000
                        side = "BUY" if trade.side == 0 else "SELL"
                        outcome = "YES" if trade.outcome == 0 else "NO"

                        print(f"[{timestamp}] TRADE")
                        print(f"  {side} {shares:.2f} {outcome} @ {price_pct:.2f}%")

                elif message.type == "quick_market":
                    qm = getattr(message, "quick_market", None)
                    if qm:
                        price = qm.start_price / 1e6
                        status = "RESOLVED" if qm.resolved else "ACTIVE"

                        print(f"[{timestamp}] QUICK MARKET")
                        print(f"  {qm.asset} Strike: ${price:,.2f} | {status}")
                        print(f"  New Market ID: {qm.market_id}")

                        # Follow the new market, but only subscribe once.
                        if not qm.resolved and qm.market_id not in subscribed:
                            await stream.subscribe_orderbook(qm.market_id)
                            await stream.subscribe_trades(qm.market_id)
                            subscribed.add(qm.market_id)

                else:
                    # Unknown message type: dump the raw payload.
                    print(f"[{timestamp}] {message.type.upper()}: {message.data}")

    except WebSocketError as e:
        print(f"WebSocket error: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(monitor_market())
    except KeyboardInterrupt:
        # On Ctrl+C, asyncio.run() cancels the main task and then raises
        # KeyboardInterrupt to its caller, so the interrupt is handled here
        # rather than inside the coroutine.
        print("\nDisconnected")
See the complete implementation in examples/websocket_stream.py.
Using WebSocket in Trading Bots
Market Maker with WebSocket
Market makers need real-time orderbook updates to adjust quotes:
import asyncio
from turbine_client import TurbineClient, TurbineWSClient, Outcome
class WebSocketMarketMaker:
    """Quote both sides of a market's YES outcome around the orderbook mid.

    Listens to orderbook updates for one market and, whenever the mid price
    moves, cancels the resting orders and posts a fresh buy and sell quote
    half a spread below and above the mid.
    """

    # Prices in this guide are divided by 10000 to print a percentage, so
    # 10_000 == 1% and 100% sits at 1_000_000.
    MAX_PRICE = 1_000_000

    def __init__(self, client: "TurbineClient", market_id: str):
        self.client = client
        self.market_id = market_id
        self.spread = 20000  # 2% spread
        # Mid price handled by the last update; None until the first one.
        self._last_mid = None

    async def run(self):
        """Stream the market's orderbook and re-quote on every update."""
        ws = TurbineWSClient(host="https://api.turbinefi.com")

        async with ws.connect() as stream:
            # Subscribe to orderbook
            await stream.subscribe_orderbook(self.market_id)

            async for message in stream:
                if message.type == "orderbook":
                    await self.update_quotes(message.orderbook)

    def _quote_prices(self, orderbook):
        """Return ``(mid, bid_price, ask_price)``, or None for a one-sided book."""
        if not orderbook.bids or not orderbook.asks:
            return None

        # Calculate mid price from the best level on each side.
        mid = (orderbook.bids[0].price + orderbook.asks[0].price) // 2
        half_spread = self.spread // 2
        return mid, mid - half_spread, mid + half_spread

    async def update_quotes(self, orderbook):
        """Adjust quotes based on orderbook.

        Does nothing when either side of the book is empty or when the mid
        has not moved since the last update. Posting quotes changes the
        book, which triggers another orderbook message; without that check
        the maker would cancel and re-post forever. If a quote would land
        at or below 0% or at or above 100%, the resting orders are cancelled
        and no new ones are posted.
        """
        quote = self._quote_prices(orderbook)
        if quote is None:
            return

        mid, bid_price, ask_price = quote
        if mid == self._last_mid:
            return

        # Cancel old orders
        self.client.cancel_market_orders(self.market_id)

        if bid_price <= 0 or ask_price >= self.MAX_PRICE:
            # Too close to 0% / 100% to quote both sides at this spread.
            self._last_mid = mid
            print(f"Quotes pulled: mid {mid / 10000:.2f}% is too close to the price bounds")
            return

        # Place new orders around mid
        buy_order = self.client.create_limit_buy(
            market_id=self.market_id,
            outcome=Outcome.YES,
            price=bid_price,
            size=1_000_000,
        )
        sell_order = self.client.create_limit_sell(
            market_id=self.market_id,
            outcome=Outcome.YES,
            price=ask_price,
            size=1_000_000,
        )

        self.client.post_order(buy_order)
        self.client.post_order(sell_order)

        # Record the mid only after both orders were posted, so a failed
        # post is retried on the next update.
        self._last_mid = mid

        print(f"Updated quotes: {bid_price / 10000:.2f}% - {ask_price / 10000:.2f}%")
Trade Notification Handler
Detect when your orders are filled:
class TradeNotifier:
    """Watch a market's trade stream and report fills involving our address."""

    def __init__(self, client: "TurbineClient", market_id: str):
        self.client = client
        self.market_id = market_id
        self.my_address = client.address

    def _role_for(self, trade):
        """Return "maker" or "taker" if our address is on the trade, else None.

        Addresses are compared case-insensitively: the same hex address can
        be written in lowercase or in mixed-case checksum form, and a plain
        ``==`` would miss the match. The maker side is checked first.
        """
        me = (self.my_address or "").lower()
        if not me:
            return None
        if trade.maker.lower() == me:
            return "maker"
        if trade.taker.lower() == me:
            return "taker"
        return None

    async def monitor_trades(self):
        """Stream trades for the market and call on_fill for our own fills."""
        ws = TurbineWSClient(host="https://api.turbinefi.com")

        async with ws.connect() as stream:
            await stream.subscribe_trades(self.market_id)

            async for message in stream:
                if message.type != "trade":
                    continue

                trade = message.trade

                # Check if we were involved
                role = self._role_for(trade)
                if role is not None:
                    print(f"Our order filled as {role.upper()}")
                    await self.on_fill(trade, role)

    async def on_fill(self, trade, role: str):
        """Handle order fill."""
        price_pct = trade.price / 10000
        shares = trade.size / 1_000_000

        print(f"Filled: {shares:.2f} shares @ {price_pct:.2f}% ({role})")

        # Update position tracking, risk management, etc.
        # ...
Error Handling
Automatic Reconnection
The WebSocket client automatically reconnects on disconnection. To also recover from errors that surface to your code, wrap the connection in a retry loop:
async def resilient_websocket(reconnect_delay=5):
    """Keep an orderbook stream alive, reconnecting after every failure.

    Runs forever: connects, subscribes to ``market_id`` and passes each
    message to ``process_message`` (both are expected to be in scope).
    After an error, or after the stream ends without one, it waits and
    connects again.

    Args:
        reconnect_delay: Seconds to wait before each reconnect attempt.
    """
    ws = TurbineWSClient(host="https://api.turbinefi.com")

    while True:
        try:
            async with ws.connect() as stream:
                await stream.subscribe_orderbook(market_id)

                async for message in stream:
                    process_message(message)

        except WebSocketError as e:
            print(f"WebSocket error: {e}")
        except Exception as e:
            # Deliberately broad: this loop is the top-level boundary and
            # must survive handler errors. Cancellation still propagates,
            # because CancelledError is not an Exception subclass.
            print(f"Unexpected error: {e}")
        else:
            # The stream ended without raising; wait before reconnecting so
            # a server that keeps closing cleanly does not cause a tight loop.
            print("Stream closed")

        print(f"Reconnecting in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)
Connection Timeout
Set a custom connection timeout:
# Connection timeout of 30 seconds.
ws = TurbineWSClient(host="https://api.turbinefi.com", timeout=30)
1. Process Messages Asynchronously
Don’t block the message loop with slow operations:
async def process_orderbook(orderbook):
    """Heavy processing in separate task."""
    # Do expensive calculations here
    await asyncio.sleep(0)  # Yield control
async for message in stream:
if message.type == "orderbook" :
# Don't await - process in background
asyncio.create_task(process_orderbook(message.orderbook))
2. Batch API Calls
Avoid making API calls for every message:
class BatchedBot:
    """Queue one action per message and execute the queue once a second.

    Sketch only: ``stream``, ``calculate_action`` and ``execute`` are
    expected to be provided by the surrounding bot.
    """

    def __init__(self):
        self.pending_actions = []

    async def run(self):
        """Queue an action for every message while a worker drains the queue."""
        # Start background task to process actions. Keep a reference: the
        # event loop holds tasks only weakly, so an unreferenced task can be
        # garbage-collected mid-run.
        worker = asyncio.create_task(self.process_actions())

        try:
            async for message in stream:
                # Queue actions instead of executing immediately
                action = self.calculate_action(message)
                self.pending_actions.append(action)
        finally:
            # Stop the worker when the stream ends or fails, so it does not
            # keep running with nothing feeding it.
            worker.cancel()

    async def process_actions(self):
        """Process queued actions in batches."""
        while True:
            await asyncio.sleep(1)  # Batch every 1 second

            if self.pending_actions:
                # Snapshot and clear with no await in between, so actions
                # queued while the batch executes go into the next batch.
                actions = self.pending_actions[:]
                self.pending_actions.clear()

                # Execute batch
                for action in actions:
                    await self.execute(action)
3. Filter Redundant Updates
Ignore orderbook updates with no meaningful change:
class SmartOrderbookHandler:
    """Forward orderbook updates only when the best bid or ask moved enough.

    Args:
        threshold: Minimum move of the best bid or best ask, in price units
            (10_000 == 1%), needed for an update to count as meaningful.
            Defaults to 5000, i.e. 0.5%.
    """

    def __init__(self, threshold=5000):
        # Starting at 0 means the first update of a real book passes through.
        self.last_best_bid = 0
        self.last_best_ask = 0
        self.threshold = threshold  # 0.5% change threshold by default

    async def handle_orderbook(self, orderbook):
        """Call ``update_strategy`` if either best price moved past the threshold.

        One-sided or empty books are ignored. The reference prices are only
        updated when an update is forwarded, so small moves accumulate until
        together they exceed the threshold.
        """
        if not orderbook.bids or not orderbook.asks:
            return

        best_bid = orderbook.bids[0].price
        best_ask = orderbook.asks[0].price

        # Check if significant change on either side
        largest_move = max(
            abs(best_bid - self.last_best_bid),
            abs(best_ask - self.last_best_ask),
        )
        if largest_move <= self.threshold:
            return

        self.last_best_bid = best_bid
        self.last_best_ask = best_ask

        # Process meaningful update
        await self.update_strategy(best_bid, best_ask)
Next Steps
Build a Trading Bot: Use WebSocket streams in your bot
Market Data: REST API for market data
API Reference: Complete WebSocket API docs
Example Code: See the full example on GitHub