Overview
The pricing framework enables you to compute fair value prices by combining multiple data sources and indicators. Instead of simply using the mid-price of a single exchange, you can:
Incorporate prices from correlated assets (spot vs futures, different exchanges)
Use technical indicators (order book imbalance, APT factors, basis)
Account for funding rates and carry costs
Leverage lead-lag relationships between markets
Build multi-asset pricing models
This framework is particularly powerful for market making, where accurate pricing determines your profitability.
Key Insight: In HFT, pricing models trained on one exchange often transfer well to other exchanges due to strong cross-venue correlations and lead-lag effects. Similarly, models trained on BTC can be applied to other crypto assets.
Pricing Equation
The fundamental pricing equation for market making:
```python
reservation_price = fair_value + forecast - risk_adjustment
quote_bid = reservation_price - half_spread
quote_ask = reservation_price + half_spread
```

where:

fair_value: Base price (mid, underlying + basis, etc.)
forecast: Directional alpha (short-term price prediction)
risk_adjustment: Inventory risk skew (penalizes large positions)
half_spread: Profit margin
Each component can be sophisticated:
```python
# Fair value from multiple sources
fair_value = (
    w1 * binance_mid +
    w2 * (binance_spot + futures_basis) +
    w3 * bybit_mid +
    w4 * okx_mid
)

# Forecast from indicators
forecast = (
    alpha_1 * order_book_imbalance +
    alpha_2 * trade_flow_imbalance +
    alpha_3 * basis_change +
    alpha_4 * funding_rate
)

# Risk adjustment
risk_adjustment = risk_aversion * position * volatility
```
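Putting the components together, a minimal numeric sketch of the full quoting equation (all weights, prices, and signal values below are made-up placeholders, not calibrated numbers):

```python
# Illustrative fair-value combination across three venues.
w1, w2, w3 = 0.5, 0.3, 0.2
binance_mid, bybit_mid, okx_mid = 50_000.0, 50_002.0, 49_998.0
fair_value = w1 * binance_mid + w2 * bybit_mid + w3 * okx_mid

# Forecast from a single imbalance signal (alpha_1 would be a fitted coefficient).
alpha_1, order_book_imbalance = 5.0, 0.2
forecast = alpha_1 * order_book_imbalance

# Inventory skew: a long position pushes both quotes down, a short pushes them up.
risk_aversion, position, volatility = 0.1, 2.0, 10.0
risk_adjustment = risk_aversion * position * volatility

reservation_price = fair_value + forecast - risk_adjustment
half_spread = 5.0
quote_bid = reservation_price - half_spread
quote_ask = reservation_price + half_spread
print(quote_bid, quote_ask)  # quotes skewed below mid because position > 0
```

Note how the positive inventory shifts both quotes below the combined fair value, making it more likely to sell than to buy more.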
Data Preparation
Multi-Source Data Collection
Collect synchronized data from multiple sources:
```python
import datetime

import numpy as np
import polars as pl

def load_price(date, market, symbol_list):
    """
    Load and resample price data at a fixed interval.

    Args:
        date: Date string 'YYYY-MM-DD'
        market: 'futures' or 'spot'
        symbol_list: List of symbols to load

    Returns:
        (timestamps, price_dataframe)
    """
    start = datetime.datetime.strptime(date, '%Y-%m-%d').replace(
        tzinfo=datetime.timezone.utc
    )
    end = start + datetime.timedelta(days=1)

    # Define running interval (100ms = 0.1s)
    running_interval = 100_000_000  # nanoseconds
    start_ts = int(start.timestamp() * 1_000_000_000) + running_interval
    end_ts = int(end.timestamp() * 1_000_000_000)

    # Create timestamp grid
    resample_ts = pl.Series(
        'local_timestamp',
        np.arange(start_ts, end_ts + running_interval, running_interval)
    ).cast(pl.Datetime('ns')).cast(pl.Datetime('us'))

    prices = []
    for symbol in symbol_list:
        # Load book ticker data
        df = (
            pl.read_csv(f'{market}/{symbol}_book_ticker_{start.strftime("%Y%m%d")}.csv.gz')
            .with_columns(pl.col('local_timestamp').cast(pl.Datetime))
            .group_by_dynamic(
                index_column='local_timestamp',
                every='100ms',
                period='100ms',
                offset='0s',
                closed='right',
                label='right'
            )
            .agg(
                ((pl.col('bid_price') + pl.col('ask_price')) / 2.0)
                .last()
                .alias('mid_px')
            )
        )

        # Resample to the uniform grid
        df_resample = (
            resample_ts.to_frame()
            .join(df, on='local_timestamp', how='left')
            .fill_null(strategy='forward')  # Forward-fill missing values
        )
        prices.append(df_resample.select(pl.col('mid_px').alias(symbol)))

    return resample_ts, pl.concat(prices, how='horizontal')

# Example: Load data
date = '2025-08-01'

# Futures data
ts, df_futures = load_price(
    date, 'futures',
    ['BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT']
)

# Spot data (for basis calculation)
ts, df_spot = load_price(
    date, 'spot',
    ['BTCUSDT', 'ETHUSDT', 'SOLUSDT', 'XRPUSDT']
)
```
Computing Returns and Features
Compute features for your pricing model:
```python
import polars as pl

def compute_features(df_futures, df_spot, lag_periods=(1, 5, 10)):
    """
    Compute pricing features.

    Args:
        df_futures: Futures prices
        df_spot: Spot prices
        lag_periods: Lookback periods for returns

    Returns:
        DataFrame with features
    """
    features = {}
    for col in df_futures.columns:
        # Returns at different horizons
        for lag in lag_periods:
            features[f'{col}_ret_{lag}'] = df_futures[col].pct_change(lag)

        # Basis (futures - spot)
        if col in df_spot.columns:
            features[f'{col}_basis'] = df_futures[col] - df_spot[col]

            # Basis change
            features[f'{col}_basis_chg'] = features[f'{col}_basis'].diff()

    return pl.DataFrame(features)

# Compute features
features = compute_features(df_futures, df_spot)
```
Order Book Imbalance
Compute order book imbalance as a pricing signal:
```python
import polars as pl

def compute_imbalance(book_ticker_file, depths=(0, 1, 2, 3, 4)):
    """
    Compute order book imbalance at multiple depth levels.

    Args:
        book_ticker_file: Path to book ticker CSV
        depths: Depth levels to compute (0=best, 1=second best, etc.)

    Returns:
        DataFrame with imbalance features
    """
    df = pl.read_csv(book_ticker_file)

    imbalances = {}
    for d in depths:
        bid_qty = 'bid_amount' if d == 0 else f'bid_amount_{d}'
        ask_qty = 'ask_amount' if d == 0 else f'ask_amount_{d}'

        # Imbalance = (bid_qty - ask_qty) / (bid_qty + ask_qty)
        imbalances[f'imb_{d}'] = (
            (pl.col(bid_qty) - pl.col(ask_qty)) /
            (pl.col(bid_qty) + pl.col(ask_qty))
        )

    return df.select([
        pl.col('local_timestamp'),
        *[expr.alias(name) for name, expr in imbalances.items()]
    ])
```
Building Pricing Models
Simple Linear Model
Start with a simple linear regression:
```python
import numpy as np
from sklearn.linear_model import LinearRegression

def train_pricing_model(features, target_returns, train_frac=0.7):
    """
    Train a linear pricing model.

    Args:
        features: Feature DataFrame
        target_returns: Target returns to predict
        train_frac: Fraction of data for training

    Returns:
        Trained model and predictions
    """
    # Split into train/test
    n = len(features)
    split = int(n * train_frac)
    X_train = features[:split].to_numpy()
    y_train = target_returns[:split].to_numpy()
    X_test = features[split:].to_numpy()
    y_test = target_returns[split:].to_numpy()

    # Remove NaNs
    train_valid = ~np.isnan(X_train).any(axis=1) & ~np.isnan(y_train)
    test_valid = ~np.isnan(X_test).any(axis=1) & ~np.isnan(y_test)
    X_train = X_train[train_valid]
    y_train = y_train[train_valid]
    X_test = X_test[test_valid]
    y_test = y_test[test_valid]

    # Train model
    model = LinearRegression()
    model.fit(X_train, y_train)

    # Predict
    test_pred = model.predict(X_test)

    # Evaluate
    train_r2 = model.score(X_train, y_train)
    test_r2 = model.score(X_test, y_test)

    print(f"Train R²: {train_r2:.4f}")
    print(f"Test R²: {test_r2:.4f}")
    print("\nFeature Importances:")
    for i, col in enumerate(features.columns):
        print(f"  {col}: {model.coef_[i]:.6f}")

    return model, test_pred, y_test

# Example usage: shift by -1 so each row's target is the *next* period's return
target = df_futures['BTCUSDT'].pct_change(1).shift(-1)
model, preds, actual = train_pricing_model(features, target)
```
Cross-Asset Model
Use correlations between assets:
```python
import numpy as np
import polars as pl
from sklearn.linear_model import LinearRegression

def train_cross_asset_model(df_prices, target_symbol, feature_symbols):
    """
    Predict target_symbol returns using feature_symbols.

    Args:
        df_prices: DataFrame with all prices
        target_symbol: Symbol to predict
        feature_symbols: Symbols to use as features

    Returns:
        Model and predictions
    """
    # Compute returns for all symbols
    returns = {}
    for sym in [target_symbol] + feature_symbols:
        returns[sym] = df_prices[sym].pct_change(1)
    df_returns = pl.DataFrame(returns)

    # Use other assets to predict the target
    X = df_returns.select(feature_symbols).to_numpy()
    y = df_returns[target_symbol].to_numpy()

    # Remove NaNs
    valid = ~np.isnan(X).any(axis=1) & ~np.isnan(y)
    X = X[valid]
    y = y[valid]

    # Split and train on the first 70%
    split = int(len(X) * 0.7)
    model = LinearRegression()
    model.fit(X[:split], y[:split])

    # Predict over the full sample (the first `split` rows are in-sample)
    predictions = model.predict(X)

    return model, predictions, y

# Example: Predict ETHUSDT using BTCUSDT, SOLUSDT, and XRPUSDT
model, preds, actual = train_cross_asset_model(
    df_futures,
    target_symbol='ETHUSDT',
    feature_symbols=['BTCUSDT', 'SOLUSDT', 'XRPUSDT']
)
```
Integrating with Backtesting
Precompute Predictions
Generate predictions for the entire backtest period:
```python
import numpy as np

def precompute_forecast(df_prices, df_features, model, running_interval=100_000_000):
    """
    Precompute the forecast for backtesting.

    Args:
        df_prices: Price data
        df_features: Feature data
        model: Trained pricing model
        running_interval: Backtest running interval in nanoseconds

    Returns:
        Structured array of (timestamp, forecast) aligned with the backtest
    """
    # Get features
    X = df_features.to_numpy()

    # Generate predictions, leaving NaN where any feature is missing
    predictions = np.full(len(X), np.nan)
    valid_mask = ~np.isnan(X).any(axis=1)
    predictions[valid_mask] = model.predict(X[valid_mask])

    # Forward-fill NaNs with the last valid prediction (0.0 before the first)
    last_valid = 0.0
    for i in range(len(predictions)):
        if np.isnan(predictions[i]):
            predictions[i] = last_valid
        else:
            last_valid = predictions[i]

    # Create the timestamp array (assuming data is already at running_interval)
    timestamps = np.arange(len(predictions)) * running_interval

    # Create a structured array
    forecast = np.zeros(len(predictions), dtype=[
        ('timestamp', 'i8'),
        ('forecast', 'f8')
    ])
    forecast['timestamp'] = timestamps
    forecast['forecast'] = predictions
    return forecast

# Precompute and save
forecast = precompute_forecast(df_futures, features, model)
np.save('forecast_20250801.npy', forecast)
```
Use in Strategy
Incorporate the forecast in your trading strategy:
```python
import numpy as np
from numba import njit

from hftbacktest import GTC, LIMIT

@njit
def market_making_with_forecast(hbt, recorder, forecast_data, alpha):
    """
    Market making strategy with an alpha forecast.

    Args:
        hbt: Backtest instance
        recorder: Recorder for stats
        forecast_data: Precomputed forecast array
        alpha: Weight on the forecast signal
    """
    asset_no = 0
    tick_size = hbt.depth(asset_no).tick_size

    # Index into the forecast data
    forecast_idx = 0

    while hbt.elapse(100_000_000) == 0:  # 100ms intervals
        hbt.clear_inactive_orders(asset_no)

        depth = hbt.depth(asset_no)
        position = hbt.position(asset_no)

        # Get the current forecast
        current_ts = hbt.current_timestamp
        while (forecast_idx < len(forecast_data) - 1 and
               forecast_data[forecast_idx + 1]['timestamp'] <= current_ts):
            forecast_idx += 1
        forecast = forecast_data[forecast_idx]['forecast']

        # Compute prices
        mid_price = (depth.best_bid + depth.best_ask) / 2.0

        # Fair value with forecast
        fair_value = mid_price + alpha * forecast

        # Risk adjustment
        volatility = 10.0  # Simplified; compute from data in practice
        risk_aversion = 0.1
        risk_adjustment = risk_aversion * position * volatility

        reservation_price = fair_value - risk_adjustment

        # Half spread
        half_spread = tick_size * 2

        bid_price = reservation_price - half_spread
        ask_price = reservation_price + half_spread

        # Round to ticks
        bid_price = np.round(bid_price / tick_size) * tick_size
        ask_price = np.round(ask_price / tick_size) * tick_size

        # Keep quotes passive (don't cross the spread)
        bid_price = min(bid_price, depth.best_bid)
        ask_price = max(ask_price, depth.best_ask)

        order_qty = 0.1
        max_position = 10

        # Submit orders
        if position < max_position and np.isfinite(bid_price):
            hbt.submit_buy_order(asset_no, 1, bid_price, order_qty,
                                 GTC, LIMIT, False)
        if position > -max_position and np.isfinite(ask_price):
            hbt.submit_sell_order(asset_no, 2, ask_price, order_qty,
                                  GTC, LIMIT, False)

        recorder.record(hbt)
    return True
```
Transfer Learning Across Venues
A powerful feature of the pricing framework is model transferability:
```python
# Train on Binance Futures (train_pricing_model returns model, test_pred, y_test)
model, _, _ = train_pricing_model(
    binance_features,
    binance_returns
)

# Apply to Bybit (without retraining)
bybit_forecast = model.predict(bybit_features.to_numpy())

# Apply to OKX
okx_forecast = model.predict(okx_features.to_numpy())

# Often outperforms exchange-specific models!
```
Why Transfer Learning Works:
Strong correlations between exchanges (arbitrage)
Lead-lag relationships (Binance often leads)
Similar market participant behavior
Shared fundamental drivers
Validate by comparing live trading results across venues.
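Before going live, a quick offline check is to score a single fitted model on held-out data from each venue and compare R². The sketch below uses synthetic data with a shared linear structure standing in for cross-venue correlation; `synthetic_venue` and all its parameters are illustrative assumptions, not part of the framework:

```python
import numpy as np
from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(42)

def synthetic_venue(n=2000, noise=0.5):
    # Shared coefficients mimic venues driven by the same underlying signal.
    X = rng.normal(size=(n, 3))
    y = X @ np.array([0.8, -0.3, 0.1]) + noise * rng.normal(size=n)
    return X, y

# Fit once on the "home" venue...
X_binance, y_binance = synthetic_venue(noise=0.3)
model = LinearRegression().fit(X_binance, y_binance)

# ...then score on other venues without retraining.
for venue in ['bybit', 'okx']:
    X_v, y_v = synthetic_venue(noise=0.5)
    print(venue, round(model.score(X_v, y_v), 3))
```

If transfer R² is close to the home-venue R², the shared-driver assumption holds; a large gap suggests venue-specific dynamics that warrant a dedicated model.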
Advanced Topics
Time-Varying Models
Update model weights based on recent performance:
```python
import numpy as np
from sklearn.linear_model import LinearRegression

def adaptive_model(features, returns, window=1000):
    """
    Rolling-window model that adapts to changing market conditions.
    """
    predictions = np.zeros(len(features))

    for i in range(window, len(features)):
        # Train on the most recent window
        # (refitting every step is expensive; in practice refit every k steps)
        X_train = features[i - window:i].to_numpy()
        y_train = returns[i - window:i].to_numpy()
        valid = ~np.isnan(X_train).any(axis=1) & ~np.isnan(y_train)

        model = LinearRegression()
        model.fit(X_train[valid], y_train[valid])

        # Predict the next step
        predictions[i] = model.predict(features[i:i + 1].to_numpy())[0]

    return predictions
```
Ensemble Models
Combine multiple models:
```python
import numpy as np

def ensemble_forecast(models, features, weights=None):
    """
    Combine predictions from multiple models.
    """
    if weights is None:
        # Default to an equal-weighted average
        weights = np.ones(len(models)) / len(models)

    X = features.to_numpy()
    predictions = np.zeros(len(X))
    for model, weight in zip(models, weights):
        predictions += weight * model.predict(X)
    return predictions
```
Non-Linear Models
Use gradient boosting or neural networks:
```python
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

def train_gbm_model(features, target_returns):
    """Train a gradient boosting model."""
    X = features.to_numpy()
    y = target_returns.to_numpy()

    valid = ~np.isnan(X).any(axis=1) & ~np.isnan(y)
    X = X[valid]
    y = y[valid]

    model = GradientBoostingRegressor(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        random_state=42
    )

    # Fit on the first 70%; keep the rest for out-of-sample evaluation
    split = int(len(X) * 0.7)
    model.fit(X[:split], y[:split])
    return model
```
Best Practices
Use Out-of-Sample Testing
Always evaluate your pricing model on data it hasn’t seen. Use walk-forward validation or separate test periods to avoid overfitting.
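A minimal walk-forward split can be sketched as below, with an expanding training window and each fold scored only on data strictly after it (fold count and sizes are illustrative choices):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

def walk_forward_r2(X, y, n_folds=5):
    """Fit on an expanding window; score each fold on the data right after it."""
    n = len(X)
    fold = n // (n_folds + 1)
    scores = []
    for k in range(1, n_folds + 1):
        train_end = k * fold
        test_end = (k + 1) * fold
        model = LinearRegression().fit(X[:train_end], y[:train_end])
        scores.append(model.score(X[train_end:test_end], y[train_end:test_end]))
    return scores

# Synthetic demo data with a stable linear signal
rng = np.random.default_rng(0)
X = rng.normal(size=(1200, 3))
y = X @ np.array([0.5, -0.2, 0.1]) + 0.3 * rng.normal(size=1200)
print([round(s, 3) for s in walk_forward_r2(X, y)])
```

Consistently positive out-of-sample R² across folds is the signal to look for; a single good split can easily be luck.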
Retrain Regularly
Pricing model performance degrades over time as market dynamics change. Retrain periodically (e.g., monthly) and monitor live performance.
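Live monitoring can be as simple as tracking the rolling correlation between forecasts and realized returns; when it decays, it is time to retrain. A sketch with synthetic series (the window size and the 0.5 signal strength are arbitrary demo values):

```python
import numpy as np

def rolling_forecast_corr(forecast, realized, window=500):
    """Rolling Pearson correlation between forecast and realized returns."""
    n = len(forecast)
    out = np.full(n, np.nan)
    for i in range(window, n + 1):
        f = forecast[i - window:i]
        r = realized[i - window:i]
        out[i - 1] = np.corrcoef(f, r)[0, 1]
    return out

# Synthetic demo: a forecast that carries genuine but noisy signal
rng = np.random.default_rng(1)
realized = rng.normal(size=2000)
forecast = 0.5 * realized + rng.normal(size=2000)
corr = rolling_forecast_corr(forecast, realized)
print(round(np.nanmean(corr), 3))  # a sustained drop here flags model decay
```

In production this would be computed incrementally rather than over the full history, but the decision rule is the same.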
Start Simple
Begin with linear models using a few key features (e.g., spot price, basis, imbalance). Add complexity only when you see clear benefits in out-of-sample testing.
Mind Latency
Your pricing model must be fast enough to run within your decision interval. Complex models may be too slow for sub-millisecond strategies.
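A quick sanity check is to time a single-row prediction, since that is the call pattern a live strategy issues each interval. The model and the implied budget below are illustrative; compare the measured figure against your own decision interval:

```python
import time

import numpy as np
from sklearn.linear_model import LinearRegression

# Fit a small stand-in model on synthetic data
rng = np.random.default_rng(7)
X = rng.normal(size=(10_000, 8))
y = X @ rng.normal(size=8)
model = LinearRegression().fit(X, y)

# Time many single-row predictions, as a live strategy would issue them
row = X[:1]
n_calls = 1000
start = time.perf_counter()
for _ in range(n_calls):
    model.predict(row)
per_call_us = (time.perf_counter() - start) / n_calls * 1e6
print(f"{per_call_us:.1f} µs per predict call")
```

For tight intervals it often pays to extract `model.coef_` and `model.intercept_` and compute the dot product directly (e.g., inside a numba-compiled loop), bypassing sklearn's per-call overhead.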
Next Steps
Data Fusion Learn to combine multiple data streams effectively
Accelerated Backtesting Speed up model evaluation with accelerated backtesting