Skip to main content
The Rust client provides a fully-featured, type-safe interface to Yellowstone gRPC with async/await support powered by Tokio and Tonic.

Installation

Add the client to your Cargo.toml:
[dependencies]
yellowstone-grpc-client = "2.0.0"
yellowstone-grpc-proto = "2.0.0"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
tonic = { version = "0.12", features = ["tls", "tls-roots"] }

Quick Start

Basic Connection

Connect to a Yellowstone gRPC server:
use yellowstone_grpc_client::GeyserGrpcClient;
use tonic::transport::ClientTlsConfig;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = GeyserGrpcClient::build_from_shared("https://api.rpcpool.com:443")?
        .x_token(Some("your-token-here"))?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    // Use the client
    let version = client.get_version().await?;
    println!("Server version: {:?}", version);

    Ok(())
}

Connecting via Unix Domain Socket

For local connections, use Unix domain sockets for better performance:
let mut client = GeyserGrpcClient::build_from_shared("http://localhost:10000")?
    .connect_uds("/tmp/yellowstone.sock")
    .await?;

Client Builder

The GeyserGrpcBuilder provides extensive configuration options:

Creating a Builder

use yellowstone_grpc_client::GeyserGrpcClient;

// From a shared string
let builder = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000")?;

// From a static string
let builder = GeyserGrpcClient::build_from_static("http://127.0.0.1:10000");

Authentication

let builder = builder.x_token(Some("your-token"))?;

TLS Configuration

use tonic::transport::ClientTlsConfig;

// Use system certificates
let builder = builder.tls_config(ClientTlsConfig::new().with_native_roots())?;

// Use custom CA certificate
use tonic::transport::Certificate;
use tokio::fs;

let cert_pem = fs::read("ca-cert.pem").await?;
let cert = Certificate::from_pem(cert_pem);
let tls_config = ClientTlsConfig::new()
    .with_native_roots()
    .ca_certificate(cert);
let builder = builder.tls_config(tls_config)?;

Compression

use tonic::codec::CompressionEncoding;

let builder = builder
    .accept_compressed(CompressionEncoding::Gzip)
    .send_compressed(CompressionEncoding::Gzip);

Message Size Limits

let builder = builder
    .max_decoding_message_size(1024 * 1024 * 1024) // 1 GiB
    .max_encoding_message_size(1024 * 1024);       // 1 MiB

Connection Options

use std::time::Duration;

let builder = builder
    .connect_timeout(Duration::from_secs(10))
    .timeout(Duration::from_secs(30))
    .tcp_nodelay(true)
    .tcp_keepalive(Some(Duration::from_secs(60)))
    .http2_keep_alive_interval(Duration::from_secs(10))
    .keep_alive_timeout(Duration::from_secs(20))
    .keep_alive_while_idle(true);

Streaming Subscriptions

Subscribe Once

For one-way streaming where you don’t need to update the subscription:
use yellowstone_grpc_proto::prelude::{
    SubscribeRequest, SubscribeRequestFilterSlots, CommitmentLevel,
};
use futures::stream::StreamExt;
use std::collections::HashMap;

let request = SubscribeRequest {
    slots: HashMap::from([(
        "client".to_string(),
        SubscribeRequestFilterSlots {
            filter_by_commitment: Some(true),
            interslot_updates: Some(false),
        },
    )]),
    commitment: Some(CommitmentLevel::Processed as i32),
    ..Default::default()
};

let mut stream = client.subscribe_once(request).await?;

while let Some(message) = stream.next().await {
    match message {
        Ok(update) => {
            println!("Update: {:?}", update);
        }
        Err(error) => {
            eprintln!("Stream error: {:?}", error);
            break;
        }
    }
}

Bidirectional Streaming

For dynamic subscriptions where you need to update filters:
use futures::sink::SinkExt;
use yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof;

let (mut subscribe_tx, mut stream) = client.subscribe().await?;

