Skip to main content
The Kuest Python SDK provides a high-level interface for building trading bots and automated strategies.
The Python SDK is currently in beta. Visit github.com/kuestcom for the latest version.

Installation

pip install kuest-sdk

Quick start

from kuest import KuestClient

# Initialize client
# The three credential strings below are placeholders; replace them with real
# values (the "Environment variables" section reads them from the environment
# instead of hard-coding them).
client = KuestClient(
    api_key="your-api-key",
    api_secret="your-api-secret",
    passphrase="your-passphrase",
    testnet=True,  # Use Polygon Amoy for testing
)

# Get all markets
# len() is applied to the result, so get_markets() returns a sized collection.
markets = client.get_markets()
print(f"Found {len(markets)} markets")

# Place a limit order
# "0x123..." is a placeholder market id. No order_type is passed here; the
# "Place orders" section passes order_type="LIMIT" explicitly.
order = client.place_order(
    market_id="0x123...",
    outcome="YES",
    side="BUY",
    price=0.65,
    size=10.0,
)

# The returned order object exposes its identifier as `.id`.
print(f"Order placed: {order.id}")

Authentication

Get API credentials

  1. Visit auth.kuest.com
  2. Connect your Polygon wallet
  3. Generate API key, secret, and passphrase
  4. Store credentials securely

Environment variables

import os
from kuest import KuestClient

# Credentials are read from the environment so they never live in source.
# os.environ[...] raises KeyError as soon as a variable is missing, instead of
# silently handing None to the client the way os.getenv() would.
client = KuestClient(
    api_key=os.environ["KUEST_API_KEY"],
    api_secret=os.environ["KUEST_API_SECRET"],
    passphrase=os.environ["KUEST_PASSPHRASE"],
    # Defaults to testnet when KUEST_TESTNET is unset. The value is normalised
    # before comparison so that "True", "TRUE" or "1" do not silently select
    # the live network; anything outside this set still means testnet=False.
    testnet=os.getenv("KUEST_TESTNET", "true").strip().lower()
    in {"1", "true", "yes", "on"},
)

Core features

Market data

# Get all active markets
markets = client.get_markets(status="active")

# One line per market. The ":.2%" format spec multiplies by 100 and appends
# "%", so yes_price is presumably a 0-1 fraction (confirm in the API reference).
for market in markets:
    print(f"{market.title}: {market.yes_price:.2%}")

# Get specific market
# Note: this rebinds `market`, the name the loop above also used.
market = client.get_market("0x123...")
print(f"Volume: ${market.volume:,.2f}")  # thousands separators, two decimals

# Get orderbook
# The order book object exposes best_bid, best_ask and spread directly.
orderbook = client.get_orderbook("0x123...")
print(f"Best bid: {orderbook.best_bid}")
print(f"Best ask: {orderbook.best_ask}")
print(f"Spread: {orderbook.spread:.4f}")

Place orders

# Limit order
# Same call as the quick start, with order_type spelled out explicitly.
order = client.place_order(
    market_id="0x123...",
    outcome="YES",
    side="BUY",
    price=0.65,
    size=10.0,
    order_type="LIMIT",
)

# Market order (fills immediately at best price)
# No `price` argument is passed for a market order. This call rebinds `order`,
# so the limit order's handle above is no longer reachable through that name.
order = client.place_order(
    market_id="0x123...",
    outcome="NO",
    side="SELL",
    size=5.0,
    order_type="MARKET",
)

Manage orders

# Get open orders
# Filtered to a single market through the market_id keyword.
orders = client.get_orders(market_id="0x123...")

# Each order exposes side, size and price.
for order in orders:
    print(f"{order.side} {order.size} @ {order.price}")

# Cancel order
# "abc123" is a placeholder order id.
client.cancel_order(order_id="abc123")

# Cancel all orders for a market
client.cancel_all_orders(market_id="0x123...")

Portfolio management

# Get current positions
positions = client.get_positions()

# Each position exposes market, size, avg_price, current_value, pnl and
# pnl_percent. ":.2%" renders pnl_percent as a percentage, so it is presumably
# stored as a fraction (confirm in the API reference).
for position in positions:
    print(f"{position.market}: {position.size} @ {position.avg_price}")
    print(f"Current value: ${position.current_value:.2f}")
    print(f"PnL: ${position.pnl:.2f} ({position.pnl_percent:.2%})")

# Get trading history
# `limit=100` caps the request at 100 trades.
trades = client.get_trades(limit=100)

