Skip to main content

Overview

Pandas is the most popular Python library for data analysis and manipulation. QuestDB integrates seamlessly with Pandas through multiple methods, allowing you to leverage QuestDB’s high-performance time-series capabilities within your Python workflows.

Connection Methods

  • PostgreSQL Wire Protocol (port 8812): Query data into DataFrames
  • InfluxDB Line Protocol (port 9009): High-speed data ingestion
  • REST API (port 9000): Simple HTTP queries and imports

Prerequisites

pip install pandas psycopg2-binary questdb
QuestDB uses port 8812 for PostgreSQL Wire Protocol queries and port 9009 for high-throughput InfluxDB Line Protocol ingestion.

Reading Data from QuestDB

Using PostgreSQL Wire Protocol

The PostgreSQL Wire Protocol on port 8812 is ideal for querying data:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Method 1: Using SQLAlchemy (recommended)
# psycopg2 is imported so the DBAPI driver is present; SQLAlchemy's
# postgresql:// URL resolves to psycopg2 as its default driver.
# QuestDB exposes the PostgreSQL wire protocol on port 8812 with the
# default admin/quest credentials and database name 'qdb'.
engine = create_engine('postgresql://admin:quest@localhost:8812/qdb')

# Read data into DataFrame
df = pd.read_sql_query(
    "SELECT * FROM trades WHERE timestamp > '2024-01-01'",
    engine
)

print(df.head())
print(df.info())

Using psycopg2 Directly

import pandas as pd
import psycopg2

# Connect to QuestDB
# Same pgwire endpoint as the SQLAlchemy example, but using the raw
# psycopg2 connection directly.
conn = psycopg2.connect(
    host='localhost',
    port=8812,
    user='admin',
    password='quest',
    database='qdb'
)

# Query with parameters
# %s placeholders are bound by psycopg2, avoiding manual string
# interpolation (and the SQL-injection risk that comes with it).
query = """
SELECT 
    timestamp,
    symbol,
    price,
    volume
FROM trades
WHERE symbol = %s
  AND timestamp > %s
ORDER BY timestamp DESC
"""

# params fills the %s placeholders positionally.
df = pd.read_sql_query(
    query,
    conn,
    params=('BTC-USD', '2024-01-01')
)

conn.close()

Time-Series Queries

Leverage QuestDB’s time-series SQL extensions:
# SAMPLE BY aggregation
# SAMPLE BY is a QuestDB time-series extension that buckets rows into
# fixed intervals (1-hour bars here); ALIGN TO CALENDAR snaps bucket
# boundaries to calendar hours. first()/last() yield each bar's
# open/close price.
query = """
SELECT 
    timestamp,
    symbol,
    first(price) as open,
    last(price) as close,
    min(price) as low,
    max(price) as high,
    sum(volume) as volume
FROM trades
WHERE timestamp > dateadd('d', -7, now())
SAMPLE BY 1h ALIGN TO CALENDAR
"""

# 'engine' is the SQLAlchemy engine created in the earlier example.
df_ohlc = pd.read_sql_query(query, engine)

Using LATEST ON

# Get latest values per symbol
# LATEST ON is a QuestDB extension that returns only the most recent
# row (by the given timestamp) for each distinct value of the
# PARTITION BY column -- i.e. one row per symbol.
query = """
SELECT 
    timestamp,
    symbol,
    price,
    volume
FROM trades
LATEST ON timestamp PARTITION BY symbol
"""

# 'engine' is the SQLAlchemy engine created in the earlier example.
df_latest = pd.read_sql_query(query, engine)

Writing Data to QuestDB

Method 1: InfluxDB Line Protocol (Fastest)

For high-performance ingestion, use ILP on port 9009:
import pandas as pd
from questdb.ingress import Sender, IngressError

# Sample DataFrame
# 1000 one-second ticks: the first 500 rows are BTC-USD, the last 500
# are ETH-USD.
df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1s'),
    'symbol': ['BTC-USD'] * 500 + ['ETH-USD'] * 500,
    'price': [45000 + i * 10 for i in range(1000)],
    'volume': [100 + i for i in range(1000)]
})