// Send initial subscription
subscribe_tx.send(SubscribeRequest {
    slots: HashMap::from([(
        "client".to_string(),
        SubscribeRequestFilterSlots::default(),
    )]),
    commitment: Some(CommitmentLevel::Processed as i32),
    ..Default::default()
}).await?;

// Process updates
while let Some(message) = stream.next().await {
    match message {
        Ok(update) => {
            match update.update_oneof {
                Some(UpdateOneof::Slot(slot_update)) => {
                    println!("Slot: {}", slot_update.slot);
                }
                Some(UpdateOneof::Account(account_update)) => {
                    println!("Account update at slot: {}", account_update.slot);
                }
                _ => {}
            }
        }
        Err(error) => break,
    }
}

Account Subscriptions

use yellowstone_grpc_proto::prelude::{
    SubscribeRequestFilterAccounts,
    SubscribeRequestFilterAccountsFilter,
    subscribe_request_filter_accounts_filter::Filter,
};

let request = SubscribeRequest {
    accounts: HashMap::from([(
        "client".to_string(),
        SubscribeRequestFilterAccounts {
            account: vec![
                "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(),
            ],
            owner: vec![
                "11111111111111111111111111111111".to_string(),
            ],
            filters: vec![
                SubscribeRequestFilterAccountsFilter {
                    filter: Some(Filter::Datasize(165)),
                },
            ],
            nonempty_txn_signature: Some(true),
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
};

let mut stream = client.subscribe_once(request).await?;

Transaction Subscriptions

use yellowstone_grpc_proto::prelude::SubscribeRequestFilterTransactions;

let request = SubscribeRequest {
    transactions: HashMap::from([(
        "client".to_string(),
        SubscribeRequestFilterTransactions {
            vote: Some(false),
            failed: Some(false),
            account_include: vec![
                "11111111111111111111111111111111".to_string(),
            ],
            account_exclude: vec![],
            account_required: vec![],
            signature: None,
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
};

let mut stream = client.subscribe_once(request).await?;

Block Subscriptions

use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocks;

let request = SubscribeRequest {
    blocks: HashMap::from([(
        "client".to_string(),
        SubscribeRequestFilterBlocks {
            account_include: vec![],
            include_transactions: Some(true),
            include_accounts: Some(false),
            include_entries: Some(false),
        },
    )]),
    commitment: Some(CommitmentLevel::Confirmed as i32),
    ..Default::default()
};

let mut stream = client.subscribe_once(request).await?;

Unary RPC Methods

Health Check

let response = client.health_check().await?;
println!("Health status: {:?}", response.status);

Get Version

let response = client.get_version().await?;
println!("Version: {}", response.version);

Get Slot

use yellowstone_grpc_proto::prelude::CommitmentLevel;

let response = client.get_slot(Some(CommitmentLevel::Finalized)).await?;
println!("Current slot: {}", response.slot);

Get Block Height

let response = client.get_block_height(Some(CommitmentLevel::Finalized)).await?;
println!("Block height: {}", response.block_height);

Get Latest Blockhash

let response = client.get_latest_blockhash(Some(CommitmentLevel::Finalized)).await?;
println!("Blockhash: {}", response.blockhash);
println!("Last valid block height: {}", response.last_valid_block_height);

Is Blockhash Valid

let blockhash = "EkSnNWid2cvwEVnVx9aBqawnmiCNiDgp3gUdkDPTKN1N".to_string();
let response = client.is_blockhash_valid(
    blockhash,
    Some(CommitmentLevel::Finalized)
).await?;
println!("Is valid: {}", response.valid);

Ping

let response = client.ping(1).await?;
println!("Pong count: {}", response.count);

Subscribe Replay Info

let response = client.subscribe_replay_info().await?;
if let Some(first_slot) = response.first_available {
    println!("First available slot for replay: {}", first_slot);
}

Advanced: Ping/Pong for Keep-Alive

Some load balancers require periodic client pings:
use yellowstone_grpc_proto::prelude::{
    SubscribeRequestPing, subscribe_update::UpdateOneof,
};

let (mut subscribe_tx, mut stream) = client.subscribe().await?;

// Send initial subscription
subscribe_tx.send(request).await?;

while let Some(message) = stream.next().await {
    match message?.update_oneof {
        Some(UpdateOneof::Ping(_)) => {
            // Respond to server ping
            subscribe_tx.send(SubscribeRequest {
                ping: Some(SubscribeRequestPing { id: 1 }),
                ..Default::default()
            }).await?;
        }
        Some(UpdateOneof::Pong(_)) => {
            // Server acknowledged our ping
        }
        Some(update) => {
            // Process other updates
        }
        None => break,
    }
}

Error Handling

The client uses GeyserGrpcClientError for errors:
use yellowstone_grpc_client::GeyserGrpcClientError;

match client.get_version().await {
    Ok(response) => println!("Version: {:?}", response),
    Err(GeyserGrpcClientError::TonicStatus(status)) => {
        eprintln!("gRPC error: {} - {}", status.code(), status.message());
    }
    Err(GeyserGrpcClientError::SubscribeSendError(err)) => {
        eprintln!("Failed to send subscription: {}", err);
    }
}

Complete Example

Here’s a complete example subscribing to slots with reconnection logic:
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::prelude::{
    subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
    SubscribeRequestFilterSlots, SubscribeRequestPing,
};
use futures::{sink::SinkExt, stream::StreamExt};
use std::collections::HashMap;
use tonic::transport::ClientTlsConfig;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = GeyserGrpcClient::build_from_shared("http://127.0.0.1:10000")?
        .x_token(None::<String>)?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    let (mut subscribe_tx, mut stream) = client.subscribe().await?;

    // Send initial subscription
    subscribe_tx.send(SubscribeRequest {
        slots: HashMap::from([(
            "client".to_string(),
            SubscribeRequestFilterSlots {
                filter_by_commitment: Some(true),
                interslot_updates: Some(false),
            },
        )]),
        commitment: Some(CommitmentLevel::Processed as i32),
        ..Default::default()
    }).await?;

    println!("Subscribed to slots");

    while let Some(message) = stream.next().await {
        match message {
            Ok(update) => {
                match update.update_oneof {
                    Some(UpdateOneof::Slot(slot)) => {
                        println!("Slot: {} (parent: {})", slot.slot, slot.parent.unwrap_or(0));
                    }
                    Some(UpdateOneof::Ping(_)) => {
                        subscribe_tx.send(SubscribeRequest {
                            ping: Some(SubscribeRequestPing { id: 1 }),
                            ..Default::default()
                        }).await?;
                    }
                    _ => {}
                }
            }
            Err(error) => {
                eprintln!("Stream error: {:?}", error);
                break;
            }
        }
    }

    Ok(())
}

API Reference

GeyserGrpcClient Methods

MethodDescription
health_check()Check if the server is healthy
health_watch()Stream health status updates
subscribe()Create bidirectional subscription stream
subscribe_with_request(req)Subscribe with initial request
subscribe_once(req)One-way subscription stream
ping(count)Send ping to server
get_version()Get server version
get_slot(commitment)Get current slot
get_block_height(commitment)Get current block height
get_latest_blockhash(commitment)Get latest blockhash
is_blockhash_valid(hash, commitment)Check blockhash validity

GeyserGrpcBuilder Methods

MethodDescription
x_token(token)Set authentication token
tls_config(config)Configure TLS
connect_timeout(duration)Set connection timeout
timeout(duration)Set request timeout
tcp_nodelay(enabled)Enable/disable TCP_NODELAY
tcp_keepalive(duration)Set TCP keepalive
send_compressed(encoding)Enable compression for sends
accept_compressed(encoding)Enable compression for receives
max_decoding_message_size(limit)Set max message size for decoding
max_encoding_message_size(limit)Set max message size for encoding
connect()Connect to server
connect_lazy()Create client without connecting
connect_uds(path)Connect via Unix domain socket

Next Steps

Plugin Configuration

Learn how to configure and run the plugin

Subscribe Filters

Deep dive into subscription filters

Build docs developers (and LLMs) love