# Each trade exposes timestamp, side, size and price.
for trade in trades:
    print(f"{trade.timestamp}: {trade.side} {trade.size} @ {trade.price}")

Advanced features

WebSocket streaming

import asyncio

async def main():
    """Subscribe to price updates for two markets and print each one.

    Uses the module-level ``client``. ``subscribe_prices`` is awaited, so this
    coroutine does not finish until that call completes.
    """
    # Subscribe to price updates
    # The callback is itself a coroutine function taking (market_id, price);
    # presumably the client awaits it once per update — confirm in the API
    # reference.
    async def on_price_update(market_id, price):
        print(f"{market_id}: {price}")
    
    await client.subscribe_prices(
        market_ids=["0x123...", "0x456..."],
        callback=on_price_update,
    )

# asyncio.run() creates an event loop, runs main() to completion and closes it.
asyncio.run(main())

Batch operations

# Place multiple orders at once
# Each order is a plain dict using the same key names as place_order()'s
# keyword arguments.
# NOTE(review): these dicts carry no "outcome" key, while the single-order
# examples pass outcome="YES"/"NO" — confirm whether the batch call needs it.
orders = [
    {"market_id": "0x123...", "side": "BUY", "price": 0.60, "size": 5},
    {"market_id": "0x456...", "side": "SELL", "price": 0.40, "size": 10},
    {"market_id": "0x789...", "side": "BUY", "price": 0.75, "size": 3},
]

# len() is applied to the result, so place_orders_batch() returns a sized
# collection.
results = client.place_orders_batch(orders)
print(f"Placed {len(results)} orders")

Error handling

from kuest.exceptions import (
    InsufficientBalanceError,
    OrderNotFoundError,
    RateLimitError,
)

# Handlers are tried top to bottom, so the specific SDK exceptions are listed
# before the generic fallback. OrderNotFoundError is imported above but has no
# handler in this snippet.
try:
    order = client.place_order(
        market_id="0x123...",
        side="BUY",
        price=0.65,
        size=1000,  # Too large
    )
except InsufficientBalanceError:
    print("Not enough USDC")
except RateLimitError as e:
    # The exception carries the wait time in its `retry_after` attribute.
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except Exception as e:
    # Catch-all for anything else; it only prints the message.
    print(f"Error: {e}")

Example strategies

Market making bot

import time

def market_make(market_id, spread=0.02, size=5.0):
    """Quote both sides of a market, centred on the current mid-price.

    Reads the order book for ``market_id``, takes the midpoint between its
    best bid and best ask, then submits a BUY half a spread below that
    midpoint and a SELL half a spread above it, each for ``size``.
    """
    book = client.get_orderbook(market_id)
    mid_price = (book.best_bid + book.best_ask) / 2
    half_spread = spread / 2

    # BUY is submitted first, then SELL.
    quotes = (
        ("BUY", mid_price - half_spread),
        ("SELL", mid_price + half_spread),
    )
    for side, quote_price in quotes:
        client.place_order(
            market_id=market_id,
            side=side,
            price=quote_price,
            size=size,
        )

    print(f"Market making at {mid_price:.4f} +/- {spread/2:.4f}")

# Run market maker
while True:
    try:
        # Pull the previous cycle's quotes before re-quoting. Without this,
        # every iteration stacks another BUY/SELL pair on top of the old ones
        # and open exposure grows without bound.
        # Caution: this cancels every open order the account has in this
        # market, not only the ones this loop placed.
        client.cancel_all_orders(market_id="0x123...")
        market_make("0x123...", spread=0.02, size=5.0)
        time.sleep(60)  # Update every minute
    except KeyboardInterrupt:
        # Ctrl-C stops the bot cleanly.
        break
    except Exception as e:
        # Keep the bot alive on transient failures; back off briefly first.
        print(f"Error: {e}")
        time.sleep(10)

Arbitrage detector

