Skip to main content
Acero is Arrow’s query engine that enables the execution of complex analytical queries on Arrow data. It provides a declarative API for building and executing query plans.

Core Concepts

Execution Plans

An execution plan is a directed acyclic graph (DAG) of nodes that process data:
  • Source nodes: Generate data (e.g., from tables, files, datasets)
  • Transform nodes: Modify data (e.g., filter, project, aggregate)
  • Sink nodes: Consume data (e.g., collect results, write to storage)

Declarations

Declarations describe execution nodes without actually constructing them:
#include <arrow/acero/api.h>
#include <arrow/acero/exec_plan.h>

using namespace arrow::acero;

// Create a simple scan -> filter -> project pipeline.
// Note: arrow::schema() takes field objects, so each name/type pair
// must be wrapped in arrow::field() (a bare {"a", type} brace pair
// does not convert to std::shared_ptr<arrow::Field>).
auto table = arrow::TableFromJSON(
    arrow::schema({arrow::field("a", arrow::int32()),
                   arrow::field("b", arrow::int32())}),
    {R"([{"a": 1, "b": 10}, {"a": 2, "b": 20}, {"a": 3, "b": 30}])"}
);

// Build declaration sequence: each entry names a node factory
// ("table_source", "filter", "project") plus its options.
auto declarations = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    // Keep only rows where a > 1.
    {"filter", FilterNodeOptions(arrow::compute::greater(
        arrow::compute::field_ref("a"),
        arrow::compute::literal(1)
    ))},
    // Select columns a and b, renaming them col_a / col_b.
    {"project", ProjectNodeOptions(
        {arrow::compute::field_ref("a"),
         arrow::compute::field_ref("b")},
        {"col_a", "col_b"}
    )}
});

// Execute and collect results. DeclarationToTable returns
// arrow::Result<std::shared_ptr<arrow::Table>>, so unwrap it.
auto result_table = DeclarationToTable(declarations).ValueOrDie();

Building Query Plans

Filtering Data

#include <arrow/acero/exec_plan.h>

using namespace arrow::acero;

// Create execution plan
auto plan = ExecPlan::Make().ValueOrDie();

// Add source node
auto source = MakeExecNode(
    "table_source",
    plan.get(),
    {},
    TableSourceNodeOptions(table)
).ValueOrDie();

// Add filter node: keep rows where age > 25 AND city == "NYC"
auto filter = MakeExecNode(
    "filter",
    plan.get(),
    {source},
    FilterNodeOptions(
        arrow::compute::and_(
            arrow::compute::greater(arrow::compute::field_ref("age"),
                                    arrow::compute::literal(25)),
            arrow::compute::equal(arrow::compute::field_ref("city"),
                                  arrow::compute::literal("NYC"))
        )
    )
).ValueOrDie();

// A plan must end in a sink node; without one the filtered batches
// have nowhere to go. The sink exposes results through an async
// generator that the consumer pulls from.
arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;
MakeExecNode(
    "sink",
    plan.get(),
    {filter},
    SinkNodeOptions{&sink_gen}
).ValueOrDie();

// Start execution (returns a Status in released Arrow versions --
// check it in real code rather than discarding it)
plan->StartProducing();

// Wait for completion
auto finished = plan->finished();
finished.Wait();

Aggregation

#include <arrow/acero/options.h>
#include <arrow/compute/api_aggregate.h>

using namespace arrow::acero;

// Group by and aggregate. Because group-by keys are supplied below,
// the aggregate node performs *grouped* aggregation, which requires
// the "hash_"-prefixed kernels ("sum"/"mean"/"count" are the scalar,
// keyless variants and will not resolve in a keyed aggregate node).
std::vector<arrow::compute::Aggregate> aggregates = {
    {"hash_sum", nullptr, "amount", "total_amount"},
    {"hash_mean", nullptr, "price", "avg_price"},
    {"hash_count", nullptr, "id", "count"}
};

auto aggregate_decl = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    {"aggregate", AggregateNodeOptions(
        aggregates,
        /*keys=*/{"category", "region"}
    )}
});

