The Python SDK is currently in beta. Visit github.com/kuestcom for the latest version.
Installation
pip install kuest-sdk
Quick start
from kuest import KuestClient
# Create the client; testnet=True uses Polygon Amoy for testing
client = KuestClient(
    api_key="your-api-key",
    api_secret="your-api-secret",
    passphrase="your-passphrase",
    testnet=True,
)

# Fetch every market and report how many came back
markets = client.get_markets()
print(f"Found {len(markets)} markets")

# Submit a limit order
order = client.place_order(
    market_id="0x123...",
    outcome="YES",
    side="BUY",
    price=0.65,
    size=10.0,
)
print(f"Order placed: {order.id}")
Authentication
Get API credentials
- Visit auth.kuest.com
- Connect your Polygon wallet
- Generate API key, secret, and passphrase
- Store credentials securely
Environment variables
import os
from kuest import KuestClient
# Build the client from environment variables so credentials stay out of
# source control.
client = KuestClient(
    api_key=os.getenv("KUEST_API_KEY"),
    api_secret=os.getenv("KUEST_API_SECRET"),
    passphrase=os.getenv("KUEST_PASSPHRASE"),
    # Defaults to testnet when KUEST_TESTNET is unset. The comparison ignores
    # case and surrounding whitespace so "True" or "TRUE" does not silently
    # select mainnet.
    testnet=os.getenv("KUEST_TESTNET", "true").strip().lower() == "true",
)
Core features
Market data
# Get all active markets
markets = client.get_markets(status="active")
for market in markets:
    print(f"{market.title}: {market.yes_price:.2%}")

# Get specific market
market = client.get_market("0x123...")
print(f"Volume: ${market.volume:,.2f}")

# Get orderbook
orderbook = client.get_orderbook("0x123...")
print(f"Best bid: {orderbook.best_bid}")
print(f"Best ask: {orderbook.best_ask}")
print(f"Spread: {orderbook.spread:.4f}")
Place orders
# Limit order (explicit price)
order = client.place_order(
    market_id="0x123...",
    outcome="YES",
    side="BUY",
    price=0.65,
    size=10.0,
    order_type="LIMIT",
)

# Market order (no price given; fills immediately at best price)
order = client.place_order(
    market_id="0x123...",
    outcome="NO",
    side="SELL",
    size=5.0,
    order_type="MARKET",
)
Manage orders
# Get open orders
orders = client.get_orders(market_id="0x123...")
for order in orders:
    print(f"{order.side} {order.size} @ {order.price}")

# Cancel order
client.cancel_order(order_id="abc123")

# Cancel all orders for a market
client.cancel_all_orders(market_id="0x123...")
Portfolio management
# Get current positions
positions = client.get_positions()
for position in positions:
    print(f"{position.market}: {position.size} @ {position.avg_price}")
    print(f"Current value: ${position.current_value:.2f}")
    print(f"PnL: ${position.pnl:.2f} ({position.pnl_percent:.2%})")

# Get trading history
trades = client.get_trades(limit=100)
for trade in trades:
    print(f"{trade.timestamp}: {trade.side} {trade.size} @ {trade.price}")
Advanced features
WebSocket streaming
import asyncio
async def main():
    """Subscribe to price updates for two markets and print each update."""

    # Callback passed to subscribe_prices; receives the market id and price.
    async def on_price_update(market_id, price):
        print(f"{market_id}: {price}")

    # Subscribe to price updates
    await client.subscribe_prices(
        market_ids=["0x123...", "0x456..."],
        callback=on_price_update,
    )


asyncio.run(main())
Batch operations
# Place multiple orders at once.
# Each dict uses the same field names as the place_order keyword arguments,
# including the outcome being traded.
orders = [
    {"market_id": "0x123...", "outcome": "YES", "side": "BUY", "price": 0.60, "size": 5},
    {"market_id": "0x456...", "outcome": "YES", "side": "SELL", "price": 0.40, "size": 10},
    {"market_id": "0x789...", "outcome": "YES", "side": "BUY", "price": 0.75, "size": 3},
]
results = client.place_orders_batch(orders)
print(f"Placed {len(results)} orders")
Error handling
from kuest.exceptions import (
InsufficientBalanceError,
OrderNotFoundError,
RateLimitError,
)
# Handle the specific SDK errors first; the generic handler comes last so it
# only sees whatever the earlier clauses did not match.
try:
    order = client.place_order(
        market_id="0x123...",
        outcome="YES",
        side="BUY",
        price=0.65,
        size=1000,  # Too large
    )