def check_arbitrage(max_total=0.98, size=1.0):
    """Find and trade markets whose YES and NO asks sum to less than 1.0.

    The example treats one YES plus one NO share of the same market as worth
    1.0 in total, so buying both for less than that leaves ``1.0 - total``
    per share pair.

    Args:
        max_total: Trade only when ``yes_ask + no_ask`` is below this value.
            The 0.98 default keeps a 0.02 margin.
        size: Number of shares bought on each leg.

    Returns:
        None. Opportunities are printed and orders are placed as a side
        effect.
    """
    markets = client.get_markets(status="active")

    for market in markets:
        # Each outcome needs its own best ask. Reading asks[0] from a single
        # book for both legs makes "YES + NO" really 2 x the YES ask, which
        # reports an opportunity whenever YES trades below half of max_total.
        # NOTE(review): assumes get_orderbook() accepts an `outcome` selector,
        # mirroring place_order()'s `outcome` argument — confirm against the
        # API reference before running this.
        yes_book = client.get_orderbook(market.id, outcome="YES")
        no_book = client.get_orderbook(market.id, outcome="NO")

        # YES + NO should equal 1.0
        # An empty ask side is priced at 1.0 so it can never look like an
        # opportunity. asks[0] is presumably the best (lowest) ask.
        yes_ask = yes_book.asks[0].price if yes_book.asks else 1.0
        no_ask = no_book.asks[0].price if no_book.asks else 1.0

        total = yes_ask + no_ask
        if total >= max_total:
            continue

        # Arbitrage opportunity
        profit = (1.0 - total) * size
        print(f"Arbitrage in {market.title}:")
        print(f"  Buy YES @ {yes_ask:.4f}")
        print(f"  Buy NO @ {no_ask:.4f}")
        print(f"  Profit: {profit:.4f} USDC")

        # Execute arbitrage
        # Keyword arguments keep the call correct even if the parameter order
        # of place_order() differs from what a positional call assumes.
        # The two legs are separate orders, so the second can fail or fill at
        # a different price after the first has gone through.
        client.place_order(
            market_id=market.id,
            outcome="YES",
            side="BUY",
            price=yes_ask,
            size=size,
        )
        client.place_order(
            market_id=market.id,
            outcome="NO",
            side="BUY",
            price=no_ask,
            size=size,
        )

# Run arbitrage detector
import time  # this snippet has no import of its own; re-importing is harmless

while True:
    try:
        check_arbitrage()
        time.sleep(30)
    except KeyboardInterrupt:
        # Ctrl-C stops the bot cleanly.
        break
    except Exception as e:
        # Same policy as the market-maker loop: report, back off, carry on,
        # so one failed API call does not kill the bot.
        print(f"Error: {e}")
        time.sleep(10)

Momentum trading

import pandas as pd
import numpy as np

def momentum_strategy(market_id, lookback=60, threshold=0.01,
                      window=10, size=10.0, price_offset=0.01):
    """Trade based on recent price momentum.

    Requests ``lookback`` one-minute price points, averages the last
    ``window`` period-over-period returns, and places one order when that
    average lies beyond ``threshold`` in either direction.

    Args:
        market_id: Market to analyse and trade.
        lookback: Number of 1-minute price points to request.
        threshold: Absolute mean return that must be exceeded to trade.
        window: How many of the most recent returns are averaged
            (previously hard-coded to 10).
        size: Order size (previously hard-coded to 10.0).
        price_offset: Fraction added to the last price for a BUY, or
            subtracted for a SELL, to set the order price (previously
            hard-coded to 1%).

    Returns:
        None. The momentum value is printed and an order may be placed.
    """
    # Fetch price history
    prices = client.get_price_history(
        market_id=market_id,
        interval="1m",
        limit=lookback,
    )

    df = pd.DataFrame(prices)

    # At least two prices are needed to form a single return; an empty result
    # would otherwise fail on the 'price' column lookup.
    if 'price' not in df.columns or len(df) < 2:
        print("Not enough price history - skipping")
        return

    # Calculate momentum
    returns = df['price'].pct_change()
    momentum = returns.tail(window).mean()

    # NaN (no usable returns) or inf (a zero price in the window) must not be
    # allowed to trigger an order.
    if not np.isfinite(momentum):
        print("Momentum undefined - skipping")
        return

    print(f"Momentum: {momentum:.4f}")

    last_price = df['price'].iloc[-1]

    # NOTE(review): the order price is not clamped. If prices are bounded
    # (the arbitrage example treats YES + NO as 1.0), a BUY near the top of
    # the range could exceed the bound — confirm the valid price range.
    if momentum > threshold:
        print("Strong upward momentum - buying")
        client.place_order(
            market_id=market_id,
            side="BUY",
            price=last_price * (1 + price_offset),  # above current
            size=size,
        )
    elif momentum < -threshold:
        print("Strong downward momentum - selling")
        client.place_order(
            market_id=market_id,
            side="SELL",
            price=last_price * (1 - price_offset),  # below current
            size=size,
        )

