Acero (pronounced ah-SERR-oh) is Arrow’s streaming execution engine for building and executing complex query plans. It processes data as a graph of operators, enabling efficient handling of large datasets that don’t fit in memory.
Acero is experimental and the API may change. However, it’s actively used in production systems and continues to mature.

Overview

Acero executes queries as directed graphs where:
  • Nodes represent operations (scan, filter, aggregate, join, etc.)
  • Edges represent data flowing between operations
  • Data flows as streams of record batches
  • Backpressure automatically controls memory usage
┌─────────┐     ┌────────┐     ┌───────────┐     ┌────────┐
│  Scan   │────▶│ Filter │────▶│ Aggregate │────▶│  Sink  │
└─────────┘     └────────┘     └───────────┘     └────────┘

Key Features

  • Streaming execution: Process data larger than memory
  • Parallel processing: Automatic multi-threaded execution
  • Memory efficient: Backpressure prevents memory overflow
  • Composable: Build complex plans from simple operators
  • Extensible: Add custom nodes and operations

Basic Usage

The typical workflow:
  1. Create a graph of Declaration objects
  2. Execute using a DeclarationToXyz method
  3. Process results

Simple Example

#include <arrow/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/plan.h>

using namespace arrow;
using namespace arrow::acero;
using namespace arrow::compute;

arrow::Result<std::shared_ptr<Table>> SimpleQuery() {
    // Register dataset nodes (e.g. "scan") with Acero
    ARROW_RETURN_NOT_OK(arrow::dataset::internal::Initialize());
    
    // Create input data (CreateSampleTable is a user-defined helper)
    ARROW_ASSIGN_OR_RAISE(auto table, CreateSampleTable());
    
    // Create dataset from table
    auto dataset = std::make_shared<dataset::InMemoryDataset>(table);
    
    // Build query plan
    auto scan_options = std::make_shared<dataset::ScanOptions>();
    scan_options->dataset_schema = table->schema();
    
    // Scan declaration
    Declaration scan{"scan", dataset::ScanNodeOptions{
        dataset,
        scan_options
    }};
    
    // Filter declaration
    Expression filter = greater(field_ref("value"), literal(10));
    Declaration filter_node{"filter", {scan}, FilterNodeOptions{filter}};
    
    // Execute and get table
    ARROW_ASSIGN_OR_RAISE(auto result_table,
        DeclarationToTable(filter_node));
    
    return result_table;
}

Creating Plans

Using Declaration

Create nodes by specifying name, inputs, and options:
#include <arrow/acero/exec_plan.h>

// Scan node (source)
Declaration scan{"scan", dataset::ScanNodeOptions{dataset, scan_options}};

// Filter node
Expression filter_expr = greater(field_ref("age"), literal(18));
Declaration filter{"filter", {scan}, FilterNodeOptions{filter_expr}};

// Project node (select columns)
Declaration project{
    "project",
    {filter},
    ProjectNodeOptions{
        {field_ref("name"), field_ref("age")},
        {"name", "age"}
    }
};

Sequential Plans

For linear sequences, use Declaration::Sequence:
// Create linear pipeline
Declaration plan = Declaration::Sequence({
    {"scan", dataset::ScanNodeOptions{dataset, scan_options}},
    {"filter", FilterNodeOptions{filter_expr}},
    {"project", ProjectNodeOptions{exprs, names}}
});

ARROW_ASSIGN_OR_RAISE(auto table, DeclarationToTable(plan));

Execution Methods

// Collect all results in memory
ARROW_ASSIGN_OR_RAISE(auto table,
    DeclarationToTable(plan));

std::cout << "Result: " << table->num_rows() 
          << " rows" << std::endl;
Best for: Small to medium result sets that fit in memory

Common Operators

Source Nodes

// Read from dataset (Parquet, CSV, etc.)
auto dataset = /* create dataset */;
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->dataset_schema = dataset->schema();

Declaration scan{"scan", dataset::ScanNodeOptions{
    dataset,
    scan_options
}};

Filter

Filter rows using boolean expressions:
// Simple comparison
Expression filter = greater(field_ref("age"), literal(18));

