Pandas is the most popular Python library for data analysis and manipulation. QuestDB integrates seamlessly with Pandas through multiple methods, allowing you to leverage QuestDB’s high-performance time-series capabilities within your Python workflows.
The PostgreSQL Wire Protocol on port 8812 is ideal for querying data:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Method 1: Using SQLAlchemy (recommended)
# QuestDB speaks the PostgreSQL wire protocol on port 8812; the default
# credentials are admin/quest and the database name is always 'qdb'.
engine = create_engine('postgresql://admin:quest@localhost:8812/qdb')

# Read data into DataFrame
df = pd.read_sql_query(
    "SELECT * FROM trades WHERE timestamp > '2024-01-01'",
    engine,
)

# Quick sanity check of the result set: first rows plus column dtypes.
print(df.head())
print(df.info())
# SAMPLE BY aggregation: 1-hour OHLCV bars for the last 7 days.
# NOTE: the pasted original had the SQL collapsed onto one line, fusing
# tokens ("volumeFROM trades", "tradesWHERE") into invalid SQL; the line
# breaks inside the string literal are restored here.
query = """
SELECT
    timestamp,
    symbol,
    first(price) as open,
    last(price) as close,
    min(price) as low,
    max(price) as high,
    sum(volume) as volume
FROM trades
WHERE timestamp > dateadd('d', -7, now())
SAMPLE BY 1h ALIGN TO CALENDAR
"""
# `engine` is the SQLAlchemy engine created earlier in this article.
df_ohlc = pd.read_sql_query(query, engine)
For high-performance ingestion, use the InfluxDB Line Protocol (ILP) on port 9009:
import pandas as pd
from questdb.ingress import Sender, IngressError

# Sample DataFrame: 1000 one-second ticks split across two symbols.
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1s'),
    'symbol': ['BTC-USD'] * 500 + ['ETH-USD'] * 500,
    'price': [45000 + i * 10 for i in range(1000)],
    'volume': [100 + i for i in range(1000)],
})

# Send to QuestDB using ILP (InfluxDB Line Protocol, port 9009).
try:
    with Sender('localhost', 9009) as sender:
        # itertuples() is much faster than iterrows() and avoids building a
        # dtype-coerced Series per row; the values sent are identical.
        for row in df.itertuples(index=False):
            sender.row(
                'trades',
                symbols={'symbol': row.symbol},
                columns={
                    'price': row.price,
                    'volume': row.volume,
                },
                at=row.timestamp,
            )
        # Explicit flush; the context manager would also flush on exit.
        sender.flush()
    print(f"Sent {len(df)} rows to QuestDB")
except IngressError as e:
    print(f"Ingestion error: {e}")
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from questdb.ingress import Sender


def send_chunk(chunk):
    """Ingest one DataFrame chunk over its own ILP connection.

    Each worker thread opens an independent Sender so connections are
    never shared across threads.
    """
    with Sender('localhost', 9009) as sender:
        for _, row in chunk.iterrows():
            sender.row(
                'metrics',
                symbols={'sensor': row['sensor']},
                columns={'value': row['value']},
                at=row['timestamp'],
            )
        sender.flush()


# Split DataFrame into chunks of 10k rows each.
chunks = [df[i:i + 10000] for i in range(0, len(df), 10000)]

# Parallel ingestion. Draining the map iterator with list() is essential:
# Executor.map only re-raises worker exceptions when its results are
# consumed, so without it an ingestion failure would pass silently.
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(send_chunk, chunks))
# Read large table in chunks: passing `chunksize` makes read_sql_query
# return an iterator of DataFrames instead of one giant frame, keeping
# memory bounded regardless of table size.
batch_rows = 100000
batches = pd.read_sql_query(
    "SELECT * FROM large_table",
    engine,
    chunksize=batch_rows,
)
for batch in batches:
    # Aggregate each batch independently.
    per_category_mean = batch.groupby('category')['value'].mean()
    print(per_category_mean)
import psycopg2
from contextlib import closing

# Connect over QuestDB's PostgreSQL wire protocol (port 8812).
# closing() guarantees the connection and the server-side cursor are
# released even if an exception fires mid-fetch — the original called
# cursor.close()/conn.close() unconditionally and leaked both on error.
with closing(psycopg2.connect(
    host='localhost',
    port=8812,
    user='admin',
    password='quest',
    database='qdb',
)) as conn:
    # A *named* cursor is server-side: rows stream from the server on
    # demand instead of being materialized in client memory at once.
    with closing(conn.cursor(name='fetch_large_result')) as cursor:
        cursor.execute("SELECT * FROM trades")
        # Fetch in batches of 10k rows until the result set is exhausted.
        while True:
            rows = cursor.fetchmany(size=10000)
            if not rows:
                break
            # cursor.description carries the column names for the frame.
            df_chunk = pd.DataFrame(
                rows,
                columns=[desc[0] for desc in cursor.description],
            )
            # Process chunk
            print(f"Processed {len(df_chunk)} rows")
# Use chunked processing for large DataFrames: stream the query result
# and hand each slice to the application logic one at a time, so the
# full result never has to fit in memory simultaneously.
for frame in pd.read_sql_query(query, engine, chunksize=10000):
    process_data(frame)
    # Drop the reference eagerly so the slice can be collected before
    # the next fetch arrives.
    del frame