Substrait is a cross-language specification for data compute operations. Arrow’s Substrait integration enables serializing and deserializing query plans for interoperability between different query engines and data processing systems.

Overview

Arrow’s Substrait support provides:
  • Plan serialization: Convert Acero execution plans to Substrait format
  • Plan deserialization: Execute Substrait plans using the Acero engine
  • Expression conversion: Translate between Arrow compute expressions and Substrait
  • Type mapping: Bidirectional conversion between Arrow and Substrait types
  • Cross-system interoperability: Exchange query plans between systems

Key Concepts

Substrait Plans

A Substrait Plan represents a complete query execution plan with:
  • Relations: Operations like scans, filters, projections, aggregations
  • Expressions: Computations on data (literals, field references, functions)
  • Types: Schema information for data flowing through the plan
  • Extensions: Custom functions and types via extension URIs

Arrow Acero Integration

Arrow executes Substrait plans using the Acero query engine:
  • Substrait relations map to Acero execution nodes
  • Substrait expressions map to Arrow compute expressions
  • Plans execute using Arrow’s streaming execution model
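
The relation-to-node mapping can be pictured as a simple lookup table. The sketch below is illustrative only: `AceroFactoryFor` is a hypothetical helper rather than part of Arrow, and the pairing of each relation with an Acero node factory name (`scan`, `hashjoin`, `order_by`, and so on) is an assumption about how the consumer typically translates plans, not a guarantee for any particular Arrow release.

```cpp
#include <cassert>
#include <optional>
#include <string_view>
#include <unordered_map>

// Hypothetical helper (not an Arrow API): returns the Acero node factory
// name assumed to correspond to a Substrait relation kind, or nullopt when
// this sketch knows no counterpart.
std::optional<std::string_view> AceroFactoryFor(std::string_view substrait_rel) {
  assert(!substrait_rel.empty());  // precondition: a relation kind is given

  // Assumed pairing, for illustration; the real translation lives inside
  // Arrow's Substrait consumer and may differ between versions.
  static const std::unordered_map<std::string_view, std::string_view> kMapping = {
      {"read", "scan"},           {"filter", "filter"},
      {"project", "project"},     {"aggregate", "aggregate"},
      {"join", "hashjoin"},       {"sort", "order_by"},
      {"fetch", "fetch"},         {"set", "union"},
  };

  auto it = kMapping.find(substrait_rel);
  if (it == kMapping.end()) return std::nullopt;
  return it->second;
}
```

A caller could use such a table to report unsupported relations up front, for example by checking `AceroFactoryFor("window").has_value()` before attempting to run a plan.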

Deserializing Substrait Plans

Basic Plan Execution

Execute a Substrait plan using Arrow:
#include <iostream>
#include <memory>
#include <vector>

#include "arrow/acero/api.h"
#include "arrow/compute/api.h"
#include "arrow/engine/api.h"

namespace eng = arrow::engine;
namespace ac = arrow::acero;

// Consumer that processes output batches
class MyConsumer : public ac::SinkNodeConsumer {
 public:
  arrow::Status Init(
      const std::shared_ptr<arrow::Schema>& schema,
      ac::BackpressureControl* backpressure_control,
      ac::ExecPlan* plan) override {
    schema_ = schema;
    return arrow::Status::OK();
  }

  arrow::Status Consume(arrow::compute::ExecBatch batch) override {
    std::cout << "Received " << batch.length << " rows" << std::endl;
    batches_.push_back(batch);
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override {
    std::cout << "Finished: " << batches_.size() 
              << " batches" << std::endl;
    return arrow::Future<>::MakeFinished();
  }

 private:
  std::shared_ptr<arrow::Schema> schema_;
  std::vector<arrow::compute::ExecBatch> batches_;
};

// Load and execute Substrait plan
arrow::Status ExecutePlan(const std::shared_ptr<arrow::Buffer>& plan_buffer) {
  // Create consumer for output
  auto consumer = std::make_shared<MyConsumer>();
  
  // Deserialize plan to Acero execution plan
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<ac::ExecPlan> exec_plan,
      eng::DeserializePlan(*plan_buffer, consumer));
  
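  // Validate, then start execution
  ARROW_RETURN_NOT_OK(exec_plan->Validate());
  // In recent Acero releases StartProducing() returns void; on older
  // releases that return a Status, wrap the call in ARROW_RETURN_NOT_OK
  exec_plan->StartProducing();
  
  // Block until the plan finishes and report its final status
  return exec_plan->finished().status();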
}

Multiple Consumer Factory

Handle plans with multiple output relations:
std::vector<std::shared_ptr<ac::SinkNodeConsumer>> consumers;

// Factory creates one consumer per output relation
auto consumer_factory = [&]() -> std::shared_ptr<ac::SinkNodeConsumer> {
  // MyConsumer (defined above) takes no constructor arguments
  auto consumer = std::make_shared<MyConsumer>();
  consumers.push_back(consumer);
  return consumer;
};

// Deserialize plan with multiple relations
ARROW_ASSIGN_OR_RAISE(
    std::vector<ac::Declaration> declarations,
    eng::DeserializePlans(*plan_buffer, consumer_factory));

std::cout << "Plan has " << declarations.size() 
          << " output relations" << std::endl;

Writing to Filesystem

Write plan output directly to files:
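#include "arrow/dataset/api.h"
#include "arrow/filesystem/api.h"

namespace ds = arrow::dataset;

// Factory for write options
auto write_options_factory = []() -> std::shared_ptr<ds::WriteNodeOptions> {
  // WriteNodeOptions wraps a FileSystemDatasetWriteOptions, which holds
  // the format, filesystem, directory, and partitioning settings
  ds::FileSystemDatasetWriteOptions write_options;
  
  // Configure output format
  auto format = std::make_shared<ds::ParquetFileFormat>();
  write_options.file_write_options = format->DefaultWriteOptions();
  
  // Set output filesystem, directory, and file naming
  write_options.filesystem = std::make_shared<arrow::fs::LocalFileSystem>();
  write_options.base_dir = "/path/to/output";
  write_options.basename_template = "part-{i}.parquet";
  
  // Partitioning scheme
  write_options.partitioning = 
      std::make_shared<ds::HivePartitioning>(
          arrow::schema({
              arrow::field("year", arrow::int32()),
              arrow::field("month", arrow::int32())
          }));
  
  return std::make_shared<ds::WriteNodeOptions>(std::move(write_options));
};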

// Deserialize and execute plan writing to filesystem
ARROW_ASSIGN_OR_RAISE(
    std::vector<ac::Declaration> declarations,
    eng::DeserializePlans(*plan_buffer, write_options_factory));

Serializing to Substrait

Serialize Acero Plan

Convert an Acero execution plan to Substrait:
// Build Acero plan (`table` is a std::shared_ptr<arrow::Table> created elsewhere)
auto table_source = ac::Declaration(
    "table_source",
    ac::TableSourceNodeOptions{table});

auto filter = ac::Declaration(
    "filter",
    {table_source},
    ac::FilterNodeOptions{arrow::compute::greater(
        arrow::compute::field_ref("value"),
        arrow::compute::literal(100))});

auto project = ac::Declaration(
    "project",
    {filter},
    ac::ProjectNodeOptions{{arrow::compute::field_ref("name")}});

// Initialize extension set
eng::ExtensionSet ext_set;

// Serialize to Substrait
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializePlan(project, &ext_set));

// Save or transmit serialized plan
std::ofstream out("plan.substrait", std::ios::binary);
out.write(reinterpret_cast<const char*>(serialized->data()),
          serialized->size());
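
A serialized plan is an opaque byte string, so a consumer has to read the file back in binary mode without any text translation. The following is a minimal sketch using only the standard library; `WritePlanBytes` and `ReadPlanBytes` are hypothetical helper names, and the sketch assumes the whole plan fits comfortably in memory.

```cpp
#include <cassert>
#include <fstream>
#include <iterator>
#include <stdexcept>
#include <string>

// Hypothetical helper: writes raw plan bytes and fails loudly if the
// stream reports an error (the snippet above does not check for one).
void WritePlanBytes(const std::string& path, const std::string& bytes) {
  assert(!path.empty());  // precondition: a destination path is given
  std::ofstream out(path, std::ios::binary | std::ios::trunc);
  out.write(bytes.data(), static_cast<std::streamsize>(bytes.size()));
  out.flush();
  if (!out) throw std::runtime_error("failed to write " + path);
}

// Hypothetical helper: reads the whole file back as raw bytes. Binary mode
// matters because serialized plans may contain NUL and newline bytes.
std::string ReadPlanBytes(const std::string& path) {
  assert(!path.empty());  // precondition: a source path is given
  std::ifstream in(path, std::ios::binary);
  if (!in) throw std::runtime_error("failed to open " + path);
  return std::string(std::istreambuf_iterator<char>{in},
                     std::istreambuf_iterator<char>{});
}
```

The returned string can then be wrapped for deserialization, for instance with `arrow::Buffer::FromString(std::move(bytes))`.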

Serialize Expressions

Serialize standalone expressions:
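// Schema the expressions are evaluated against
auto schema = arrow::schema({
    arrow::field("a", arrow::int32()),
    arrow::field("b", arrow::int32())
});

// Create the expression and bind it to the schema
arrow::compute::Expression expr = arrow::compute::call(
    "add",
    {arrow::compute::field_ref("a"),
     arrow::compute::field_ref("b")});
ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*schema));

// BoundExpressions holds named expressions plus their schema;
// "a_plus_b" is an illustrative output name
eng::BoundExpressions bound_exprs;
bound_exprs.named_expressions.push_back({expr, "a_plus_b"});
bound_exprs.schema = schema;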

// Serialize expressions
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeExpressions(bound_exprs));

Deserialize Expressions

// Deserialize expressions
ARROW_ASSIGN_OR_RAISE(
    eng::BoundExpressions bound_exprs,
    eng::DeserializeExpressions(*serialized));

std::cout << "Schema: " << bound_exprs.schema->ToString() << std::endl;
std::cout << "Expressions: " << bound_exprs.named_expressions.size() << std::endl;

// Each entry pairs an output name with its expression
for (const auto& named : bound_exprs.named_expressions) {
  std::cout << "  " << named.name << ": "
            << named.expression.ToString() << std::endl;
}

Type Conversion

Serialize Arrow Types

// Serialize Arrow type to Substrait
auto arrow_type = arrow::struct_({
    arrow::field("x", arrow::float64()),
    arrow::field("y", arrow::float64())
});

eng::ExtensionSet ext_set;
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeType(*arrow_type, &ext_set));

Deserialize Substrait Types

// Deserialize Substrait type to Arrow
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::DataType> arrow_type,
    eng::DeserializeType(*serialized, ext_set));

std::cout << "Type: " << arrow_type->ToString() << std::endl;

Schema Conversion

Serialize Schema

auto schema = arrow::schema({
    arrow::field("id", arrow::int64()),
    arrow::field("name", arrow::utf8()),
    arrow::field("value", arrow::float64())
});

eng::ExtensionSet ext_set;
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializeSchema(*schema, &ext_set));

Deserialize Schema

ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Schema> schema,
    eng::DeserializeSchema(*serialized, ext_set));

for (const auto& field : schema->fields()) {
  std::cout << field->name() << ": " 
            << field->type()->ToString() << std::endl;
}

Extension Sets

Extension sets manage custom functions and types:
// Create extension set
eng::ExtensionSet ext_set;

// Use during serialization
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized,
    eng::SerializePlan(declaration, &ext_set));

// Extensions are tracked in ext_set
std::cout << "Functions used: " 
          << ext_set.num_functions() << std::endl;
std::cout << "Types used: " 
          << ext_set.num_types() << std::endl;

// Reuse ext_set for consistent anchors
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> serialized2,
    eng::SerializePlan(declaration2, &ext_set));
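
To see why reusing one extension set matters: a Substrait plan refers to each extension function through a small integer anchor that points at a (URI, name) declaration. The sketch below is a simplified stand-in for that bookkeeping, not Arrow's `ExtensionSet`. The `AnchorTable` class and its numbering scheme are assumptions made purely to illustrate "same function, same anchor".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Simplified model of anchor assignment (hypothetical, not an Arrow type).
class AnchorTable {
 public:
  // Returns the anchor for (uri, name), assigning the next free anchor the
  // first time the pair is seen and the same anchor on every later call.
  std::uint32_t AnchorFor(const std::string& uri, const std::string& name) {
    assert(!name.empty());  // precondition: a function name is given
    auto key = std::make_pair(uri, name);
    auto it = anchors_.find(key);
    if (it != anchors_.end()) return it->second;
    auto anchor = static_cast<std::uint32_t>(anchors_.size());
    anchors_.emplace(std::move(key), anchor);
    return anchor;
  }

  // Number of distinct (uri, name) pairs registered so far.
  std::size_t size() const { return anchors_.size(); }

 private:
  std::map<std::pair<std::string, std::string>, std::uint32_t> anchors_;
};
```

With one shared table, a function used by two plans resolves to the same anchor in both. With two independent tables the same function could receive different anchors, which is the inconsistency that sharing `ext_set` avoids.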

Conversion Options

Control conversion behavior:
eng::ConversionOptions options;

// Strictness level (EXACT_ROUNDTRIP, PRESERVE_STRUCTURE, or BEST_EFFORT)
options.strictness = eng::ConversionStrictness::BEST_EFFORT;

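// Named table provider: resolves a named-table path to an Acero source.
// LoadTable is an application-defined helper (not part of Arrow), assumed
// here to return arrow::Result<ac::Declaration>.
options.named_table_provider =
    [](const std::vector<std::string>& names,
       const arrow::Schema& schema) -> arrow::Result<ac::Declaration> {
  // Return a source declaration for the given name path
  return LoadTable(names);
};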

// Use options in deserialization
ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<ac::ExecPlan> plan,
    eng::DeserializePlan(*buffer, consumer, 
                        /*registry=*/nullptr,
                        /*ext_set_out=*/nullptr,
                        options));

JSON Format

Convert between binary and JSON formats:
// Binary to JSON
ARROW_ASSIGN_OR_RAISE(
    std::string json,
    eng::internal::SubstraitToJSON("Plan", *binary_buffer));

std::cout << "Plan as JSON:\n" << json << std::endl;

// JSON to binary
std::string json_plan = R"({
  "relations": [
    {"rel": {
      "read": {
        "base_schema": {
          "names": ["a", "b"],
          "struct": {
            "types": [{"i32": {}}, {"i32": {}}]
          }
        },
        "local_files": {
          "items": [{"uri_file": "data.parquet", "parquet": {}}]
        }
      }
    }}
  ]
})";

ARROW_ASSIGN_OR_RAISE(
    std::shared_ptr<arrow::Buffer> binary,
    eng::internal::SubstraitFromJSON("Plan", json_plan));

Complete Example

Full workflow from serialization to execution:
#include <fstream>
#include <iostream>

#include <arrow/api.h>
#include <arrow/acero/api.h>
#include <arrow/compute/api.h>
#include <arrow/engine/api.h>
#include <arrow/io/api.h>

// Producer: Serialize plan
arrow::Status SerializePlan(
    const arrow::acero::Declaration& declaration,
    const std::string& output_path) {
  
  arrow::engine::ExtensionSet ext_set;
  
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Buffer> serialized,
      arrow::engine::SerializePlan(declaration, &ext_set));
  
  std::ofstream out(output_path, std::ios::binary);
  out.write(reinterpret_cast<const char*>(serialized->data()),
            serialized->size());
  
  std::cout << "Serialized plan to " << output_path << std::endl;
  return arrow::Status::OK();
}

// Consumer: Execute plan
arrow::Status ExecutePlan(const std::string& plan_path) {
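  // Read serialized plan (ReadableFile::Read needs an explicit byte count)
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::io::ReadableFile> file,
      arrow::io::ReadableFile::Open(plan_path));
  
  ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Buffer> buffer,
      file->Read(size));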
  
  // Create consumer
  auto consumer = std::make_shared<MyConsumer>();
  
  // Deserialize and execute
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::acero::ExecPlan> plan,
      arrow::engine::DeserializePlan(*buffer, consumer));
  
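  ARROW_RETURN_NOT_OK(plan->Validate());
  // In recent Acero releases StartProducing() returns void; on older
  // releases that return a Status, wrap the call in ARROW_RETURN_NOT_OK
  plan->StartProducing();
  
  // Block until the plan finishes and report its final status
  return plan->finished().status();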
}

Supported Operations

Arrow supports these Substrait relation types:
  • Read: Scan data from files or tables
  • Filter: Select rows based on predicates
  • Project: Compute derived columns
  • Aggregate: Group and aggregate data
  • Join: Combine data from multiple sources
  • Sort: Order data
  • Fetch: Limit and offset
  • Set operations: Union, intersect, except

Best Practices

Extension Set Management

// Share extension set across related operations
eng::ExtensionSet ext_set;

// Serialize multiple related plans
for (const auto& decl : declarations) {
  ARROW_ASSIGN_OR_RAISE(
      auto buffer,
      eng::SerializePlan(decl, &ext_set));  // Reuse ext_set
  // Save buffer...
}

Error Handling

auto result = eng::DeserializePlan(*buffer, consumer);
if (!result.ok()) {
  std::cerr << "Failed to deserialize plan: "
            << result.status().ToString() << std::endl;
  
  // Try JSON conversion for debugging
  auto json_result = eng::internal::SubstraitToJSON("Plan", *buffer);
  if (json_result.ok()) {
    std::cerr << "Plan contents:\n" << *json_result << std::endl;
  }
  
  return result.status();
}

Version Compatibility

// The Substrait Plan message carries a version field identifying the
// specification release the producer targeted. Be prepared to handle
// plans produced against different Substrait versions.

eng::ConversionOptions options;
options.strictness = eng::ConversionStrictness::BEST_EFFORT;

// BEST_EFFORT lets the converter fall back to alternative Arrow
// representations when no exact equivalent is available
ARROW_ASSIGN_OR_RAISE(
    auto plan,
    eng::DeserializePlan(*buffer, consumer,
                        /*registry=*/nullptr,
                        /*ext_set_out=*/nullptr,
                        options));

When to Use Substrait

Substrait is beneficial for:
  • Multi-system workflows: Pass queries between different engines
  • Query federation: Distribute query execution across systems
  • Plan optimization: Serialize, optimize externally, then execute
  • Remote execution: Send query plans to remote workers
  • Testing: Validate query plans across implementations

Direct Acero usage may be better for:
  • Single-system execution: No need for serialization overhead
  • Performance-critical: Avoid serialization/deserialization cost
  • Dynamic plans: Plans built and executed immediately

Important Considerations

  • Substrait integration requires Arrow built with ARROW_SUBSTRAIT=ON
  • Not all Acero operations have Substrait equivalents (yet)
  • Extension functions must be available in consuming system
  • File paths in plans must be accessible from execution environment
  • Substrait is still evolving, so newer features may not be supported yet
  • Always validate deserialized plans before execution