// DeclarationToTable returns arrow::Result<...>; unwrap it.
auto result = DeclarationToTable(aggregate_decl).ValueOrDie();

Joins

#include <arrow/acero/options.h>

using namespace arrow::acero;

// Placeholders: load these however your application obtains data.
// (`auto x = /* ... */;` is not valid C++ -- auto needs an
// initializer, so spell out the table type.)
std::shared_ptr<arrow::Table> left_table = /* ... */ nullptr;
std::shared_ptr<arrow::Table> right_table = /* ... */ nullptr;

// Left input of the join, expressed as a (single-node) sequence.
auto join_decl = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(left_table)},
});

auto right_source = Declaration("table_source",
                                TableSourceNodeOptions(right_table));

// Hash join on key columns: left.id == right.user_id, emitting the
// listed output columns from each side.
auto join = Declaration("hash_join",
                        {join_decl, right_source},
                        HashJoinNodeOptions(
                            JoinType::INNER,
                            /*left_keys=*/{"id"},
                            /*right_keys=*/{"user_id"},
                            /*left_output=*/{{"id"}, {"name"}},
                            /*right_output=*/{{"amount"}}
                        ));

// DeclarationToTable returns arrow::Result<...>; unwrap it.
auto result = DeclarationToTable(join).ValueOrDie();

Sorting

#include <arrow/acero/options.h>
#include <arrow/compute/api_vector.h>

using namespace arrow::acero;

// Sort by multiple columns. Acero's "order_by" node is configured
// with an arrow::compute::Ordering (a list of sort keys), not with
// SortOptions (which belongs to the standalone sort_indices kernel).
arrow::compute::Ordering ordering({
    arrow::compute::SortKey("price", arrow::compute::SortOrder::Descending),
    arrow::compute::SortKey("date", arrow::compute::SortOrder::Ascending)
});

auto sort_decl = Declaration::Sequence({
    {"table_source", TableSourceNodeOptions(table)},
    {"order_by", OrderByNodeOptions(ordering)}
});

// DeclarationToTable returns arrow::Result<...>; unwrap it.
auto result = DeclarationToTable(sort_decl).ValueOrDie();

Execution Modes

Synchronous Execution

// Execute and wait for completion
auto result = arrow::acero::DeclarationToTable(
    declaration,
    /*use_threads=*/true
).ValueOrDie();

Asynchronous Execution

#include <arrow/util/future.h>

// Non-blocking execution: kicks off the plan immediately and yields an
// arrow::Future<std::shared_ptr<arrow::Table>> that completes when the
// whole result table has been produced.
auto future = arrow::acero::DeclarationToTableAsync(declaration,
                                                    /*use_threads=*/true);

// Block here (or attach a continuation) to obtain the result.
auto result = future.result();

Streaming Results

// Get results as a RecordBatchReader
auto reader = arrow::acero::DeclarationToReader(
    declaration
).ValueOrDie();

// Process batches as they arrive
while (true) {
    auto batch = reader->Next().ValueOrDie();
    if (!batch) break;
    // Process batch
}

Query Options

Customize query execution with QueryOptions:
#include <arrow/acero/exec_plan.h>

using namespace arrow::acero;

// Tune execution behavior without changing the plan itself.
QueryOptions query_opts;
query_opts.use_threads = true;                          // run on the CPU thread pool
query_opts.memory_pool = arrow::default_memory_pool();  // allocator for intermediate data
query_opts.use_legacy_batching = false;                 // let Acero choose batch sizes

// DeclarationToTable returns arrow::Result<...>; unwrap it.
auto result = DeclarationToTable(declaration, query_opts).ValueOrDie();

Performance Optimization

  1. Use streaming execution for large datasets to avoid loading everything into memory at once
  2. Enable multi-threading with use_threads=true for parallel processing
  3. Push down filters early in the pipeline to reduce data volume
  4. Batch operations to amortize overhead
  5. Use appropriate join algorithms based on data size and distribution

Next Steps

Build docs developers (and LLMs) love