// Multiple conditions with AND
Expression filter = and_(
    greater(field_ref("age"), literal(18)),
    equal(field_ref("status"), literal("active"))
);

// Multiple conditions with OR
Expression filter = or_(
    equal(field_ref("country"), literal("US")),
    equal(field_ref("country"), literal("CA"))
);

// Null checks
Expression filter = is_valid(field_ref("email"));

Declaration filter_node{"filter", {input}, FilterNodeOptions{filter}};

Project

Select and compute columns:
// Select specific columns
Declaration project{"project", {input}, ProjectNodeOptions{
    {field_ref("id"), field_ref("name")},
    {"id", "name"}
}};

// Compute new columns
Declaration project{"project", {input}, ProjectNodeOptions{
    {
        field_ref("price"),
        field_ref("quantity"),
        call("multiply", {
            field_ref("price"),
            field_ref("quantity")
        })
    },
    {"price", "quantity", "total"}
}};

Aggregate

Group and aggregate data:
// Group by with aggregations: when group keys are present,
// the hash-aggregate variants ("hash_" prefix) must be used
Declaration aggregate{"aggregate", {input}, AggregateNodeOptions{
    {
        {"hash_sum", nullptr, "revenue", "total_revenue"},
        {"hash_mean", nullptr, "price", "avg_price"},
        {"hash_count", nullptr, "id", "count"}
    },
    {"category", "region"}  // group by columns
}};

// Aggregate without grouping (whole table)
Declaration aggregate{"aggregate", {input}, AggregateNodeOptions{
    {
        {"sum", nullptr, "value", "total"},
        {"max", nullptr, "value", "maximum"}
    }
    // No group keys = aggregate all rows
}};
Available aggregation functions:
  • sum, mean, min, max
  • count, count_distinct
  • stddev, variance
  • first, last
  • any, all
With group keys, use the hash variants of these names (hash_sum, hash_mean, hash_count, …).

Join

Combine data from multiple sources:
// Hash join
Declaration left = /* left input */;
Declaration right = /* right input */;

Declaration join{"hashjoin", {left, right}, HashJoinNodeOptions{
    JoinType::INNER,
    /*left_keys=*/{"id"},
    /*right_keys=*/{"user_id"}
    // by default all columns from both sides are output; further
    // arguments select output columns, add a residual filter, and
    // set suffixes to disambiguate colliding column names
}};
Join types:
  • JoinType::INNER
  • JoinType::LEFT_OUTER
  • JoinType::RIGHT_OUTER
  • JoinType::FULL_OUTER
  • JoinType::LEFT_SEMI
  • JoinType::RIGHT_SEMI
  • JoinType::LEFT_ANTI
  • JoinType::RIGHT_ANTI

Order By

Sort results:
Declaration order_by{"order_by", {input}, OrderByNodeOptions{
    arrow::compute::Ordering({
        arrow::compute::SortKey("date", arrow::compute::SortOrder::Descending),
        arrow::compute::SortKey("amount", arrow::compute::SortOrder::Ascending)
    })
}};

Complete Example

Here’s a full example processing sales data:
#include <arrow/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>

using namespace arrow;
using namespace arrow::acero;
using namespace arrow::compute;