except InsufficientBalanceError:
    print("Not enough USDC")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except Exception as e:
    print(f"Error: {e}")
Example strategies
Market making bot
import time
def market_make(market_id, spread=0.02, size=5.0, outcome="YES"):
    """Place one buy and one sell order around the current mid-price.

    The mid-price is the average of the orderbook's best bid and best ask.
    The buy order is priced ``spread / 2`` below it and the sell order
    ``spread / 2`` above it.

    Args:
        market_id: Market to quote.
        spread: Total distance between the buy price and the sell price.
        size: Size used for both orders.
        outcome: Outcome to quote; passed to ``place_order`` for both orders.
    """
    orderbook = client.get_orderbook(market_id)
    mid_price = (orderbook.best_bid + orderbook.best_ask) / 2
    half_spread = spread / 2

    # Place buy order
    client.place_order(
        market_id=market_id,
        outcome=outcome,
        side="BUY",
        price=mid_price - half_spread,
        size=size,
    )

    # Place sell order
    client.place_order(
        market_id=market_id,
        outcome=outcome,
        side="SELL",
        price=mid_price + half_spread,
        size=size,
    )

    print(f"Market making at {mid_price:.4f} +/- {spread/2:.4f}")
# Run market maker: re-quote every minute until interrupted with Ctrl-C.
while True:
    try:
        market_make("0x123...", spread=0.02, size=5.0)
        time.sleep(60)  # Update every minute
    except KeyboardInterrupt:
        break
    except Exception as e:
        # Report the failure, wait briefly, then try again.
        print(f"Error: {e}")
        time.sleep(10)
Arbitrage detector
def check_arbitrage():
    """Scan active markets for a combined YES + NO buy cost below 1.0.

    For every active market the cost of buying YES and the cost of buying NO
    are added together. When the total is under 0.98 the opportunity is
    printed and one unit of each outcome is bought.
    """
    markets = client.get_markets(status="active")
    for market in markets:
        orderbook = client.get_orderbook(market.id)

        # YES + NO should equal 1.0
        yes_ask = orderbook.asks[0].price if orderbook.asks else 1.0
        # The NO price must not be read from the same asks[0] level as YES,
        # otherwise the total is simply 2 * yes_ask. It is taken as the
        # complement of the best bid instead.
        # NOTE(review): assumes the book is quoted in YES terms and exposes
        # `bids` alongside `asks` - confirm against the SDK.
        no_ask = (1.0 - orderbook.bids[0].price) if orderbook.bids else 1.0

        total = yes_ask + no_ask
        if total < 0.98:  # Arbitrage opportunity
            profit = 1.0 - total
            print(f"Arbitrage in {market.title}:")
            print(f" Buy YES @ {yes_ask:.4f}")
            print(f" Buy NO @ {no_ask:.4f}")
            print(f" Profit: {profit:.4f} USDC")

            # Execute arbitrage
            client.place_order(
                market_id=market.id,
                outcome="YES",
                side="BUY",
                price=yes_ask,
                size=1.0,
            )
            client.place_order(
                market_id=market.id,
                outcome="NO",
                side="BUY",
                price=no_ask,
                size=1.0,
            )
# Run arbitrage detector: scan every 30 seconds until interrupted with Ctrl-C.
while True:
    try:
        check_arbitrage()
        time.sleep(30)
    except KeyboardInterrupt:
        break