# Run momentum strategy
import time  # this snippet imports only pandas and numpy; re-importing is harmless

while True:
    try:
        momentum_strategy("0x123...", lookback=60, threshold=0.01)
        time.sleep(300)  # Every 5 minutes
    except KeyboardInterrupt:
        # Ctrl-C stops the bot cleanly.
        break
    except Exception as e:
        # Same policy as the market-maker loop: report, back off, carry on,
        # so one failed API call does not kill the bot.
        print(f"Error: {e}")
        time.sleep(10)

Best practices

1. Test on Amoy first

Always test your bot on Polygon Amoy testnet before going live.
2. Implement risk management

MAX_POSITION_SIZE = 100  # USDC
MAX_DAILY_LOSS = 50      # USDC


class RiskLimitExceeded(Exception):
    """Raised by check_risk_limits() when a configured limit is breached.

    Subclasses Exception, so existing ``except Exception`` handlers still
    catch it.
    """


def check_risk_limits():
    """Raise if open positions breach the exposure or loss limit.

    Exposure is the sum of ``size * avg_price`` over all open positions and
    is compared with ``MAX_POSITION_SIZE``. Loss is the summed ``pnl`` of the
    same positions and is compared with ``MAX_DAILY_LOSS``.

    Raises:
        RiskLimitExceeded: If either limit is exceeded.
    """
    # Materialise once: the positions are iterated twice below.
    positions = list(client.get_positions())

    total_exposure = sum(p.size * p.avg_price for p in positions)
    if total_exposure > MAX_POSITION_SIZE:
        raise RiskLimitExceeded(
            f"Position limit exceeded: {total_exposure:.2f} > "
            f"{MAX_POSITION_SIZE} USDC"
        )

    # MAX_DAILY_LOSS was declared but never enforced.
    # NOTE(review): this uses the PnL of currently open positions as a proxy
    # for the day's loss; a true daily figure would also need realised PnL
    # from the trade history — confirm which definition is intended.
    open_pnl = sum(p.pnl for p in positions)
    if open_pnl < -MAX_DAILY_LOSS:
        raise RiskLimitExceeded(
            f"Loss limit exceeded: {open_pnl:.2f} < -{MAX_DAILY_LOSS} USDC"
        )
3. Handle rate limits

import time
from functools import wraps

def retry_on_rate_limit(max_retries=3):
    """Build a decorator that retries a call when it raises RateLimitError.

    Args:
        max_retries: Total number of attempts, the first call included.
            Must be at least 1.

    Returns:
        A decorator. The wrapped function is called up to ``max_retries``
        times; between attempts the wrapper sleeps for the ``retry_after``
        value carried by the exception. The final RateLimitError is re-raised
        unchanged, and any other exception propagates immediately.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (the wrapped function
            would otherwise never be called).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    # Out of attempts: surface the original error.
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(e.retry_after)
        # Must be returned from `decorator`, not from inside `wrapper`;
        # otherwise `decorator` returns None and the decorated name is
        # replaced by None.
        return wrapper
    return decorator
4. Log all trades

import logging

# Configure the root logger once: INFO and above, timestamped, written both to
# the file 'trading.log' (FileHandler) and to the console (StreamHandler).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('trading.log'),
        logging.StreamHandler(),
    ],
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

def place_order_with_logging(**kwargs):
    """Place an order through the client, logging the attempt and its outcome.

    Args:
        **kwargs: Passed unchanged to ``client.place_order``.

    Returns:
        The order object returned by ``client.place_order``.

    Raises:
        Exception: Whatever ``client.place_order`` raises; it is logged with
            its traceback and then re-raised unchanged.
    """
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("Placing order: %s", kwargs)
    try:
        order = client.place_order(**kwargs)
    except Exception:
        # A trade log that records only successes hides the orders that
        # matter most when debugging; log the failure, then let it propagate.
        logger.exception("Order failed: %s", kwargs)
        raise
    logger.info("Order placed: %s", order.id)
    return order

Performance tips

  • Use asyncio for concurrent API calls
  • Cache orderbook data for 1-2 seconds
  • Batch order placements when possible
  • Use WebSocket for real-time price updates
  • Implement connection pooling

Next steps

Rust SDK

High-performance Rust SDK

Arbitrage

Advanced arbitrage strategies

API reference

Complete API documentation

Bot SDKs

SDK overview and comparison

Build docs developers (and LLMs) love