arrow::Result<std::shared_ptr<Table>> AnalyzeSales(
    const std::string& parquet_path) {
    
    // Register dataset nodes (e.g. "scan") with Acero
    ARROW_RETURN_NOT_OK(arrow::dataset::internal::Initialize());
    
    // Set up filesystem and dataset
    ARROW_ASSIGN_OR_RAISE(auto fs,
        fs::FileSystemFromUriOrPath(parquet_path));
    
    auto format = std::make_shared<dataset::ParquetFileFormat>();
    
    dataset::FileSystemFactoryOptions factory_options;
    ARROW_ASSIGN_OR_RAISE(auto factory,
        dataset::FileSystemDatasetFactory::Make(
            fs, {parquet_path}, format, factory_options));
    
    ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
    
    // Build query plan
    auto scan_options = std::make_shared<dataset::ScanOptions>();
    scan_options->dataset_schema = dataset->schema();
    
    // 1. Scan Parquet files
    Declaration scan{"scan", dataset::ScanNodeOptions{
        dataset,
        scan_options
    }};
    
    // 2. Filter: sales after 2023-01-01 with amount > 100
    Expression filter = and_(
        greater(field_ref("date"), literal("2023-01-01")),
        greater(field_ref("amount"), literal(100.0))
    );
    Declaration filter_node{"filter", {scan}, FilterNodeOptions{filter}};
    
    // 3. Project: calculate total with tax
    Declaration project{"project", {filter_node}, ProjectNodeOptions{
        {
            field_ref("category"),
            field_ref("region"),
            field_ref("amount"),
            call("multiply", {
                field_ref("amount"),
                literal(1.1)  // 10% tax
            })
        },
        {"category", "region", "amount", "amount_with_tax"}
    }};
    
    // 4. Aggregate: sum by category and region
    // (grouped aggregation requires the "hash_" kernel variants)
    Declaration aggregate{"aggregate", {project}, AggregateNodeOptions{
        {
            {"hash_sum", nullptr, "amount", "total_sales"},
            {"hash_sum", nullptr, "amount_with_tax", "total_with_tax"},
            {"hash_count", nullptr, "amount", "transaction_count"}
        },
        {"category", "region"}  // group by
    }};
    
    // 5. Order by total sales descending
    Declaration order{"order_by", {aggregate}, OrderByNodeOptions{
        arrow::compute::Ordering({
            {arrow::compute::SortKey(
                "total_sales", 
                arrow::compute::SortOrder::Descending
            )}
        })
    }};
    
    // Execute and return results
    ARROW_ASSIGN_OR_RAISE(auto result, DeclarationToTable(order));
    return result;
}

Substrait Integration

Acero can consume query plans in Substrait format:
#include <arrow/engine/substrait/api.h>

// Substrait plans are serialized protobuf; a JSON plan must first be
// converted to the binary representation
std::string substrait_bytes = /* ... */;

// Buffer::FromString returns the buffer directly (not a Result)
std::shared_ptr<arrow::Buffer> buf =
    arrow::Buffer::FromString(substrait_bytes);

// Convert to an Acero declaration
ARROW_ASSIGN_OR_RAISE(auto plan_info,
    arrow::engine::DeserializePlan(*buf));

// Execute
ARROW_ASSIGN_OR_RAISE(auto table, DeclarationToTable(plan_info.root));
Substrait provides a standardized way to represent query plans, enabling interoperability with other engines and tools.

Performance Tips

  1. Filter early: Apply filters before expensive operations
  2. Project columns: Select only needed columns to reduce data volume
  3. Use datasets: Dataset API supports predicate pushdown and projection
  4. Batch size: Larger batches improve throughput but use more memory
  5. Parallel scan: Parquet datasets automatically parallelize scanning
  6. Hash joins: Faster than nested loop joins for equi-joins
  7. Thread pool: Increase thread pool size for CPU-intensive workloads
// Increase I/O thread pool capacity (both setters return Status)
ARROW_RETURN_NOT_OK(arrow::io::SetIOThreadPoolCapacity(16));

// Increase CPU thread pool capacity
ARROW_RETURN_NOT_OK(arrow::SetCpuThreadPoolCapacity(16));

Error Handling

Always check status and handle errors:
auto maybe_table = DeclarationToTable(plan);
if (!maybe_table.ok()) {
    std::cerr << "Query failed: " 
              << maybe_table.status().ToString() 
              << std::endl;
    return maybe_table.status();
}

auto table = *maybe_table;
// Use table...

Best Practices

  1. Register dataset nodes: Call arrow::dataset::internal::Initialize() before building plans that use "scan"
  2. Use DeclarationToReader: For large result sets
  3. Enable filter pushdown: Use dataset API for file sources
  4. Profile queries: Use DeclarationToStatus for benchmarking
  5. Handle backpressure: Reader automatically applies backpressure
  6. Test incrementally: Build plans step-by-step
  7. Reuse plans: Declaration objects can be executed multiple times
For development and debugging, build Arrow with ARROW_EXTRA_ERROR_CONTEXT=ON to include file and line information in error messages.