# Send to QuestDB using ILP
# NOTE(review): the positional Sender('localhost', 9009) form matches
# older questdb client releases; newer releases use Sender.from_conf /
# a protocol argument -- confirm against the installed client version.
# Row-by-row iterrows() is the simplest approach; the client also has a
# bulk sender.dataframe() API that is much faster for large frames.
try:
    with Sender('localhost', 9009) as sender:
        for _, row in df.iterrows():
            sender.row(
                'trades',
                # symbols= maps to QuestDB SYMBOL columns;
                # columns= maps to regular value columns;
                # at= supplies the designated timestamp for the row.
                symbols={'symbol': row['symbol']},
                columns={
                    'price': row['price'],
                    'volume': row['volume']
                },
                at=row['timestamp']
            )
        # Push any buffered rows before the connection closes.
        sender.flush()
    print(f"Sent {len(df)} rows to QuestDB")
except IngressError as e:
    print(f"Ingestion error: {e}")

Method 2: PostgreSQL Wire Protocol

Use for smaller datasets or when you need ACID guarantees:
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://admin:quest@localhost:8812/qdb')

# Create sample DataFrame
# 1000 readings, one per minute, alternating between two sensors.
df = pd.DataFrame({
    'sensor_id': ['sensor_001', 'sensor_002'] * 500,
    'temperature': [20 + i * 0.1 for i in range(1000)],
    'humidity': [50 + i * 0.05 for i in range(1000)],
    'timestamp': pd.date_range('2024-01-01', periods=1000, freq='1min')
})

# Write to QuestDB
# if_exists='append' keeps any existing table; method='multi' packs
# multiple rows into each INSERT, sent in chunksize-row batches.
# NOTE(review): if to_sql creates the table itself, it will not set a
# designated timestamp or partitioning -- for time-series workloads,
# create the table up front (see the psycopg2 DDL example below).
df.to_sql(
    'sensor_readings',
    engine,
    if_exists='append',
    index=False,
    method='multi',
    chunksize=1000
)

Batch Insert with psycopg2

import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    host='localhost',
    port=8812,
    user='admin',
    password='quest',
    database='qdb'
)

