Overview
The CryptoDataCollector class provides a robust interface for fetching cryptocurrency market data from various exchanges using the CCXT library. It supports real-time price data, historical OHLCV data, and 24-hour trading statistics.
Class: CryptoDataCollector
Location: source/data/collectors.py:12
Constructor
CryptoDataCollector(exchange_name: str = 'kraken')
Initializes a new data collector instance connected to the specified exchange.
Parameters
| Parameter | Type | Default | Description |
|---|
exchange_name | str | 'kraken' | Name of the exchange to connect to (e.g., ‘kraken’, ‘binance’, ‘coinbase’) |
Example
from data.collectors import CryptoDataCollector
# Connect to Kraken (default)
collector = CryptoDataCollector()
# Connect to Binance
binance_collector = CryptoDataCollector('binance')
Methods
fetch_ohlcv
Location: source/data/collectors.py:35
fetch_ohlcv(symbol: str, timeframe: str = '1h', limit: int = 1000) -> pd.DataFrame
Fetches historical OHLCV (Open, High, Low, Close, Volume) data for a trading pair.
This method is cached using Streamlit’s @st.cache_data decorator with a TTL defined in settings.
Parameters
| Parameter | Type | Default | Description |
|---|
symbol | str | Required | Trading pair symbol (e.g., ‘BTC/USDT’, ‘ETH/USD’) |
timeframe | str | '1h' | Candlestick timeframe (‘1m’, ‘5m’, ‘15m’, ‘1h’, ‘4h’, ‘1d’, etc.) |
limit | int | 1000 | Number of candles to fetch (max depends on exchange) |
Returns
Type: pd.DataFrame
Returns a pandas DataFrame with the following columns:
| Column | Type | Description |
|---|
timestamp (index) | datetime64 | Timestamp for each candle |
open | float | Opening price |
high | float | Highest price in period |
low | float | Lowest price in period |
close | float | Closing price |
volume | float | Trading volume |
returns | float | Percentage change (calculated) |
log_returns | float | Logarithmic returns (calculated) |
Example
collector = CryptoDataCollector('kraken')
# Fetch 1-hour Bitcoin data
btc_data = collector.fetch_ohlcv('BTC/USDT', timeframe='1h', limit=500)
if not btc_data.empty:
print(f"Latest close price: ${btc_data['close'].iloc[-1]:.2f}")
print(f"24h return: {btc_data['returns'].iloc[-24:].sum() * 100:.2f}%")
# Fetch 5-minute Ethereum data
eth_data = collector.fetch_ohlcv('ETH/USDT', timeframe='5m', limit=100)
get_current_price
Location: source/data/collectors.py:62
get_current_price(symbol: str) -> Optional[float]
Fetches the current market price for a trading pair.
Parameters
| Parameter | Type | Description |
|---|
symbol | str | Trading pair symbol (e.g., ‘BTC/USDT’) |
Returns
Type: Optional[float]
- Returns the last traded price as a
float
- Returns
None if the exchange connection failed or an error occurred
Example
collector = CryptoDataCollector('binance')
btc_price = collector.get_current_price('BTC/USDT')
if btc_price:
print(f"Current Bitcoin price: ${btc_price:,.2f}")
else:
print("Failed to fetch price")
# Check multiple coins
coins = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT']
for coin in coins:
price = collector.get_current_price(coin)
if price:
print(f"{coin}: ${price:,.2f}")
get_24h_stats
Location: source/data/collectors.py:74
get_24h_stats(symbol: str) -> dict
Fetches comprehensive 24-hour trading statistics for a symbol.
Parameters
| Parameter | Type | Description |
|---|
symbol | str | Trading pair symbol (e.g., ‘BTC/USDT’) |
Returns
Type: dict
Returns a dictionary with the following keys:
| Key | Type | Description |
|---|
last | float | Last traded price |
high | float | 24-hour high price |
low | float | 24-hour low price |
volume | float | 24-hour quote volume |
change | float | Absolute price change |
percentage | float | Percentage price change |
Returns an empty dictionary {} if an error occurs.
Example
collector = CryptoDataCollector('kraken')
stats = collector.get_24h_stats('BTC/USDT')
if stats:
print(f"Price: ${stats['last']:,.2f}")
print(f"24h High: ${stats['high']:,.2f}")
print(f"24h Low: ${stats['low']:,.2f}")
print(f"24h Volume: ${stats['volume']:,.2f}")
print(f"24h Change: {stats['percentage']:.2f}%")
# Check if price is near daily high
if stats['last'] >= stats['high'] * 0.95:
print("Price is within 5% of daily high!")
Utility Functions
test_connection
Location: source/data/collectors.py:93
test_connection(exchange_name: str = 'kraken') -> bool
Tests the connection to an exchange by attempting to fetch Bitcoin price.
Parameters
| Parameter | Type | Default | Description |
|---|
exchange_name | str | 'kraken' | Name of the exchange to test |
Returns
Type: bool
True if connection successful and price was fetched
False if connection failed
Example
from data.collectors import test_connection
# Test different exchanges
exchanges = ['kraken', 'binance', 'coinbase']
for exchange in exchanges:
if test_connection(exchange):
print(f"✓ {exchange} connection successful")
else:
print(f"✗ {exchange} connection failed")
Complete Usage Example
Here’s a comprehensive example demonstrating all the functionality:
from data.collectors import CryptoDataCollector, test_connection
import pandas as pd
# Test connection first
if not test_connection('binance'):
print("Cannot connect to exchange")
exit(1)
# Initialize collector
collector = CryptoDataCollector('binance')
# Get current price
current_price = collector.get_current_price('BTC/USDT')
print(f"Current BTC Price: ${current_price:,.2f}")
# Get 24-hour statistics
stats = collector.get_24h_stats('BTC/USDT')
print(f"\n24-Hour Statistics:")
print(f"High: ${stats['high']:,.2f}")
print(f"Low: ${stats['low']:,.2f}")
print(f"Volume: ${stats['volume']:,.0f}")
print(f"Change: {stats['percentage']:.2f}%")
# Fetch historical data
df = collector.fetch_ohlcv('BTC/USDT', timeframe='1h', limit=168) # 1 week
if not df.empty:
print(f"\nHistorical Data Summary:")
print(f"Data points: {len(df)}")
print(f"Date range: {df.index[0]} to {df.index[-1]}")
print(f"Price range: ${df['low'].min():,.2f} - ${df['high'].max():,.2f}")
print(f"Average volume: {df['volume'].mean():,.2f}")
print(f"Total return: {(df['close'].iloc[-1] / df['close'].iloc[0] - 1) * 100:.2f}%")
# Calculate volatility
volatility = df['returns'].std() * (24 ** 0.5) # Daily volatility
print(f"Daily volatility: {volatility * 100:.2f}%")
Error Handling
All methods handle errors gracefully:
- Connection errors: Display error message via Streamlit and return
None or empty DataFrame
- Invalid symbols: Return empty results with error notification
- Rate limits: Automatically handled by CCXT’s
enableRateLimit setting
- Timeout errors: 30-second timeout configured for all requests
Make sure to configure BINANCE_API_KEY and BINANCE_API_SECRET in your config/settings.py file if using authenticated endpoints.
Dependencies
ccxt - Cryptocurrency exchange library
pandas - Data manipulation
numpy - Numerical computations
streamlit - Caching functionality
See Also