This example demonstrates how to use custom callbacks to monitor real-time account updates from the Drift protocol. It shows how to process market data updates as they stream in.

What It Does

The callbacks example:
  • Subscribes to perp market accounts
  • Processes updates with custom callback functions
  • Monitors specific market metrics in real-time
  • Demonstrates callback patterns for account subscriptions
  • Tracks base asset amounts and market state

Complete Source Code

use anchor_lang::AccountDeserialize;
use clap::Parser;
use dotenv;
use drift_rs::math::constants::BASE_PRECISION_I128;
use drift_rs::types::{accounts::PerpMarket, AccountUpdate, Context};
use drift_rs::{DriftClient, RpcClient, Wallet};
use env_logger;
use futures_util::future::FutureExt;
use rust_decimal::Decimal;
use std::env;
use std::time::{Duration, Instant};
use tokio::time::timeout;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    #[clap(
        long,
        default_value = "https://api.mainnet-beta.solana.com",
        help = "RPC endpoint URL (auto-converts to WebSocket for subscriptions)"
    )]
    rpc_url: String,

    #[clap(long, action, help = "Use devnet instead of mainnet")]
    devnet: bool,

    #[clap(long, default_value = "30", help = "Duration to run in seconds")]
    duration: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    dotenv::dotenv().ok();
    let args = Args::parse();

    println!("🚀 Drift Market Update Monitor");
    println!("===============================\n");

    // Allow RPC_URL override from environment
    let rpc_url = env::var("RPC_URL").unwrap_or(args.rpc_url);
    let context = if args.devnet {
        Context::DevNet
    } else {
        Context::MainNet
    };

    println!("📡 Connecting to {} ({})", rpc_url, context.name());

    // Create RPC client and read-only wallet
    let rpc_client = RpcClient::new(rpc_url);
    // Create a read-only wallet (we don't need signing for subscriptions)
    let dummy_wallet = Wallet::read_only(
        solana_sdk::pubkey!("11111111111111111111111111111111")
    );

    // Initialize Drift client
    let client = DriftClient::new(context, rpc_client, dummy_wallet).await?;
    println!("✅ Connected to Drift protocol\n");

    monitor_markets(&client, args.duration).await?;

    println!("\n✅ Market monitoring complete!");

    Ok(())
}

/// Deserialize PerpMarket from account data
fn deserialize_perp_market(
    data: &[u8]
) -> Result<PerpMarket, Box<dyn std::error::Error>> {
    let market = PerpMarket::try_deserialize(&mut &data[..])?;
    Ok(market)
}

/// Monitor markets and print market_index and base_asset_amount_with_amm
async fn monitor_markets(
    client: &DriftClient,
    duration_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    // Get available markets
    let perp_markets = client.get_all_perp_market_ids();

    println!("📈 Available markets:");
    println!("  • Perp markets: {} active", perp_markets.len());

    let start_time = Instant::now();

    let market_callback = move |update: &AccountUpdate| {
        // Deserialize market data
        match deserialize_perp_market(&update.data) {
            Ok(market) => {
                let elapsed = start_time.elapsed().as_secs();
                println!(
                    "[{}s] Market {}: base_asset_amount_with_amm = {}",
                    elapsed,
                    market.market_index,
                    (market.amm.base_asset_amount_with_amm.as_i128() * -1) as f64
                        / BASE_PRECISION_I128 as f64
                );
            }
            Err(e) => {
                eprintln!("Failed to deserialize market: {}", e);
            }
        }
    };

    // Subscribe to markets with callback
    client
        .subscribe_markets_with_callback(&perp_markets, market_callback)
        .await?;

    println!(
        "⏰ Running for {} seconds... (Press Ctrl+C to stop early)\n",
        duration_secs
    );

    // Run for specified duration
    match timeout(
        Duration::from_secs(duration_secs),
        tokio::signal::ctrl_c().fuse(),
    )
    .await
    {
        Ok(_) => println!("\n🛑 Interrupted by user"),
        Err(_) => println!("\n⏰ Time limit reached"),
    }

    // Unsubscribe (cleanup)
    client.unsubscribe().await?;
    println!("\n✅ Unsubscribed from all market data");

    Ok(())
}

Key Concepts

Account Update Structure

pub struct AccountUpdate {
    pub pubkey: Pubkey,     // Account address
    pub data: Vec<u8>,      // Account data
    pub slot: u64,          // Slot number
    // ...
}

Custom Callbacks

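Define a closure to process updates:
let market_callback = move |update: &AccountUpdate| {
    // Deserialize the account data. The closure returns (), so the `?`
    // operator is not available; handle the Result explicitly instead.
    match PerpMarket::try_deserialize(&mut &update.data[..]) {
        // Process the market data
        Ok(market) => println!(
            "Market {} updated at slot {}",
            market.market_index,
            update.slot
        ),
        Err(e) => eprintln!("Failed to deserialize market: {}", e),
    }
};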

Subscribe with Callback

client
    .subscribe_markets_with_callback(&market_ids, market_callback)
    .await?;
The callback is invoked for every account update.
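To make "invoked for every account update" concrete, the sketch below models the dispatch with the standard library only. `Dispatcher`, `count_updates`, and the trimmed-down `AccountUpdate` are hypothetical stand-ins, not drift-rs internals, and the `Fn + Send + Sync + 'static` bound is an assumption about what a multi-threaded subscriber typically requires. Under that assumption, state captured by the callback has to be shared through atomics or a lock rather than mutated directly.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the SDK's AccountUpdate (same field names as above).
struct AccountUpdate {
    pubkey: [u8; 32],
    data: Vec<u8>,
    slot: u64,
}

type Callback = Box<dyn Fn(&AccountUpdate) + Send + Sync>;

/// Hypothetical dispatcher: calls every registered callback once per update.
struct Dispatcher {
    callbacks: Vec<Callback>,
}

impl Dispatcher {
    fn new() -> Self {
        Self { callbacks: Vec::new() }
    }

    fn register<F>(&mut self, f: F)
    where
        F: Fn(&AccountUpdate) + Send + Sync + 'static,
    {
        self.callbacks.push(Box::new(f));
    }

    fn dispatch(&self, update: &AccountUpdate) {
        for cb in &self.callbacks {
            cb(update);
        }
    }
}

/// Feeds one update per slot through a counting callback and returns
/// (number of updates seen, highest slot seen).
fn count_updates(slots: &[u64]) -> (u64, u64) {
    let seen = Arc::new(AtomicU64::new(0));
    let max_slot = Arc::new(AtomicU64::new(0));

    let mut dispatcher = Dispatcher::new();
    let (seen_cb, max_cb) = (Arc::clone(&seen), Arc::clone(&max_slot));
    dispatcher.register(move |update: &AccountUpdate| {
        // Atomics let an `Fn` closure update shared counters without `mut`.
        seen_cb.fetch_add(1, Ordering::Relaxed);
        max_cb.fetch_max(update.slot, Ordering::Relaxed);
    });

    for &slot in slots {
        dispatcher.dispatch(&AccountUpdate { pubkey: [0; 32], data: Vec::new(), slot });
    }
    (seen.load(Ordering::Relaxed), max_slot.load(Ordering::Relaxed))
}
```

Updates can arrive out of slot order in this model, which is why the sketch tracks the maximum slot instead of the most recent one.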

Monitoring Market Metrics

Base Asset Amount with AMM

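The example monitors the AMM's net base asset position. The raw field is negated and scaled from BASE_PRECISION units to whole base units:
let base_amount = (market.amm.base_asset_amount_with_amm.as_i128() * -1) as f64
    / BASE_PRECISION_I128 as f64;
Assuming base_asset_amount_with_amm tracks the net position that users hold against the AMM, the negated value reads as:
  • Positive: AMM is net long (shorts > longs)
  • Negative: AMM is net short (longs > shorts)
  • Zero: Balanced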
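The scaling step in the expression above can be isolated and checked without the SDK. This is a minimal sketch that assumes base amounts are fixed-point integers with nine decimal places, mirroring `BASE_PRECISION_I128`. The names `BASE_PRECISION`, `amm_position`, and `format_base` are illustrative and not part of drift-rs.

```rust
/// Assumed fixed-point scale for base asset amounts (1e9).
const BASE_PRECISION: i128 = 1_000_000_000;

/// Negates the raw amount and scales it to whole base units, as the example does.
fn amm_position(raw_with_amm: i128) -> f64 {
    (-raw_with_amm) as f64 / BASE_PRECISION as f64
}

/// Formats a raw amount with integer arithmetic only, so large values
/// keep all nine decimal places instead of being rounded through f64.
fn format_base(raw: i128) -> String {
    let sign = if raw < 0 { "-" } else { "" };
    let abs = raw.unsigned_abs();
    let scale = BASE_PRECISION as u128;
    format!("{}{}.{:09}", sign, abs / scale, abs % scale)
}
```

The `f64` form is convenient for logging and charts. The integer form is the safer choice when values are stored or compared, because it never loses precision.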

Other Useful Metrics

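// Liquidity invariant (sqrt of k); larger values mean deeper AMM liquidity
let total_liquidity = market.amm.sqrt_k;

// Last recorded oracle price (an oracle price, not the AMM mark price)
let oracle_price = market.amm.historical_oracle_data.last_oracle_price;

// Funding rate from the most recent funding update
let funding_rate = market.amm.last_funding_rate;

// Net base amount attributed to unsettled LP positions (not total open interest)
let unsettled_lp_base = market.amm.base_asset_amount_with_unsettled_lp;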

Callback Patterns

Simple Logging

let callback = |update: &AccountUpdate| {
    println!("Update received for {} at slot {}", 
        update.pubkey, 
        update.slot
    );
};

Filtering Updates

let callback = move |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        // Only log specific markets
        if market.market_index == 0 {
            println!("SOL-PERP updated: {:?}", market);
        }
    }
};
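The filter above deserializes every update before deciding whether to keep it. When the interesting accounts are known in advance, a cheaper variant checks the account address first and skips deserialization for everything else. The sketch below is a std-only illustration: `filter_by_pubkey` and the trimmed-down `AccountUpdate` (with a raw 32-byte key in place of the SDK's `Pubkey`) are hypothetical.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the SDK's AccountUpdate.
struct AccountUpdate {
    pubkey: [u8; 32],
    data: Vec<u8>,
    slot: u64,
}

/// Wraps `inner` so it only runs for accounts in `watched`.
/// The address check happens before any deserialization work.
fn filter_by_pubkey<F>(watched: HashSet<[u8; 32]>, inner: F) -> impl Fn(&AccountUpdate)
where
    F: Fn(&AccountUpdate),
{
    move |update: &AccountUpdate| {
        if watched.contains(&update.pubkey) {
            inner(update);
        }
    }
}
```

The wrapper composes with any of the other patterns in this section: build the inner callback as usual, then wrap it once before subscribing.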

Aggregating Data

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Shared map of market_index -> latest base_asset_amount_with_amm
let stats = Arc::new(Mutex::new(HashMap::new()));
let stats_clone = stats.clone();

let callback = move |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        let mut stats = stats_clone.lock().unwrap();
        stats.insert(market.market_index, market.amm.base_asset_amount_with_amm);
    }
};

Sending to Channel

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(100);

let callback = move |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        let _ = tx.try_send(market);
    }
};

// Process in separate task
tokio::spawn(async move {
    while let Some(market) = rx.recv().await {
        process_market(market).await;
    }
});
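The `try_send` call in the pattern above never blocks: when the buffer is full, the update is dropped rather than stalling the callback. The sketch below shows that behavior with the standard library's bounded `sync_channel`, used here as a stand-in so the example runs without an async runtime. `offer_all` is a hypothetical helper, and the dropped-message counter is an addition for visibility rather than something the original example tracks.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Offers every item to a bounded channel without blocking.
/// Returns the items that were accepted and the number dropped
/// because the buffer was full.
fn offer_all(capacity: usize, items: &[u32]) -> (Vec<u32>, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;

    for &item in items {
        match tx.try_send(item) {
            Ok(()) => {}
            // Buffer full: count the loss instead of waiting for the consumer.
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // Closing the sender lets the receiver drain what was buffered and stop.
    drop(tx);
    (rx.iter().collect(), dropped)
}
```

Counting drops makes it easier to tell whether the channel capacity is too small for the update rate, which is otherwise invisible when the send result is discarded.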

Running the Example

Basic Usage

# Monitor mainnet for 30 seconds (default)
cargo run --example drift-client-callbacks

Custom Duration

# Monitor for 60 seconds
cargo run --example drift-client-callbacks -- --duration 60

DevNet

# Monitor devnet
cargo run --example drift-client-callbacks -- --devnet

Custom RPC

# Use custom RPC endpoint
export RPC_URL="https://your-rpc-endpoint.com"
cargo run --example drift-client-callbacks

Example Output

🚀 Drift Market Update Monitor
===============================

📡 Connecting to https://api.mainnet-beta.solana.com (mainnet)
✅ Connected to Drift protocol

📈 Available markets:
  • Perp markets: 79 active
⏰ Running for 30 seconds... (Press Ctrl+C to stop early)

[1s] Market 0: base_asset_amount_with_amm = -12345.67
[2s] Market 1: base_asset_amount_with_amm = 8901.23
[2s] Market 0: base_asset_amount_with_amm = -12346.12
[3s] Market 2: base_asset_amount_with_amm = 4567.89
...

⏰ Time limit reached
✅ Unsubscribed from all market data
✅ Market monitoring complete!

Use Cases

1. Market Monitoring Dashboard

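struct MarketMonitor {
    markets: HashMap<u16, PerpMarket>,
}

impl MarketMonitor {
    // The returned closure holds the mutable borrow of `self`, so its type
    // carries the `'_` lifetime and the closure must be `move`.
    // If the subscription requires a 'static callback, share the map through
    // Arc<Mutex<..>> as in "Aggregating Data" instead.
    fn callback(&mut self) -> impl FnMut(&AccountUpdate) + '_ {
        move |update: &AccountUpdate| {
            if let Ok(market) = deserialize_perp_market(&update.data) {
                self.markets.insert(market.market_index, market);
                self.update_dashboard(); // assumed to be defined elsewhere
            }
        }
    }
}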

2. Alert System

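// THRESHOLD (an i128 in BASE_PRECISION units) and send_alert are placeholders
let callback = |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        // Convert to a plain i128 first, as the main example does
        let imbalance = market.amm.base_asset_amount_with_amm.as_i128().abs();
        if imbalance > THRESHOLD {
            send_alert(format!(
                "High imbalance in market {}: {}",
                market.market_index,
                imbalance
            ));
        }
    }
};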

3. Data Collection

let callback = |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        let data_point = MarketDataPoint {
            timestamp: SystemTime::now(),
            market_index: market.market_index,
            base_asset_amount: market.amm.base_asset_amount_with_amm,
            mark_price: market.amm.historical_oracle_data.last_oracle_price,
            funding_rate: market.amm.last_funding_rate,
        };
        
        store_to_database(data_point);
    }
};

4. Trading Signal Generation

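let callback = |update: &AccountUpdate| {
    if let Ok(market) = deserialize_perp_market(&update.data) {
        let signal = analyze_market(&market);

        if signal.should_trade() {
            // The callback is synchronous, so `.await` is not allowed here;
            // hand the async work to a spawned task instead.
            tokio::spawn(async move {
                execute_trade(signal).await;
            });
        }
    }
};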

Advanced Features

Subscribe to Multiple Account Types

// Markets
client.subscribe_markets_with_callback(
    &perp_markets, 
    market_callback
).await?;

// Oracles
client.subscribe_all_oracles_with_callback(
    oracle_callback
).await?;

// Specific user accounts
client.subscribe_account_with_callback(
    &user_account,
    user_callback
).await?;

Error Handling in Callbacks

let callback = |update: &AccountUpdate| {
    match deserialize_perp_market(&update.data) {
        Ok(market) => {
            process_market(&market);
        }
        Err(e) => {
            eprintln!("Deserialization error for {}: {}", update.pubkey, e);
            // Log error, send to monitoring system, etc.
        }
    }
};

Rate Limiting

use std::time::{Duration, Instant};

let mut last_log = Instant::now();
let callback = move |update: &AccountUpdate| {
    if last_log.elapsed() > Duration::from_secs(1) {
        if let Ok(market) = deserialize_perp_market(&update.data) {
            println!("Market update: {:?}", market.market_index);
            last_log = Instant::now();
        }
    }
};
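The limiter above shares a single timer across all markets, so a busy market can starve the log lines of a quiet one. A per-market variant keeps one timestamp per market index. The sketch below is std-only and hypothetical (`PerKeyLimiter` is not part of drift-rs). It takes the current time as an argument so the behavior can be tested deterministically.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Allows at most one event per key within `min_interval`.
struct PerKeyLimiter {
    min_interval: Duration,
    last: HashMap<u16, Instant>,
}

impl PerKeyLimiter {
    fn new(min_interval: Duration) -> Self {
        Self { min_interval, last: HashMap::new() }
    }

    /// Returns true if `key` may emit at `now`, recording the time when it does.
    fn allow(&mut self, key: u16, now: Instant) -> bool {
        let allowed = self
            .last
            .get(&key)
            .map_or(true, |&prev| now.duration_since(prev) >= self.min_interval);
        if allowed {
            self.last.insert(key, now);
        }
        allowed
    }
}
```

Inside a callback this would be called as `limiter.allow(market.market_index, Instant::now())`, with the limiter held in whatever shared state the callback already uses.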

Best Practices

  1. Keep Callbacks Fast: Don’t block the callback with slow operations
  2. Handle Errors: Always handle deserialization errors
  3. Use Channels: Send data to separate tasks for processing
  4. Cleanup: Always unsubscribe when done
  5. Monitor Memory: Be careful with state in callbacks
  6. Rate Limit: Prevent spam from high-frequency updates

Performance Tips

  1. Minimize Work in Callback: Do heavy processing asynchronously
  2. Use Arc/Mutex Carefully: Avoid contention
  3. Buffer Updates: Batch process multiple updates
  4. Filter Early: Only deserialize accounts you need
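Tips 2 and 3 can be combined in one small structure: the callback records only the newest value per market, and a worker drains the whole batch at its own pace. This is a minimal std-only sketch. `LatestByMarket` is a hypothetical name, and the stored `(slot, value)` pair stands in for whatever fields the worker needs.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Newest (slot, value) per market index, shared between callback and worker.
#[derive(Clone, Default)]
struct LatestByMarket {
    inner: Arc<Mutex<HashMap<u16, (u64, i128)>>>,
}

impl LatestByMarket {
    /// Called from the callback: overwrite the entry unless it is older
    /// than what is already buffered.
    fn record(&self, market_index: u16, slot: u64, value: i128) {
        let mut map = self.inner.lock().unwrap();
        let entry = map.entry(market_index).or_insert((slot, value));
        if slot >= entry.0 {
            *entry = (slot, value);
        }
    }

    /// Called from the worker: swap the map out so the lock is held only
    /// for the swap, not for the processing that follows.
    fn drain(&self) -> HashMap<u16, (u64, i128)> {
        let mut guard = self.inner.lock().unwrap();
        std::mem::take(&mut *guard)
    }
}
```

Because intermediate values are overwritten, this suits dashboards and alerts that only care about the current state. It is the wrong tool when every update must be recorded.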

Troubleshooting

No Updates Received:
  • Check that the RPC URL supports WebSocket
  • Verify network connectivity
  • Ensure the markets actually have activity

High CPU Usage:
  • Reduce the number of subscribed markets
  • Optimize callback logic
  • Use rate limiting

Deserialization Errors:
  • Verify that the account type matches
  • Check for program upgrades
  • Ensure the data is complete
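For the "No Updates Received" case, it helps to know which WebSocket URL the HTTP endpoint maps to, so it can be tested directly. The sketch below assumes the common convention of swapping the scheme (`https` to `wss`, `http` to `ws`). How drift-rs performs its own conversion is not shown in this example, and `to_ws_url` is a hypothetical helper.

```rust
/// Derives a WebSocket URL from an RPC URL by swapping the scheme.
/// Returns None for schemes this sketch does not recognize.
fn to_ws_url(rpc_url: &str) -> Option<String> {
    if let Some(rest) = rpc_url.strip_prefix("https://") {
        Some(format!("wss://{}", rest))
    } else if let Some(rest) = rpc_url.strip_prefix("http://") {
        Some(format!("ws://{}", rest))
    } else if rpc_url.starts_with("wss://") || rpc_url.starts_with("ws://") {
        // Already a WebSocket URL: pass it through unchanged.
        Some(rpc_url.to_string())
    } else {
        None
    }
}
```

Some setups serve WebSocket traffic on a different port or host than HTTP, so a scheme swap alone may not be enough. The provider's documentation is the authority on the exact URL.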

Next Steps

  • Build a real-time market dashboard
  • Implement trading signals from market data
  • Create alerting systems
  • Collect historical market data