# Create table first
# SYMBOL, the designated TIMESTAMP and PARTITION BY DAY are
# QuestDB-specific DDL for efficient time-series storage.
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS sensor_data (
            sensor_id SYMBOL,
            temperature DOUBLE,
            humidity DOUBLE,
            timestamp TIMESTAMP
        ) TIMESTAMP(timestamp) PARTITION BY DAY;
    """)
    conn.commit()

# Batch insert
# 'df' is the sensor DataFrame from the previous example. itertuples
# with name=None yields plain tuples whose column order must match the
# positional VALUES (%s, %s, %s, %s) list exactly.
data = list(df.itertuples(index=False, name=None))
with conn.cursor() as cur:
    # execute_batch groups page_size rows per round trip -- far fewer
    # network round trips than a plain executemany.
    psycopg2.extras.execute_batch(
        cur,
        "INSERT INTO sensor_data VALUES (%s, %s, %s, %s)",
        data,
        page_size=1000
    )
    conn.commit()

conn.close()

Data Analysis Examples

Time-Series Resampling

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://admin:quest@localhost:8812/qdb')

# Read high-frequency data
# parse_dates ensures the timestamp column comes back as datetime64
# rather than raw strings/objects.
df = pd.read_sql_query(
    "SELECT * FROM trades WHERE timestamp > dateadd('d', -1, now())",
    engine,
    parse_dates=['timestamp']
)

# Set timestamp as index
# (mutates df in place; pandas resampling requires a datetime index)
df.set_index('timestamp', inplace=True)

# Resample to 5-minute OHLC
ohlc = df['price'].resample('5min').ohlc()
volume = df['volume'].resample('5min').sum()

# Align the OHLC bars and summed volume on the shared 5-minute index.
result = pd.concat([ohlc, volume], axis=1)
print(result.head())

Rolling Statistics

# Calculate 20-period moving average
df['ma_20'] = df['price'].rolling(window=20).mean()

# Calculate rolling standard deviation (a simple volatility proxy)
df['volatility'] = df['price'].rolling(window=20).std()

# Write results back to QuestDB.
# BUG FIX: the previous example moved 'timestamp' into the DataFrame
# index with set_index(..., inplace=True), so it is no longer a column
# and selecting it directly raised a KeyError. reset_index() restores
# it as a regular column before the write-back.
# NOTE(review): if_exists='replace' drops and recreates the table with
# plain DDL, losing any designated timestamp/partitioning -- confirm
# that is acceptable, or pre-create the table and use 'append'.
df.reset_index()[['timestamp', 'symbol', 'price', 'ma_20', 'volatility']].to_sql(
    'trades_with_indicators',
    engine,
    if_exists='replace',
    index=False
)

Pivot Tables

# Read data
# Pull the last 24 hours of metrics via the SQLAlchemy engine created
# in the earlier example.
df = pd.read_sql_query(
    "SELECT * FROM metrics WHERE timestamp > dateadd('h', -24, now())",
    engine
)

# Create pivot table
# Rows become timestamps, columns become metric names, and each cell
# holds the mean of 'value' for that (timestamp, metric) pair.
pivot = df.pivot_table(
    values='value',
    index='timestamp',
    columns='metric_name',
    aggfunc='mean'
)

print(pivot.head())

Performance Optimization

Best Practices
  • Use ILP (port 9009) for high-volume ingestion (millions of rows/sec)
  • Use PostgreSQL Wire (port 8812) for queries and analytical workloads
  • Batch inserts in chunks of 1000-10000 rows
  • Use SYMBOL type for repetitive string columns with a limited set of distinct values
  • Partition tables by DAY or MONTH for time-series data

Parallel Ingestion

import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from questdb.ingress import Sender

def send_chunk(chunk):
    """Send one DataFrame chunk to QuestDB over ILP.

    Each worker opens its own Sender, so no connection object is ever
    shared between threads.
    """
    with Sender('localhost', 9009) as sender:
        for _, row in chunk.iterrows():
            sender.row(
                'metrics',
                symbols={'sensor': row['sensor']},
                columns={'value': row['value']},
                at=row['timestamp']
            )
        sender.flush()

# Split DataFrame into chunks
# Positional slices of 10,000 rows each (the final chunk may be shorter).
chunks = [df[i:i+10000] for i in range(0, len(df), 10000)]

# Parallel ingestion
# Ingestion is network-bound, so threads overlap the socket waits.
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(send_chunk, chunks)

Working with Large Datasets

Chunked Reading

# Stream a very large table in fixed-size batches so the full result
# set never has to fit in memory at once.
batch_size = 100000
for batch in pd.read_sql_query(
    "SELECT * FROM large_table",
    engine,
    chunksize=batch_size
):
    # Aggregate each batch independently as it arrives.
    per_category_mean = batch.groupby('category')['value'].mean()
    print(per_category_mean)

Streaming Results

import psycopg2

conn = psycopg2.connect(
    host='localhost',
    port=8812,
    user='admin',
    password='quest',
    database='qdb'
)

# Use server-side cursor for large results
# A named cursor keeps the result set on the server and streams rows on
# demand instead of materializing everything client-side.
# NOTE(review): confirm QuestDB's pgwire implementation supports named
# (server-side) cursors.
cursor = conn.cursor(name='fetch_large_result')
cursor.execute("SELECT * FROM trades")

# Fetch in batches
while True:
    rows = cursor.fetchmany(size=10000)
    if not rows:
        break
    # cursor.description supplies the column names for the DataFrame.
    df_chunk = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
    # Process chunk
    print(f"Processed {len(df_chunk)} rows")

cursor.close()
conn.close()

Data Export

Export to CSV

# Pull the full trades table and write it out as CSV without the
# DataFrame's integer index column.
trades = pd.read_sql_query("SELECT * FROM trades", engine)
trades.to_csv('trades_export.csv', index=False)

Export to Parquet

# Export the trades table to a snappy-compressed Parquet file using the
# pyarrow engine.
trades = pd.read_sql_query("SELECT * FROM trades", engine)
trades.to_parquet('trades_export.parquet', engine='pyarrow', compression='snappy')

Troubleshooting

Connection Issues

# Test PostgreSQL connection
# connect_timeout caps how long the attempt waits before failing.
try:
    conn = psycopg2.connect(
        host='localhost',
        port=8812,
        user='admin',
        password='quest',
        database='qdb',
        connect_timeout=5
    )
    print("Connection successful")
    conn.close()
# The broad catch is deliberate: this is a diagnostic snippet that
# should report any failure mode rather than crash.
except Exception as e:
    print(f"Connection failed: {e}")

Memory Management

# Use chunked processing for large DataFrames
# Only one chunk is resident at a time; the iterator fetches the next
# batch lazily.
for chunk in pd.read_sql_query(query, engine, chunksize=10000):
    # Process chunk
    process_data(chunk)
    # Free memory
    # (the del is largely symbolic -- 'chunk' is rebound on the next
    # iteration anyway)
    del chunk

Next Steps

Python Client

Learn about the QuestDB Python client

InfluxDB Line Protocol

High-performance data ingestion

Build docs developers (and LLMs) love