This guide explains how to integrate a new blockchain network into Graph Node. Chain integration involves implementing the Blockchain trait and providing chain-specific adapters for data ingestion and trigger processing.

Overview

Graph Node uses a modular architecture that allows different blockchain networks to be integrated by implementing a set of core traits. Each chain integration lives in the chain/ directory and provides:
  • Data source definitions
  • Trigger types and filtering
  • Block stream implementation
  • Runtime adapters for contract calls
  • Block ingestion from RPC or Firehose

Architecture

The chain integration architecture consists of several key components:
Blockchain → Chain Adapter → Block Stream → Trigger Processing → Runtime → Store
  1. Chain Adapter: Connects to blockchain nodes and converts data
  2. Block Stream: Provides event-driven streaming of blocks
  3. Trigger Processing: Matches blockchain events to subgraph handlers
  4. Runtime: Executes subgraph code in WASM sandbox
  5. Store: Persists entities with block-level granularity
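The trigger-processing stage in this pipeline can be sketched as a toy model (all types and names below are illustrative placeholders, not graph-node API):

```rust
// Toy model of the pipeline's trigger-processing stage: a block's events
// are matched against the handlers a subgraph registered. `ToyBlock` and
// `matching_triggers` are illustrative names, not graph-node types.
#[derive(Clone, Debug)]
pub struct ToyBlock {
    pub number: u64,
    pub events: Vec<String>,
}

/// Return the events in `block` that some registered handler matches.
pub fn matching_triggers(block: &ToyBlock, handlers: &[&str]) -> Vec<String> {
    block
        .events
        .iter()
        .filter(|e| handlers.contains(&e.as_str()))
        .cloned()
        .collect()
}
```

In the real system the matched triggers are what get handed to the Runtime stage for execution in the WASM sandbox.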

Project Structure

Each chain integration follows this structure:
chain/
├── <chain-name>/
│   ├── src/
│   │   ├── chain.rs          # Main Chain struct and Blockchain trait impl
│   │   ├── adapter.rs        # Triggers adapter and filtering
│   │   ├── data_source.rs    # Data source types
│   │   ├── trigger.rs        # Trigger data types
│   │   ├── codec.rs          # Protocol buffer codecs
│   │   └── runtime/
│   │       ├── mod.rs        # Runtime adapter
│   │       └── abi.rs        # ABI handling
│   ├── Cargo.toml
│   └── build.rs              # Build script for protobufs
└── common/
    └── src/
        └── lib.rs            # Shared utilities

Core Traits

Blockchain Trait

The Blockchain trait is the primary interface that every chain must implement. The impl below lists its associated types and required method signatures (bodies omitted):
use graph::blockchain::{Blockchain, BlockchainKind};
use graph::prelude::*;

#[async_trait]
impl Blockchain for Chain {
    // Unique identifier for this blockchain
    const KIND: BlockchainKind = BlockchainKind::Ethereum;
    
    // Aliases for backward compatibility
    const ALIASES: &'static [&'static str] = &["ethereum/contract"];
    
    // Associated types
    type Client = EthereumNetworkAdapters;
    type Block = BlockFinality;
    type DataSource = DataSource;
    type UnresolvedDataSource = UnresolvedDataSource;
    type DataSourceTemplate = DataSourceTemplate;
    type UnresolvedDataSourceTemplate = UnresolvedDataSourceTemplate;
    type TriggerData = EthereumTrigger;
    type MappingTrigger = MappingTrigger;
    type TriggerFilter = TriggerFilter;
    type NodeCapabilities = NodeCapabilities;
    type DecoderHook = DecoderHook;
    
    // Required methods
    fn triggers_adapter(
        &self,
        loc: &DeploymentLocator,
        capabilities: &Self::NodeCapabilities,
        unified_api_version: UnifiedMappingApiVersion,
    ) -> Result<Arc<dyn TriggersAdapterTrait<Self>>, Error>;
    
    async fn new_block_stream(
        &self,
        deployment: DeploymentLocator,
        store: impl DeploymentCursorTracker,
        start_blocks: Vec<BlockNumber>,
        filter: Arc<TriggerFilterWrapper<Self>>,
        unified_api_version: UnifiedMappingApiVersion,
    ) -> Result<Box<dyn BlockStream<Self>>, Error>;
    
    async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
    
    async fn block_pointer_from_number(
        &self,
        logger: &Logger,
        number: BlockNumber,
    ) -> Result<BlockPtr, IngestorError>;
    
    fn is_refetch_block_required(&self) -> bool;
    
    async fn refetch_firehose_block(
        &self,
        logger: &Logger,
        cursor: FirehoseCursor,
    ) -> Result<Self::Block, Error>;
    
    async fn runtime(
        &self,
    ) -> Result<(Arc<dyn RuntimeAdapterTrait<Self>>, Self::DecoderHook)>;
    
    fn chain_client(&self) -> Arc<ChainClient<Self>>;
    
    async fn block_ingestor(&self) -> Result<Box<dyn BlockIngestor>>;
}

Block Trait

Blocks must implement the Block trait:
impl Block for BlockFinality {
    fn ptr(&self) -> BlockPtr {
        // Return block pointer (hash + number)
    }
    
    fn parent_ptr(&self) -> Option<BlockPtr> {
        // Return parent block pointer
    }
    
    fn data(&self) -> Result<json::Value, json::Error> {
        // Serialize block data
    }
    
    fn timestamp(&self) -> BlockTime {
        // Return block timestamp
    }
}
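These semantics can be sketched self-contained for a hypothetical block type (the field names are assumptions for illustration; the real BlockPtr and BlockTime types come from the graph crate):

```rust
// Minimal stand-ins for graph::blockchain::BlockPtr and a chain block.
// Field names here are assumptions for illustration only.
#[derive(Clone, Debug, PartialEq)]
pub struct Ptr {
    pub hash: String,
    pub number: u64,
}

pub struct MyBlock {
    pub hash: String,
    pub number: u64,
    pub parent_hash: String,
    pub timestamp: u64,
}

impl MyBlock {
    /// Pointer to this block: its own hash and number.
    pub fn ptr(&self) -> Ptr {
        Ptr { hash: self.hash.clone(), number: self.number }
    }

    /// Pointer to the parent; the genesis block (number 0) has none.
    pub fn parent_ptr(&self) -> Option<Ptr> {
        if self.number == 0 {
            None
        } else {
            Some(Ptr { hash: self.parent_hash.clone(), number: self.number - 1 })
        }
    }
}
```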

Implementation Steps

Step 1: Create Chain Module

Create a new directory under chain/ for your blockchain:
mkdir -p chain/mychain/src
cd chain/mychain

Create Cargo.toml:
Cargo.toml
[package]
name = "graph-chain-mychain"
version = "0.1.0"
edition = "2021"

[dependencies]
graph = { path = "../../graph" }
anyhow = "1.0"
async-trait = "0.1"
prost = "0.12"
tokio = { version = "1.0", features = ["full"] }

[build-dependencies]
tonic-build = "0.10"

Step 2: Define Data Sources

Implement the DataSource type representing a subgraph data source:
src/data_source.rs
use graph::blockchain::{self, Blockchain};
use graph::prelude::*;

#[derive(Clone, Debug)]
pub struct DataSource {
    pub kind: String,
    pub network: Option<String>,
    pub name: String,
    pub source: Source,
    pub mapping: Mapping,
    pub context: Arc<Option<DataSourceContext>>,
    pub creation_block: Option<BlockNumber>,
}

impl blockchain::DataSource<Chain> for DataSource {
    fn from_template_info(
        info: InstanceDSTemplateInfo,
        template: &DataSourceTemplate,
    ) -> Result<Self, Error> {
        // Create data source from template
    }
    
    fn address(&self) -> Option<&[u8]> {
        // Return contract address if applicable
    }
    
    fn start_block(&self) -> BlockNumber {
        self.source.start_block
    }
    
    fn end_block(&self) -> Option<BlockNumber> {
        self.source.end_block
    }
    
    fn handler_kinds(&self) -> HashSet<&str> {
        // Return set of handler types (e.g., "event", "call", "block")
    }
    
    fn match_and_decode(
        &self,
        trigger: &<Chain as Blockchain>::TriggerData,
        block: &Arc<<Chain as Blockchain>::Block>,
        logger: &Logger,
    ) -> Result<Option<TriggerWithHandler<Chain>>, Error> {
        // Match trigger against this data source's filters
        // Return handler if matched
    }
}

Step 3: Define Trigger Types

Create trigger types representing blockchain events:
src/trigger.rs
use graph::blockchain::TriggerData;
use graph::prelude::*;

#[derive(Clone, Debug)]
pub enum MyChainTrigger {
    Block(Arc<Block>),
    Transaction(Arc<Transaction>),
    Event(Arc<Event>),
}

impl TriggerData for MyChainTrigger {
    fn error_context(&self) -> String {
        match self {
            MyChainTrigger::Block(block) => {
                format!("block #{}", block.number)
            }
            MyChainTrigger::Transaction(tx) => {
                format!("transaction {}", tx.hash)
            }
            MyChainTrigger::Event(event) => {
                format!("event in block #{}", event.block_number)
            }
        }
    }
}

Step 4: Implement Trigger Filter

Create a filter for matching triggers:
src/adapter.rs
use graph::blockchain::TriggerFilter as TriggerFilterTrait;
use std::collections::HashSet;

#[derive(Clone, Debug, Default)]
pub struct TriggerFilter {
    pub block_filter: BlockFilter,
    pub transaction_filter: TransactionFilter,
    pub event_filter: EventFilter,
}

impl TriggerFilterTrait<Chain> for TriggerFilter {
    fn extend<'a>(&mut self, data_sources: impl Iterator<Item = &'a DataSource>) {
        // Extend filter from data sources
        for ds in data_sources {
            self.block_filter.extend_from_data_source(ds);
            self.transaction_filter.extend_from_data_source(ds);
            self.event_filter.extend_from_data_source(ds);
        }
    }
    
    fn node_capabilities(&self) -> NodeCapabilities {
        // Return required node capabilities
        NodeCapabilities::default()
    }
    
    fn to_firehose_filter(self) -> Vec<prost_types::Any> {
        // Convert to Firehose filter format
        vec![]
    }
}
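The union semantics of extend can be shown in a standalone sketch (the type names below are illustrative, not the graph-node API): the filter accumulates every data source's interests so that a single pass over a block can serve all of them.

```rust
use std::collections::HashSet;

// Standalone sketch of filter semantics: the filter is the union of every
// data source's interests. `SimpleEventFilter` and `DataSourceSpec` are
// illustrative names, not graph-node types.
#[derive(Default)]
pub struct SimpleEventFilter {
    pub signatures: HashSet<String>,
}

pub struct DataSourceSpec {
    pub event_signatures: Vec<String>,
}

impl SimpleEventFilter {
    /// Fold one data source's event signatures into the filter.
    pub fn extend_from(&mut self, ds: &DataSourceSpec) {
        self.signatures.extend(ds.event_signatures.iter().cloned());
    }

    /// True if some data source registered interest in this signature.
    pub fn matches(&self, signature: &str) -> bool {
        self.signatures.contains(signature)
    }
}
```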

Step 5: Implement Triggers Adapter

The triggers adapter loads blocks and extracts triggers:
src/adapter.rs
use graph::blockchain::block_stream::{TriggersAdapter, BlockWithTriggers};

pub struct MyChainTriggersAdapter {
    logger: Logger,
    client: Arc<ChainClient<Chain>>,
}

#[async_trait]
impl TriggersAdapter<Chain> for MyChainTriggersAdapter {
    async fn scan_triggers(
        &self,
        from: BlockNumber,
        to: BlockNumber,
        filter: &TriggerFilter,
    ) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
        // Scan block range for triggers matching filter
        let blocks = self.load_blocks(from, to).await?;
        
        let blocks_with_triggers = blocks
            .into_iter()
            .map(|block| {
                let triggers = self.extract_triggers(&block, filter);
                BlockWithTriggers::new(block, triggers, &self.logger)
            })
            .collect();
        
        Ok((blocks_with_triggers, to))
    }
    
    async fn triggers_in_block(
        &self,
        logger: &Logger,
        block: Block,
        filter: &TriggerFilter,
    ) -> Result<BlockWithTriggers<Chain>, Error> {
        // Extract triggers from a single block
        let triggers = self.extract_triggers(&block, filter);
        Ok(BlockWithTriggers::new(block, triggers, logger))
    }
    
    async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
        // Check if block is on main chain (reorg detection)
    }
    
    async fn ancestor_block(
        &self,
        ptr: BlockPtr,
        offset: BlockNumber,
        root: Option<BlockHash>,
    ) -> Result<Option<Block>, Error> {
        // Get ancestor block at offset
    }
    
    async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
        // Get parent block pointer
    }
}
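The core loop of scan_triggers can be sketched as a pure function (the types are placeholders; the real method returns BlockWithTriggers values):

```rust
// Pure-function sketch of the scan_triggers core loop: walk a block range
// and pair each block with its matching triggers, dropping blocks that
// produced none. `ScanBlock` is an illustrative placeholder type.
pub struct ScanBlock {
    pub number: u64,
    pub events: Vec<String>,
}

/// For each block, collect the events matching `wanted`; skip blocks
/// with no matches.
pub fn scan(blocks: &[ScanBlock], wanted: &str) -> Vec<(u64, Vec<String>)> {
    blocks
        .iter()
        .filter_map(|b| {
            let triggers: Vec<String> = b
                .events
                .iter()
                .filter(|e| e.as_str() == wanted)
                .cloned()
                .collect();
            if triggers.is_empty() {
                None
            } else {
                Some((b.number, triggers))
            }
        })
        .collect()
}
```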

Step 6: Implement Block Stream

Create a block stream builder:
src/chain.rs
use graph::blockchain::block_stream::{BlockStreamBuilder, BlockStream};

pub struct MyChainStreamBuilder {}

#[async_trait]
impl BlockStreamBuilder<Chain> for MyChainStreamBuilder {
    async fn build_firehose(
        &self,
        chain: &Chain,
        deployment: DeploymentLocator,
        block_cursor: FirehoseCursor,
        start_blocks: Vec<BlockNumber>,
        filter: Arc<TriggerFilter>,
        unified_api_version: UnifiedMappingApiVersion,
    ) -> Result<Box<dyn BlockStream<Chain>>> {
        // Build Firehose-based block stream
        let adapter =
            chain.triggers_adapter(&deployment, &NodeCapabilities::default(), unified_api_version)?;
        
        let logger = chain.logger_factory
            .subgraph_logger(&deployment)
            .new(o!("component" => "FirehoseBlockStream"));
        
        let mapper = Arc::new(FirehoseMapper { adapter, filter });
        
        Ok(Box::new(FirehoseBlockStream::new(
            deployment.hash,
            chain.chain_client(),
            block_cursor,
            mapper,
            start_blocks,
            logger,
            chain.registry.clone(),
        )))
    }
    
    async fn build_polling(
        &self,
        chain: &Chain,
        deployment: DeploymentLocator,
        start_blocks: Vec<BlockNumber>,
        filter: Arc<TriggerFilterWrapper<Chain>>,
        unified_api_version: UnifiedMappingApiVersion,
    ) -> Result<Box<dyn BlockStream<Chain>>> {
        // Build RPC polling-based block stream
        // Similar to build_firehose but with polling mechanism
    }
}

Step 7: Implement Runtime Adapter

Create runtime adapter for contract calls:
src/runtime/mod.rs
use graph::blockchain::RuntimeAdapter;
use graph::prelude::*;

pub struct MyChainRuntimeAdapter {
    client: Arc<ChainClient<Chain>>,
    call_cache: Arc<dyn CallCache>,
}

#[async_trait]
impl RuntimeAdapter<Chain> for MyChainRuntimeAdapter {
    fn host_fns(&self, ds: &DataSource) -> Result<Vec<HostFn>, Error> {
        // Return available host functions for this chain
        // e.g., mychain.call(), mychain.getBalance(), etc.
    }
}

Step 8: Implement Block Ingestor

Create block ingestor for chain head tracking:
src/chain.rs
impl Chain {
    async fn block_ingestor(&self) -> Result<Box<dyn BlockIngestor>> {
        match self.chain_client().as_ref() {
            ChainClient::Firehose(_) => {
                // Use Firehose block ingestor
                let ingestor = FirehoseBlockIngestor::<Block, Self>::new(
                    self.chain_store.clone(),
                    self.chain_client(),
                    self.logger_factory.component_logger("BlockIngestor", None),
                    self.name.clone(),
                );
                Ok(Box::new(ingestor))
            }
            ChainClient::Rpc(_) => {
                // Use RPC polling ingestor
                let ingestor = PollingBlockIngestor::new(
                    self.logger.clone(),
                    self.reorg_threshold,
                    self.chain_client(),
                    self.chain_store.clone(),
                    self.polling_interval,
                    self.name.clone(),
                )?;
                Ok(Box::new(ingestor))
            }
        }
    }
}

Step 9: Add Protocol Buffers (for Firehose)

If using Firehose, define protobuf schemas:
proto/mychain.proto
syntax = "proto3";

package sf.mychain.type.v1;

message Block {
  string hash = 1 [(firehose.required) = true];
  uint64 number = 2 [(firehose.required) = true];
  string parent_hash = 3;
  uint64 timestamp = 4;
  repeated Transaction transactions = 5;
}

message Transaction {
  string hash = 1;
  string from = 2;
  string to = 3;
  bytes data = 4;
}

Create build.rs:
build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .out_dir("src/protobuf")
        .compile(&["proto/mychain.proto"], &["proto"])?;
    Ok(())
}

Step 10: Register Chain

Register the new chain in the main node:
node/src/chains.rs
use graph_chain_mychain as mychain;

pub fn register_chains(
    blockchain_map: &mut BlockchainMap,
    config: &ChainConfig,
) -> Result<(), Error> {
    // Register MyChain. `logger_factory`, `name`, `chain_store`, `client`,
    // and `registry` come from the surrounding node setup code.
    if let Some(mychain_config) = config.mychain {
        let chain = mychain::Chain::new(
            logger_factory.clone(),
            name.clone(),
            chain_store.clone(),
            client,
            registry.clone(),
        );
        blockchain_map.insert(BlockchainKind::MyChain, Arc::new(chain));
    }
    
    Ok(())
}

Extension Points

Custom Node Capabilities

Define chain-specific node capabilities:
use graph::blockchain::NodeCapabilities as NodeCapabilitiesTrait;

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct NodeCapabilities {
    pub archive_mode: bool,
    pub traces: bool,
}

impl NodeCapabilitiesTrait<Chain> for NodeCapabilities {}

impl fmt::Display for NodeCapabilities {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "archive: {}, traces: {}", self.archive_mode, self.traces)
    }
}
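How capabilities gate provider selection can be sketched standalone: a provider can serve a deployment only if it offers everything the deployment's filter requires. The satisfies method below is an illustrative assumption, not the graph-node API; the field names follow the example above.

```rust
// Sketch of capability matching. `Caps` and `satisfies` are illustrative
// names; graph-node expresses this through its NodeCapabilities trait.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct Caps {
    pub archive_mode: bool,
    pub traces: bool,
}

impl Caps {
    /// True if a provider with `self` can serve the `required` set:
    /// every required capability must be offered.
    pub fn satisfies(&self, required: &Caps) -> bool {
        (!required.archive_mode || self.archive_mode)
            && (!required.traces || self.traces)
    }
}
```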

Custom Decoder Hooks

Implement decoder hooks for preprocessing data:
pub struct DecoderHook {
    client: Arc<ChainClient>,
}

impl DecoderHook {
    pub async fn decode_transaction(
        &self,
        tx_hash: &str,
    ) -> Result<Transaction, Error> {
        // Custom decoding logic
    }
}

Subgraph Manifest Schema

Define the YAML schema for your chain:
specVersion: 0.0.8
schema:
  file: ./schema.graphql
dataSources:
  - kind: mychain
    name: MyContract
    network: mainnet
    source:
      address: "0x123..."
      startBlock: 1000000
    mapping:
      kind: mychain/events
      apiVersion: 0.0.8
      language: wasm/assemblyscript
      entities:
        - Transfer
      eventHandlers:
        - event: Transfer(address,address,uint256)
          handler: handleTransfer
      file: ./src/mapping.ts

Testing

Unit Tests

Write unit tests for core components:
#[cfg(test)]
mod tests {
    use super::*;
    
    #[test]
    fn test_trigger_filter() {
        let filter = TriggerFilter::default();
        // Test filter logic
    }
    
    #[graph::test]
    async fn test_triggers_adapter() {
        let adapter = MyChainTriggersAdapter::new(...);
        let triggers = adapter.scan_triggers(0, 100, &filter).await.unwrap();
        assert!(!triggers.is_empty());
    }
}

Integration Tests

Create integration tests in tests/integration-tests/:
# Run integration tests for your chain
TEST_CASE=mychain_basic just test-integration

Runner Tests

Create runner tests in store/test-store/tests/chain/mychain/:
# Run runner tests
just test-runner mychain

Examples

Ethereum Implementation

See chain/ethereum/ for a full-featured reference implementation with:
  • RPC and Firehose support
  • Event, call, and block handlers
  • Call caching
  • Reorg handling
  • Trace support

NEAR Implementation

See chain/near/ for a simpler Firehose-only implementation:
  • Receipt-based triggers
  • No dynamic data sources
  • Account filtering (exact and partial matches)

Common Patterns

Handling Reorgs

impl TriggersAdapter<Chain> for MyChainAdapter {
    async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
        // Fetch block at same height from current chain head
        let current = self.client.block_by_number(ptr.number).await?;
        
        // Compare hashes
        Ok(current.hash == ptr.hash)
    }
}
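The hash comparison above can be modeled self-contained (the canonical map below stands in for the node's current view of the chain; it is not graph-node API):

```rust
use std::collections::HashMap;

// Self-contained model of the reorg check: a block pointer is on the main
// chain iff the canonical hash at its height matches. `canonical` stands
// in for the node's current view of the chain.
pub fn on_main_chain(canonical: &HashMap<u64, String>, number: u64, hash: &str) -> bool {
    canonical.get(&number).map(|h| h == hash).unwrap_or(false)
}
```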

Block Caching

impl TriggersAdapter<Chain> for MyChainAdapter {
    async fn ancestor_block(
        &self,
        ptr: BlockPtr,
        offset: BlockNumber,
    ) -> Result<Option<Block>, Error> {
        // Try cache first
        if let Some(block) = self.chain_store.ancestor_block(ptr, offset).await? {
            return Ok(Some(block));
        }
        
        // Fall back to fetching from node
        self.fetch_ancestor(ptr, offset).await
    }
}

Configuration

Add chain configuration to config.toml:
[chains.mychain]
provider = [
  { label = "mychain-mainnet", url = "https://rpc.mychain.io", features = ["archive", "traces"] }
]

[chains.mychain.firehose]
endpoint = "mychain-firehose.example.com:443"

Best Practices

  • Error Handling: Use Result types and provide context with anyhow
  • Async/Await: Use async_trait for trait implementations
  • Logging: Use structured logging with slog
  • Caching: Cache frequently accessed data (blocks, receipts)
  • Metrics: Add Prometheus metrics for monitoring
  • Testing: Write comprehensive unit and integration tests
  • Documentation: Document chain-specific behavior and limitations
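
The error-handling advice above can be sketched with std alone (graph-node code typically attaches context with anyhow's .context()/.with_context() instead; find_block is a hypothetical helper):

```rust
// std-only sketch of the "provide context" pattern: the error message
// says what was being looked up and during which operation.
pub fn find_block(number: u64, available: &[u64]) -> Result<u64, String> {
    available
        .iter()
        .copied()
        .find(|n| *n == number)
        .ok_or_else(|| format!("block #{} not found while scanning triggers", number))
}
```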

Performance Optimization

Batch Loading

impl TriggersAdapter<Chain> for MyChainAdapter {
    async fn scan_triggers(
        &self,
        from: BlockNumber,
        to: BlockNumber,
        filter: &TriggerFilter,
    ) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
        // Batch load blocks
        let blocks = self.client.load_blocks_batch(from, to).await?;
        
        // Process in parallel
        let results = blocks
            .into_iter()
            .map(|block| self.extract_triggers(&block, filter))
            .collect();
        
        Ok((results, to))
    }
}

Parallel Processing

Use FuturesOrdered for parallel processing:
use futures03::stream::FuturesOrdered;
use futures03::TryStreamExt;

let futures: FuturesOrdered<_> = block_numbers
    .into_iter()
    .map(|num| self.client.load_block(num))
    .collect();

let blocks = futures.try_collect::<Vec<_>>().await?;

Debugging

Enable Debug Logging

RUST_LOG=graph_chain_mychain=debug graph-node ...

Common Issues

  • Block not found: ensure block ingestion is running and synced.
  • Trigger mismatch: check the filter implementation and data source matching logic.
  • Reorg issues: verify the is_on_main_chain and ancestor_block implementations.
