The data module provides utilities for loading market data, validating event order, and correcting timestamps.
HftBacktest uses a structured NumPy array format for market data. Events must be stored in .npz files or NumPy arrays with the event_dtype structure.
Event Structure
Each event contains:
- Event type and flags (`ev`)
- Exchange timestamp (`exch_ts`)
- Local receipt timestamp (`local_ts`)
- Price (`px`)
- Quantity (`qty`)
- Order ID for Level-3 events (`order_id`)
- Additional integer and float fields (`ival`, `fval`)
See Types for the complete event_dtype definition.
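As a rough illustration, an event record can be modeled as a NumPy structured dtype along these lines. The field types below are assumptions for the sketch; consult the Types page for the authoritative `event_dtype` definition:

```python
import numpy as np

# Assumed field layout for illustration only; the real event_dtype is
# defined in hftbacktest.binding (see the Types page).
event_dtype_sketch = np.dtype([
    ('ev', 'u8'),        # event type and flags
    ('exch_ts', 'i8'),   # exchange timestamp (ns)
    ('local_ts', 'i8'),  # local receipt timestamp (ns)
    ('px', 'f8'),        # price
    ('qty', 'f8'),       # quantity
    ('order_id', 'u8'),  # order ID for Level-3 events
    ('ival', 'i8'),      # additional integer field
    ('fval', 'f8'),      # additional float field
])

# A structured array of two empty events; fields are accessed by name.
events = np.zeros(2, dtype=event_dtype_sketch)
events[0]['px'] = 50000.0
events[0]['qty'] = 0.5
```

Field access by name (`events['px']`, `events['exch_ts']`) is how the validation and correction functions below read the data.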
Validation Functions
correct_local_timestamp
Adjusts local timestamps to ensure non-negative feed latency.
```python
from hftbacktest.data import correct_local_timestamp
import numpy as np

data = np.load('market_data.npz')['data']
corrected_data = correct_local_timestamp(data, base_latency=0)
```

Parameters:
- `data` (NDArray[event_dtype], required): Market data array to correct (modified in place).
- `base_latency`: Base latency to add after correction, in the same time unit as the data (typically nanoseconds). Default: 0.

Due to clock synchronization issues between the exchange and local systems, negative feed latencies may occur. This function:
- Finds the minimum (most negative) feed latency
- Offsets all local timestamps by this amount plus `base_latency`

This ensures all latencies are positive and realistic.

Returns: The corrected data array (same as input, modified in place).
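The two steps above can be sketched in plain NumPy. This is a minimal illustration of the described algorithm, not the library's implementation:

```python
import numpy as np

def correct_local_ts_sketch(exch_ts, local_ts, base_latency=0):
    # Feed latency per event; negative values indicate clock skew
    # between the exchange and the local machine.
    latency = local_ts - exch_ts
    # Shift every local timestamp so the most negative latency
    # becomes exactly base_latency.
    offset = base_latency - latency.min()
    return local_ts + offset

exch_ts = np.array([100, 200, 300])
local_ts = np.array([95, 210, 305])  # first event "arrives" before it was sent
fixed = correct_local_ts_sketch(exch_ts, local_ts, base_latency=10)
```

After the shift, every feed latency is at least `base_latency`, with the smallest latency equal to it.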
Example:
```python
import numpy as np
from hftbacktest.data import correct_local_timestamp

# Load data
data = np.load('btcusdt_data.npz')['data']

# Add 100 microseconds base latency
base_latency = 100_000  # 100 microseconds in nanoseconds
corrected = correct_local_timestamp(data, base_latency=base_latency)

# Verify every latency is at least the base latency
latencies = corrected['local_ts'] - corrected['exch_ts']
assert np.all(latencies >= base_latency)
```
correct_event_order
Corrects reversed exchange timestamps by splitting events into separate exchange and local events.
```python
from hftbacktest.data import correct_event_order
import numpy as np

data = np.load('market_data.npz')['data']

# Sort by timestamps
sorted_exch_index = np.argsort(data['exch_ts'])
sorted_local_index = np.argsort(data['local_ts'])

corrected_data = correct_event_order(
    data,
    sorted_exch_index,
    sorted_local_index
)
```

Parameters:
- `data` (NDArray[event_dtype], required): Market data array to correct.
- `sorted_exch_index`: Indices that sort `data` by exchange timestamp, e.g. `np.argsort(data['exch_ts'])`.
- `sorted_local_index`: Indices that sort `data` by local timestamp, e.g. `np.argsort(data['local_ts'])`.

Returns: Corrected data with events properly ordered. May be up to 2x the original size due to event splitting.

How it works:
When exchange timestamps are out of order (due to network issues or data collection), this function:
- Splits each event into separate exchange-side and local-side events
- Maintains both temporal orderings (exchange time and local time)
- Sets the appropriate `EXCH_EVENT` and `LOCAL_EVENT` flags

See the data documentation for more details.
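The splitting idea can be shown with a toy example. The flag bit values below are assumptions for illustration; the real `EXCH_EVENT` and `LOCAL_EVENT` constants are defined in the library's Types module:

```python
import numpy as np

# Toy flag bits for illustration only; see the Types documentation
# for the actual EXCH_EVENT / LOCAL_EVENT constants.
EXCH_EVENT = 1 << 31
LOCAL_EVENT = 1 << 30

# Exchange timestamps arrive out of order while local timestamps are in order.
exch_ts = np.array([100, 90, 110])
local_ts = np.array([105, 106, 115])

# Exchange-side copies, sorted by exchange time and flagged EXCH_EVENT.
exch_idx = np.argsort(exch_ts, kind='stable')
exch_side = [(EXCH_EVENT, int(exch_ts[i]), int(local_ts[i])) for i in exch_idx]

# Local-side copies, sorted by local time and flagged LOCAL_EVENT.
local_idx = np.argsort(local_ts, kind='stable')
local_side = [(LOCAL_EVENT, int(exch_ts[i]), int(local_ts[i])) for i in local_idx]

# Each side is now monotonic in its own timestamp, at the cost of
# up to twice as many events.
```

This is why the corrected array can grow up to 2x: an event whose two timestamps disagree on ordering ends up represented once on each side.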
Example:
```python
import numpy as np
from hftbacktest.data import correct_event_order

data = np.load('market_data.npz')['data']

# Create sorted indices
sorted_exch_idx = np.argsort(data['exch_ts'])
sorted_local_idx = np.argsort(data['local_ts'])

# Correct event order
corrected = correct_event_order(data, sorted_exch_idx, sorted_local_idx)

print(f"Original events: {len(data)}")
print(f"Corrected events: {len(corrected)}")

# Save corrected data
np.savez_compressed('corrected_data.npz', data=corrected)
```
validate_event_order
Validates that events are correctly ordered without modification.
```python
from hftbacktest.data import validate_event_order
import numpy as np

data = np.load('market_data.npz')['data']

try:
    validate_event_order(data)
    print("Data is valid")
except ValueError as e:
    print(f"Data validation failed: {e}")
```

Parameters:
- `data` (NDArray[event_dtype], required): Market data array to validate.

Raises:
- ValueError: If exchange events or local events are out of order.
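Conceptually, the check verifies that each side of the feed is non-decreasing in its own timestamp. A minimal sketch, with assumed flag bits (the real constants live in the library's Types module):

```python
import numpy as np

# Assumed flag bits for illustration; not the library's actual constants.
EXCH_EVENT = 1 << 31
LOCAL_EVENT = 1 << 30

def validate_order_sketch(ev, exch_ts, local_ts):
    # Exchange-side events must be non-decreasing in exchange time.
    exch_mask = (ev & EXCH_EVENT) != 0
    if np.any(np.diff(exch_ts[exch_mask]) < 0):
        raise ValueError('exchange events are out of order')
    # Local-side events must be non-decreasing in local time.
    local_mask = (ev & LOCAL_EVENT) != 0
    if np.any(np.diff(local_ts[local_mask]) < 0):
        raise ValueError('local events are out of order')

# Ordered data passes silently.
ev = np.array([EXCH_EVENT | LOCAL_EVENT] * 3, dtype=np.uint64)
validate_order_sketch(ev, np.array([1, 2, 3]), np.array([2, 3, 4]))
```

Note that only the events carrying the relevant flag participate in each check, which is why split events (from `correct_event_order`) validate cleanly.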
Example:
```python
import numpy as np
from hftbacktest.data import validate_event_order

data = np.load('market_data.npz')['data']

try:
    validate_event_order(data)
    print("✓ Data validation passed")
    print("  - Exchange events are in order")
    print("  - Local events are in order")
except ValueError as e:
    print(f"✗ Data validation failed: {e}")
    print("  Consider using correct_event_order() to fix the data")
```
FuseMarketDepth
Combines real-time Level-1 ticker stream with conflated Level-2 depth stream to produce frequent, granular depth events.
```python
from hftbacktest.data import FuseMarketDepth
from numba import njit

@njit
def fuse_data(depth_events, ticker_events, tick_size, lot_size):
    fuse = FuseMarketDepth(tick_size, lot_size)

    # Process events
    for i in range(len(depth_events)):
        fuse.process_event(depth_events, i, add=False)
    for i in range(len(ticker_events)):
        fuse.process_event(ticker_events, i, add=True)

    # Get fused result
    fused = fuse.fused_events()
    fuse.close()
    return fused
```
Constructor
Parameters:
- `tick_size`: Tick size (minimum price increment) for the asset.
- `lot_size`: Lot size (minimum quantity increment) for the asset.
Example:
```python
from hftbacktest.data import FuseMarketDepth
from numba import njit

@njit
def create_fuser():
    tick_size = 0.01   # $0.01 tick
    lot_size = 0.001   # 0.001 BTC lot
    fuse = FuseMarketDepth(tick_size, lot_size)
    return fuse
```
Methods
process_event

Processes a market event.

Parameters:
- `ev` (NDArray[event_dtype]): Event array
- `index` (uint64): Index of the event to process
- `add` (bool): If True, add the event to the fused output; if False, use it for depth updates only

```python
@njit
def process_data(depth_data, ticker_data):
    fuse = FuseMarketDepth(0.01, 0.001)

    # Process depth updates (update state but don't add to output)
    for i in range(len(depth_data)):
        fuse.process_event(depth_data, i, add=False)

    # Process ticker updates (add to output with updated depth)
    for i in range(len(ticker_data)):
        fuse.process_event(ticker_data, i, add=True)

    result = fuse.fused_events()
    fuse.close()
    return result
```

fused_events

Returns the array of fused events.

Returns: NDArray[event_dtype] - Fused market depth events

```python
fused = fuse.fused_events()
print(f"Generated {len(fused)} fused events")
```

close

Releases resources. Must be called when done using the fuser.

```python
fuse.close()  # Free memory
```
Complete Example:
```python
import numpy as np
from numba import njit
from hftbacktest.data import FuseMarketDepth

@njit
def fuse_market_data(depth_data, ticker_data, tick_size, lot_size):
    """
    Fuse conflated Level-2 depth with real-time Level-1 ticker.

    Args:
        depth_data: Infrequent but complete depth events
        ticker_data: Frequent BBO updates
        tick_size: Minimum price increment
        lot_size: Minimum quantity increment

    Returns:
        Fused events with frequent and accurate depth
    """
    # Create fuser
    fuse = FuseMarketDepth(tick_size, lot_size)

    # Process depth snapshots (state updates only)
    for i in range(len(depth_data)):
        fuse.process_event(depth_data, i, add=False)

    # Process ticker updates (generate output events)
    for i in range(len(ticker_data)):
        fuse.process_event(ticker_data, i, add=True)

    # Get fused result
    fused_events = fuse.fused_events()

    # Clean up
    fuse.close()

    return fused_events

# Load data outside the JIT-compiled function
# (np.load is not supported in Numba's nopython mode)
depth_data = np.load('depth_snapshots.npz')['data']
ticker_data = np.load('ticker_updates.npz')['data']

# Usage
fused = fuse_market_data(
    depth_data,
    ticker_data,
    tick_size=0.01,
    lot_size=0.001
)
print(f"Generated {len(fused)} fused events")
np.savez_compressed('fused_data.npz', data=fused)
```
Data Preparation Workflow
Typical workflow for preparing market data:
1. Load Raw Data
```python
import numpy as np
from hftbacktest.binding import event_dtype

# Load from an exchange-specific format (load_exchange_data and
# convert_to_event_dtype are placeholders for your own conversion code)
raw_data = load_exchange_data('binance_btcusdt_20240101.csv')

# Convert to event_dtype format
events = convert_to_event_dtype(raw_data)
```
2. Correct Timestamps
```python
from hftbacktest.data import correct_local_timestamp

# Fix negative latencies
events = correct_local_timestamp(events, base_latency=100_000)
```
3. Correct Event Order (if needed)
```python
import numpy as np
from hftbacktest.data import correct_event_order, validate_event_order

# Check if correction is needed
try:
    validate_event_order(events)
except ValueError:
    # Correct the order
    sorted_exch = np.argsort(events['exch_ts'])
    sorted_local = np.argsort(events['local_ts'])
    events = correct_event_order(events, sorted_exch, sorted_local)
```
4. Validate
```python
from hftbacktest.data import validate_event_order

validate_event_order(events)
print("Data is ready for backtesting")
```
5. Save
```python
np.savez_compressed('prepared_data.npz', data=events)
```
Data Sources
HftBacktest includes utilities for converting data from various sources:
- Binance Futures (`hftbacktest.data.utils.binancefutures`)
- Binance Historical Market Data (`hftbacktest.data.utils.binancehistmktdata`)
- Bybit (`hftbacktest.data.utils.bybit`)
- Bybit Historical Market Data (`hftbacktest.data.utils.bybithistmktdata`)
- Tardis (`hftbacktest.data.utils.tardis`)
- DataBento (`hftbacktest.data.utils.databento`)
- Hyperliquid (`hftbacktest.data.utils.hyperliquid`)
- MEXC (`hftbacktest.data.utils.mexc`)
See the Data Preparation tutorial for detailed guides on converting data from these sources.
Best Practices
Data Quality
- Always validate data before backtesting
- Correct timestamps to ensure realistic latency
- Check for gaps in the data that could affect results
- Verify tick and lot sizes match the exchange specifications
- Use compressed `.npz` files to reduce storage
- Split large datasets into smaller chunks for faster loading
- Keep data sorted by timestamp to avoid runtime sorting
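Chunking can be as simple as bucketing events by a time window of the local receipt timestamp. A minimal sketch using a simplified subset of the event fields (the dtype here is illustrative, not the full `event_dtype`):

```python
import numpy as np

HOUR_NS = 3_600_000_000_000  # one hour in nanoseconds

# Simplified subset of event_dtype fields, for illustration only.
dtype = np.dtype([('exch_ts', 'i8'), ('local_ts', 'i8'),
                  ('px', 'f8'), ('qty', 'f8')])
data = np.zeros(5, dtype=dtype)
data['local_ts'] = [10, 20, HOUR_NS + 5, HOUR_NS + 6, 2 * HOUR_NS + 1]

# Bucket events by the hour of local receipt time; each bucket could then
# be written out with np.savez_compressed(f'chunk_{b}.npz', data=chunk).
buckets = data['local_ts'] // HOUR_NS
chunks = {int(b): data[buckets == b] for b in np.unique(buckets)}
```

Because the data is already sorted by local timestamp, each chunk stays contiguous and individually loadable.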
Latency Modeling
- Use real latency data when available via `intp_order_latency()`
- Add base latency to account for processing time
- Test with different latency offsets for cross-exchange scenarios
Example: Complete Data Pipeline
```python
import numpy as np
from hftbacktest.data import (
    correct_local_timestamp,
    correct_event_order,
    validate_event_order
)

def prepare_market_data(input_file, output_file, base_latency=100_000):
    """
    Complete data preparation pipeline.

    Args:
        input_file: Raw data file
        output_file: Output file for prepared data
        base_latency: Base latency in nanoseconds
    """
    print(f"Loading data from {input_file}...")
    data = np.load(input_file)['data']
    print(f"  Loaded {len(data)} events")

    # Step 1: Correct timestamps
    print("Correcting timestamps...")
    data = correct_local_timestamp(data, base_latency=base_latency)

    # Step 2: Check if event order correction is needed
    print("Checking event order...")
    try:
        validate_event_order(data)
        print("  Event order is valid")
    except ValueError as e:
        print(f"  Event order invalid: {e}")
        print("  Correcting event order...")
        sorted_exch = np.argsort(data['exch_ts'])
        sorted_local = np.argsort(data['local_ts'])
        data = correct_event_order(data, sorted_exch, sorted_local)
        print(f"  Generated {len(data)} events after correction")

    # Step 3: Final validation
    print("Final validation...")
    validate_event_order(data)

    # Step 4: Save
    print(f"Saving to {output_file}...")
    np.savez_compressed(output_file, data=data)
    print("Done!")

    return data

# Usage
data = prepare_market_data(
    input_file='raw_btcusdt_20240101.npz',
    output_file='btcusdt_20240101.npz',
    base_latency=100_000  # 100 microseconds
)
```