Momentum trading
import pandas as pd
import numpy as np
def momentum_strategy(market_id, lookback=60, threshold=0.01, outcome="YES"):
    """Trade based on recent price momentum.

    Requests ``lookback`` price points at the "1m" interval, computes the
    point-to-point percentage change, and averages the last 10 changes.
    A mean above ``threshold`` places a buy 1% above the latest price; a mean
    below ``-threshold`` places a sell 1% below it. Otherwise nothing is
    traded.

    Args:
        market_id: Market to trade.
        lookback: Number of price points to request.
        threshold: Mean-return magnitude required before trading.
        outcome: Outcome to trade; passed to ``place_order``.
    """
    # Fetch price history
    prices = client.get_price_history(
        market_id=market_id,
        interval="1m",
        limit=lookback,
    )
    df = pd.DataFrame(prices)
    if df.empty:
        # An empty frame has no 'price' column, so bail out before indexing it.
        print("No price history available - skipping")
        return

    df['returns'] = df['price'].pct_change()

    # Calculate momentum
    momentum = df['returns'].tail(10).mean()
    print(f"Momentum: {momentum:.4f}")

    last_price = df['price'].iloc[-1]
    if momentum > threshold:
        print("Strong upward momentum - buying")
        client.place_order(
            market_id=market_id,
            outcome=outcome,
            side="BUY",
            price=last_price * 1.01,  # 1% above current
            size=10.0,
        )
    elif momentum < -threshold:
        print("Strong downward momentum - selling")
        client.place_order(
            market_id=market_id,
            outcome=outcome,
            side="SELL",
            price=last_price * 0.99,  # 1% below current
            size=10.0,
        )
# Run momentum strategy until interrupted with Ctrl-C.
while True:
    try:
        momentum_strategy("0x123...", lookback=60, threshold=0.01)
        time.sleep(300)  # Every 5 minutes
    except KeyboardInterrupt:
        break
Best practices
Implement risk management
MAX_POSITION_SIZE = 100  # USDC
MAX_DAILY_LOSS = 50  # USDC; NOTE: not enforced by check_risk_limits below


def check_risk_limits():
    """Raise if total open exposure exceeds MAX_POSITION_SIZE.

    Exposure is the sum of ``size * avg_price`` over all current positions.

    Raises:
        RuntimeError: If the summed exposure is above MAX_POSITION_SIZE.
    """
    positions = client.get_positions()
    total_exposure = sum(p.size * p.avg_price for p in positions)
    if total_exposure > MAX_POSITION_SIZE:
        raise RuntimeError("Position limit exceeded")
Handle rate limits
import time
from functools import wraps
def retry_on_rate_limit(max_retries=3):
    """Build a decorator that retries a call when it raises RateLimitError.

    The wrapped function is attempted up to ``max_retries`` times. After each
    rate-limited attempt except the last, the wrapper sleeps for the error's
    ``retry_after`` seconds. The final failure is re-raised to the caller.

    Args:
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.

    Returns:
        A decorator preserving the wrapped function's metadata.
    """
    # Without this floor, max_retries <= 0 would skip the loop entirely and
    # the wrapper would return None without ever calling the function.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    if attempt == attempts - 1:
                        raise
                    time.sleep(e.retry_after)

        return wrapper

    return decorator
Log all trades
import logging
# Send INFO-and-above records to both trading.log and the console.
log_handlers = [
    logging.FileHandler('trading.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=log_handlers,
)

# Module-level logger used by the helpers in this section.
logger = logging.getLogger(__name__)
def place_order_with_logging(**kwargs):
    """Place an order through the client, logging the request and the result.

    Args:
        **kwargs: Forwarded unchanged to ``client.place_order``.

    Returns:
        The order object returned by ``client.place_order``.
    """
    # Lazy %-style arguments: the message is only formatted when the record
    # is actually emitted.
    logger.info("Placing order: %s", kwargs)
    order = client.place_order(**kwargs)
    logger.info("Order placed: %s", order.id)
    return order
Performance tips
- Use asyncio for concurrent API calls
- Cache orderbook data for 1-2 seconds
- Batch order placements when possible
- Use WebSocket for real-time price updates
- Implement connection pooling
Next steps
Rust SDK
High-performance Rust SDK
Arbitrage
Advanced arbitrage strategies
API reference
Complete API documentation
Bot SDKs
SDK overview and comparison