The data module provides utilities for loading market data, validating event order, and correcting timestamps.

Data Format

HftBacktest uses a structured NumPy array format for market data. Events must be stored in .npz files or NumPy arrays with the event_dtype structure.

Event Structure

Each event contains:
  • Event type and flags (ev)
  • Exchange timestamp (exch_ts)
  • Local receipt timestamp (local_ts)
  • Price (px)
  • Quantity (qty)
  • Order ID for Level-3 events (order_id)
  • Additional integer and float fields (ival, fval)
See Types for the complete event_dtype definition.
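For illustration, the fields above can be mirrored with a hand-rolled structured dtype. The field widths below are assumptions; real code should import the authoritative event_dtype from hftbacktest.binding.

```python
import numpy as np

# Illustrative layout only; the authoritative definition is event_dtype
# in hftbacktest.binding, and its field widths may differ.
event_dtype = np.dtype([
    ('ev', 'u8'),        # event type and flags
    ('exch_ts', 'i8'),   # exchange timestamp (ns)
    ('local_ts', 'i8'),  # local receipt timestamp (ns)
    ('px', 'f8'),        # price
    ('qty', 'f8'),       # quantity
    ('order_id', 'u8'),  # order ID (Level-3 events)
    ('ival', 'i8'),      # additional integer field
    ('fval', 'f8'),      # additional float field
])

events = np.zeros(1, dtype=event_dtype)
events[0]['exch_ts'] = 1_700_000_000_000_000_000
events[0]['local_ts'] = events[0]['exch_ts'] + 150_000  # 150 us feed latency
events[0]['px'] = 42000.5
events[0]['qty'] = 0.01
```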

Validation Functions

correct_local_timestamp

Adjusts local timestamps to ensure non-negative feed latency.
from hftbacktest.data import correct_local_timestamp
import numpy as np

data = np.load('market_data.npz')['data']
corrected_data = correct_local_timestamp(data, base_latency=0)
data
NDArray[event_dtype]
required
Market data array to correct (modified in place).
base_latency
float
Base latency to add after correction, in the same time unit as the data (typically nanoseconds). Default: 0.
Due to clock synchronization issues between exchange and local systems, negative latencies may occur. This function:
  1. Finds the minimum (most negative) feed latency
  2. Offsets all local timestamps by this amount plus base_latency
This ensures all latencies are positive and realistic.
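The two steps above can be sketched in plain NumPy. This is an illustrative re-implementation only, with a reduced two-field dtype standing in for event_dtype; the real function is hftbacktest.data.correct_local_timestamp.

```python
import numpy as np

def correct_local_timestamp_sketch(data, base_latency=0):
    # Step 1: find the minimum (most negative) feed latency.
    latency = data['local_ts'] - data['exch_ts']
    min_latency = latency.min()
    if min_latency < 0:
        # Step 2: shift every local timestamp so the worst-case
        # latency becomes exactly base_latency.
        data['local_ts'] += -min_latency + base_latency
    return data

# Tiny demo with a reduced dtype (illustrative, not event_dtype)
dtype = np.dtype([('exch_ts', 'i8'), ('local_ts', 'i8')])
data = np.zeros(3, dtype=dtype)
data['exch_ts'] = [100, 200, 300]
data['local_ts'] = [0, 250, 500]   # first event "arrives" before it was sent
correct_local_timestamp_sketch(data, base_latency=10)
print(data['local_ts'] - data['exch_ts'])  # minimum latency is now 10
```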
return
NDArray[event_dtype]
The corrected data array (same as input, modified in place).
Example:
import numpy as np
from hftbacktest.data import correct_local_timestamp
from hftbacktest.binding import event_dtype

# Load data
data = np.load('btcusdt_data.npz')['data']

# Add 100 microseconds base latency
base_latency = 100_000  # 100 microseconds in nanoseconds
corrected = correct_local_timestamp(data, base_latency=base_latency)

# Verify latencies are positive
latencies = corrected['local_ts'] - corrected['exch_ts']
assert np.all(latencies >= base_latency)

correct_event_order

Corrects reversed exchange timestamps by splitting events into separate exchange and local events.
from hftbacktest.data import correct_event_order
import numpy as np

data = np.load('market_data.npz')['data']

# Sort by timestamps
sorted_exch_index = np.argsort(data['exch_ts'])
sorted_local_index = np.argsort(data['local_ts'])

corrected_data = correct_event_order(
    data,
    sorted_exch_index,
    sorted_local_index
)
data
NDArray[event_dtype]
required
Market data array to correct.
sorted_exch_index
NDArray[int]
required
Indices that sort data by exchange timestamp.
sorted_exch_index = np.argsort(data['exch_ts'])
sorted_local_index
NDArray[int]
required
Indices that sort data by local timestamp.
sorted_local_index = np.argsort(data['local_ts'])
return
NDArray[event_dtype]
Corrected data with events properly ordered. May be up to 2x the original size due to event splitting.
How it works: When exchange timestamps are out of order (due to network issues or data collection), this function:
  1. Splits each event into separate exchange-side and local-side events
  2. Maintains both temporal orderings (exchange time and local time)
  3. Sets appropriate EXCH_EVENT and LOCAL_EVENT flags
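Conceptually, the split in step 1 looks like the sketch below. The flag bit positions and the reduced dtype are assumptions for illustration; the real EXCH_EVENT and LOCAL_EVENT constants and the merge logic live inside hftbacktest.

```python
import numpy as np

# Assumed flag bits for illustration; the actual constants are defined
# by hftbacktest and may use different positions.
EXCH_EVENT = np.uint64(1) << np.uint64(31)
LOCAL_EVENT = np.uint64(1) << np.uint64(30)

dtype = np.dtype([('ev', 'u8'), ('exch_ts', 'i8'), ('local_ts', 'i8')])

def split_event(event):
    # One combined event becomes an exchange-side copy (ordered by
    # exch_ts) and a local-side copy (ordered by local_ts), both
    # carrying the same payload.
    exch_side = event.copy()
    exch_side['ev'] |= EXCH_EVENT
    local_side = event.copy()
    local_side['ev'] |= LOCAL_EVENT
    return exch_side, local_side

combined = np.zeros(1, dtype=dtype)
combined['exch_ts'] = 1_000
combined['local_ts'] = 1_500
exch_side, local_side = split_event(combined)
```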
See the data documentation for more details.
Example:
import numpy as np
from hftbacktest.data import correct_event_order

data = np.load('market_data.npz')['data']

# Create sorted indices
sorted_exch_idx = np.argsort(data['exch_ts'])
sorted_local_idx = np.argsort(data['local_ts'])

# Correct event order
corrected = correct_event_order(data, sorted_exch_idx, sorted_local_idx)

print(f"Original events: {len(data)}")
print(f"Corrected events: {len(corrected)}")

# Save corrected data
np.savez_compressed('corrected_data.npz', data=corrected)

validate_event_order

Validates that events are correctly ordered without modification.
from hftbacktest.data import validate_event_order
import numpy as np

data = np.load('market_data.npz')['data']

try:
    validate_event_order(data)
    print("Data is valid")
except ValueError as e:
    print(f"Data validation failed: {e}")
data
NDArray[event_dtype]
required
Market data array to validate.
Raises:
  • ValueError: If exchange events or local events are out of order
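Ignoring event flags, the check reduces to monotonicity of both timestamp columns. The standalone spot-check below is a simplification, not the library's exact logic, which validates exchange and local events within their own flagged streams.

```python
import numpy as np

def spot_check_order(exch_ts, local_ts):
    # Simplified stand-in for the validation described above: in a
    # valid feed, exchange timestamps and local timestamps are each
    # non-decreasing.
    exch_ok = bool(np.all(np.diff(exch_ts) >= 0))
    local_ok = bool(np.all(np.diff(local_ts) >= 0))
    return exch_ok and local_ok

print(spot_check_order(np.array([1, 2, 3]), np.array([2, 3, 4])))  # True
print(spot_check_order(np.array([1, 3, 2]), np.array([2, 3, 4])))  # False
```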
Example:
import numpy as np
from hftbacktest.data import validate_event_order

data = np.load('market_data.npz')['data']

try:
    validate_event_order(data)
    print("✓ Data validation passed")
    print("  - Exchange events are in order")
    print("  - Local events are in order")
except ValueError as e:
    print(f"✗ Data validation failed: {e}")
    print("  Consider using correct_event_order() to fix the data")

FuseMarketDepth

Combines a real-time Level-1 ticker stream with a conflated Level-2 depth stream to produce frequent, granular depth events.
from hftbacktest.data import FuseMarketDepth
from numba import njit

@njit
def fuse_data(depth_events, ticker_events, tick_size, lot_size):
    fuse = FuseMarketDepth(tick_size, lot_size)
    
    # Process events
    for i in range(len(depth_events)):
        fuse.process_event(depth_events, i, add=False)
    
    for i in range(len(ticker_events)):
        fuse.process_event(ticker_events, i, add=True)
    
    # Get fused result
    fused = fuse.fused_events()
    
    fuse.close()
    return fused

Constructor

tick_size
float64
required
Tick size (minimum price increment) for the asset.
lot_size
float64
required
Lot size (minimum quantity increment) for the asset.
Example:
from hftbacktest.data import FuseMarketDepth
from numba import njit

@njit
def create_fuser():
    tick_size = 0.01  # $0.01 tick
    lot_size = 0.001  # 0.001 BTC lot
    
    fuse = FuseMarketDepth(tick_size, lot_size)
    return fuse

Methods

process_event
method
Processes a market event.
Parameters:
  • ev (NDArray[event_dtype]): Event array
  • index (uint64): Index of the event to process
  • add (bool): If True, add event to fused output; if False, use for depth updates only
@njit
def process_data(depth_data, ticker_data):
    fuse = FuseMarketDepth(0.01, 0.001)
    
    # Process depth updates (update state but don't add to output)
    for i in range(len(depth_data)):
        fuse.process_event(depth_data, i, add=False)
    
    # Process ticker updates (add to output with updated depth)
    for i in range(len(ticker_data)):
        fuse.process_event(ticker_data, i, add=True)
    
    result = fuse.fused_events()
    fuse.close()
    return result
fused_events
method
Returns the array of fused events.
Returns: NDArray[event_dtype] - Fused market depth events
fused = fuse.fused_events()
print(f"Generated {len(fused)} fused events")
close
method
Releases resources. Must be called when done using the fuser.
fuse.close()  # Free memory
Complete Example:
import numpy as np
from numba import njit
from hftbacktest.data import FuseMarketDepth
from hftbacktest.binding import event_dtype

@njit
def fuse_events(depth_data, ticker_data, tick_size, lot_size):
    """
    Fuse conflated Level-2 depth with real-time Level-1 ticker.
    
    Args:
        depth_data: Infrequent but complete depth snapshot events
        ticker_data: Frequent BBO update events
        tick_size: Minimum price increment
        lot_size: Minimum quantity increment
    
    Returns:
        Fused events with frequent and accurate depth
    """
    # Create fuser
    fuse = FuseMarketDepth(tick_size, lot_size)
    
    # Process depth snapshots (state updates only)
    for i in range(len(depth_data)):
        fuse.process_event(depth_data, i, add=False)
    
    # Process ticker updates (generate output events)
    for i in range(len(ticker_data)):
        fuse.process_event(ticker_data, i, add=True)
    
    # Get fused result
    fused_events = fuse.fused_events()
    
    # Clean up
    fuse.close()
    
    return fused_events

# Load the .npz files outside the jitted function: np.load is not
# supported inside Numba's nopython mode
depth_data = np.load('depth_snapshots.npz')['data']
ticker_data = np.load('ticker_updates.npz')['data']

fused = fuse_events(
    depth_data,
    ticker_data,
    tick_size=0.01,
    lot_size=0.001
)

print(f"Generated {len(fused)} fused events")
np.savez_compressed('fused_data.npz', data=fused)

Data Preparation Workflow

Typical workflow for preparing market data:

1. Load Raw Data

import numpy as np
from hftbacktest.binding import event_dtype

# Load from exchange-specific format
raw_data = load_exchange_data('binance_btcusdt_20240101.csv')

# Convert to event_dtype format
events = convert_to_event_dtype(raw_data)

2. Correct Timestamps

from hftbacktest.data import correct_local_timestamp

# Fix negative latencies
events = correct_local_timestamp(events, base_latency=100_000)

3. Correct Event Order (if needed)

import numpy as np
from hftbacktest.data import correct_event_order, validate_event_order

# Check if correction is needed
try:
    validate_event_order(events)
except ValueError:
    # Correct the order
    sorted_exch = np.argsort(events['exch_ts'])
    sorted_local = np.argsort(events['local_ts'])
    events = correct_event_order(events, sorted_exch, sorted_local)

4. Validate

from hftbacktest.data import validate_event_order

validate_event_order(events)
print("Data is ready for backtesting")

5. Save

np.savez_compressed('prepared_data.npz', data=events)

Data Sources

HftBacktest includes utilities for converting data from various sources:
  • Binance Futures (hftbacktest.data.utils.binancefutures)
  • Binance Historical Market Data (hftbacktest.data.utils.binancehistmktdata)
  • Bybit (hftbacktest.data.utils.bybit)
  • Bybit Historical Market Data (hftbacktest.data.utils.bybithistmktdata)
  • Tardis (hftbacktest.data.utils.tardis)
  • DataBento (hftbacktest.data.utils.databento)
  • Hyperliquid (hftbacktest.data.utils.hyperliquid)
  • MEXC (hftbacktest.data.utils.mexc)
See the Data Preparation tutorial for detailed guides on converting data from these sources.

Best Practices

Data Quality

  1. Always validate data before backtesting
  2. Correct timestamps to ensure realistic latency
  3. Check for gaps in the data that could affect results
  4. Verify tick and lot sizes match the exchange specifications
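Point 3 above can be automated with a quick scan for long silences in the feed. The helper below is hypothetical (not part of hftbacktest), and the one-second threshold is an arbitrary example.

```python
import numpy as np

def find_gaps(exch_ts, max_gap_ns=1_000_000_000):
    # Return the indices after which no event arrived for longer than
    # max_gap_ns (here, 1 second expressed in nanoseconds).
    diffs = np.diff(exch_ts.astype(np.int64))
    return np.flatnonzero(diffs > max_gap_ns)

ts = np.array([0, 10, 2_000_000_000, 2_000_000_050], dtype=np.int64)
print(find_gaps(ts))  # index 1: the gap between 10 ns and 2 s
```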

Performance

  1. Use compressed .npz files to reduce storage
  2. Split large datasets into smaller chunks for faster loading
  3. Keep data sorted by timestamp to avoid runtime sorting

Latency Modeling

  1. Use real latency data when available via intp_order_latency()
  2. Add base latency to account for processing time
  3. Test with different latency offsets for cross-exchange scenarios

Example: Complete Data Pipeline

import numpy as np
from hftbacktest.data import (
    correct_local_timestamp,
    correct_event_order,
    validate_event_order
)
from hftbacktest.binding import event_dtype

def prepare_market_data(input_file, output_file, base_latency=100_000):
    """
    Complete data preparation pipeline.
    
    Args:
        input_file: Raw data file
        output_file: Output file for prepared data
        base_latency: Base latency in nanoseconds
    """
    print(f"Loading data from {input_file}...")
    data = np.load(input_file)['data']
    print(f"  Loaded {len(data)} events")
    
    # Step 1: Correct timestamps
    print("Correcting timestamps...")
    data = correct_local_timestamp(data, base_latency=base_latency)
    
    # Step 2: Check if event order correction is needed
    print("Checking event order...")
    try:
        validate_event_order(data)
        print("  Event order is valid")
    except ValueError as e:
        print(f"  Event order invalid: {e}")
        print("  Correcting event order...")
        sorted_exch = np.argsort(data['exch_ts'])
        sorted_local = np.argsort(data['local_ts'])
        data = correct_event_order(data, sorted_exch, sorted_local)
        print(f"  Generated {len(data)} events after correction")
    
    # Step 3: Final validation
    print("Final validation...")
    validate_event_order(data)
    
    # Step 4: Save
    print(f"Saving to {output_file}...")
    np.savez_compressed(output_file, data=data)
    print("Done!")
    
    return data

# Usage
data = prepare_market_data(
    input_file='raw_btcusdt_20240101.npz',
    output_file='btcusdt_20240101.npz',
    base_latency=100_000  # 100 microseconds
)
