Acero (pronounced ah-SERR-oh) is Arrow’s streaming execution engine for building and executing complex query plans. It processes data as a graph of operators, enabling efficient handling of large datasets that don’t fit in memory.
Acero is experimental and the API may change. However, it’s actively used in production systems and continues to mature.
Overview
Acero executes queries as directed graphs where:
- Nodes represent operations (scan, filter, aggregate, join, etc.)
- Edges represent data flowing between operations
- Data flows as streams of record batches
- Backpressure automatically controls memory usage
┌─────────┐     ┌────────┐     ┌───────────┐
│  Scan   │────▶│ Filter │────▶│ Aggregate │
└─────────┘     └────────┘     └───────────┘
                                     │
                                     ▼
                                ┌────────┐
                                │  Sink  │
                                └────────┘
Key Features
- Streaming execution: Process data larger than memory
- Parallel processing: Automatic multi-threaded execution
- Memory efficient: Backpressure prevents memory overflow
- Composable: Build complex plans from simple operators
- Extensible: Add custom nodes and operations
Basic Usage
The typical workflow:
- Create a graph of Declaration objects
- Execute it with a DeclarationToXyz method
- Process the results
Simple Example
#include <arrow/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/dataset.h>
using namespace arrow;
using namespace arrow::acero;
using namespace arrow::compute;
arrow::Result<std::shared_ptr<Table>> SimpleQuery() {
// Initialize compute library
ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
// Create input data
ARROW_ASSIGN_OR_RAISE(auto table, CreateSampleTable());
// Create dataset from table
auto dataset = std::make_shared<dataset::InMemoryDataset>(table);
// Build query plan
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->dataset_schema = table->schema();
// Scan declaration
Declaration scan{"scan", dataset::ScanNodeOptions{
dataset,
scan_options
}};
// Filter declaration
Expression filter = greater(field_ref("value"), literal(10));
Declaration filter_node{"filter", {scan}, FilterNodeOptions{filter}};
// Execute and get table
ARROW_ASSIGN_OR_RAISE(auto result_table,
DeclarationToTable(filter_node));
return result_table;
}
Creating Plans
Using Declaration
Create nodes by specifying name, inputs, and options:
#include <arrow/acero/exec_plan.h>
// Scan node (source)
Declaration scan{"scan", dataset::ScanNodeOptions{dataset, scan_options}};
// Filter node
Expression filter_expr = greater(field_ref("age"), literal(18));
Declaration filter{"filter", {scan}, FilterNodeOptions{filter_expr}};
// Project node (select columns)
Declaration project{
"project",
{filter},
ProjectNodeOptions{
{field_ref("name"), field_ref("age")},
{"name", "age"}
}
};
Sequential Plans
For linear sequences, use Declaration::Sequence:
// Create linear pipeline
Declaration plan = Declaration::Sequence({
{"scan", dataset::ScanNodeOptions{dataset, scan_options}},
{"filter", FilterNodeOptions{filter_expr}},
{"project", ProjectNodeOptions{exprs, names}}
});
ARROW_ASSIGN_OR_RAISE(auto table, DeclarationToTable(plan));
Execution Methods
Acero offers three entry points, depending on how you want results delivered: DeclarationToTable, DeclarationToReader, and DeclarationToStatus.
DeclarationToTable
// Collect all results in memory
ARROW_ASSIGN_OR_RAISE(auto table,
DeclarationToTable(plan));
std::cout << "Result: " << table->num_rows()
<< " rows" << std::endl;
Best for: Small to medium result sets that fit in memory
DeclarationToReader
// Stream results incrementally
ARROW_ASSIGN_OR_RAISE(auto reader,
DeclarationToReader(plan));
// Process batches as they arrive
std::shared_ptr<RecordBatch> batch;
while (true) {
ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
if (batch == nullptr) break;
// Process batch
std::cout << "Batch: " << batch->num_rows()
<< " rows" << std::endl;
}
Best for: Large result sets, streaming output
DeclarationToStatus
// Execute without collecting results
ARROW_RETURN_NOT_OK(DeclarationToStatus(plan));
std::cout << "Plan executed successfully" << std::endl;
Best for: Benchmarking, plans with side effects (e.g., writes)
Common Operators
Source Nodes
Acero provides several kinds of source nodes: scanning a dataset, reading an in-memory table, or supplying a fully custom generator.
Scan Dataset
// Read from dataset (Parquet, CSV, etc.)
auto dataset = /* create dataset */;
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->dataset_schema = dataset->schema();
Declaration scan{"scan", dataset::ScanNodeOptions{
dataset,
scan_options
}};
Table Source
// Read from in-memory table
std::shared_ptr<Table> table = /* ... */;
Declaration source{"table_source", TableSourceNodeOptions{
table
}};
Custom Source
// Implement a custom data source: SourceNodeOptions takes the output
// schema and a generator function yielding futures of
// std::optional<ExecBatch>; std::nullopt signals end of stream
std::function<arrow::Future<std::optional<ExecBatch>>()> generator =
/* yield batches; finish with std::nullopt */;
Declaration source{"source", SourceNodeOptions{
schema,
generator
}};
Filter
Filter rows using boolean expressions:
// Simple comparison
Expression filter = greater(field_ref("age"), literal(18));
// Multiple conditions with AND
Expression filter = and_(
greater(field_ref("age"), literal(18)),
equal(field_ref("status"), literal("active"))
);
// Multiple conditions with OR
Expression filter = or_(
equal(field_ref("country"), literal("US")),
equal(field_ref("country"), literal("CA"))
);
// Null checks
Expression filter = is_valid(field_ref("email"));
Declaration filter_node{"filter", {input}, FilterNodeOptions{filter}};
Project
Select and compute columns:
// Select specific columns
Declaration project{"project", {input}, ProjectNodeOptions{
{field_ref("id"), field_ref("name")},
{"id", "name"}
}};
// Compute new columns
Declaration project{"project", {input}, ProjectNodeOptions{
{
field_ref("price"),
field_ref("quantity"),
call("multiply", {
field_ref("price"),
field_ref("quantity")
})
},
{"price", "quantity", "total"}
}};
Aggregate
Group and aggregate data:
// Group by with aggregations (grouped aggregates use the hash_ variants)
Declaration aggregate{"aggregate", {input}, AggregateNodeOptions{
{
{"hash_sum", nullptr, "revenue", "total_revenue"},
{"hash_mean", nullptr, "price", "avg_price"},
{"hash_count", nullptr, "id", "count"}
},
{"category", "region"} // group by columns
}};
// Aggregate without grouping (whole table)
Declaration aggregate{"aggregate", {input}, AggregateNodeOptions{
{
{"sum", nullptr, "value", "total"},
{"max", nullptr, "value", "maximum"}
}
// No group keys = aggregate all rows
}};
Available aggregation functions:
- sum, mean, min, max
- count, count_distinct
- stddev, variance
- first, last
- any, all
When group keys are present, use the hash_-prefixed variants (hash_sum, hash_mean, and so on).
Join
Combine data from multiple sources:
// Hash join on left "id" == right "user_id"
Declaration left = /* left input */;
Declaration right = /* right input */;
Declaration join{"hashjoin", {left, right}, HashJoinNodeOptions{
JoinType::INNER,
{"id"}, // left keys
{"user_id"}, // right keys
literal(true), // residual filter
"_l", // output suffix for left
"_r" // output suffix for right
}};
Join types:
- JoinType::INNER
- JoinType::LEFT_OUTER
- JoinType::RIGHT_OUTER
- JoinType::FULL_OUTER
- JoinType::LEFT_SEMI
- JoinType::RIGHT_SEMI
- JoinType::LEFT_ANTI
- JoinType::RIGHT_ANTI
Order By
Sort results:
Declaration order_by{"order_by", {input}, OrderByNodeOptions{
arrow::compute::Ordering({
arrow::compute::SortKey("date", arrow::compute::SortOrder::Descending),
arrow::compute::SortKey("amount", arrow::compute::SortOrder::Ascending)
})
}};
Complete Example
Here’s a full example processing sales data:
#include <arrow/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
using namespace arrow;
using namespace arrow::acero;
using namespace arrow::compute;
arrow::Result<std::shared_ptr<Table>> AnalyzeSales(
const std::string& parquet_path) {
// Initialize
ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
// Set up filesystem and dataset
ARROW_ASSIGN_OR_RAISE(auto fs,
fs::FileSystemFromUriOrPath(parquet_path));
auto format = std::make_shared<dataset::ParquetFileFormat>();
dataset::FileSystemFactoryOptions factory_options;
ARROW_ASSIGN_OR_RAISE(auto factory,
dataset::FileSystemDatasetFactory::Make(
fs, {parquet_path}, format, factory_options));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
// Build query plan
auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->dataset_schema = dataset->schema();
// 1. Scan Parquet files
Declaration scan{"scan", dataset::ScanNodeOptions{
dataset,
scan_options
}};
// 2. Filter: sales after 2023-01-01 with amount > 100
Expression filter = and_(
greater(field_ref("date"), literal("2023-01-01")),
greater(field_ref("amount"), literal(100.0))
);
Declaration filter_node{"filter", {scan}, FilterNodeOptions{filter}};
// 3. Project: calculate total with tax
Declaration project{"project", {filter_node}, ProjectNodeOptions{
{
field_ref("category"),
field_ref("region"),
field_ref("amount"),
call("multiply", {
field_ref("amount"),
literal(1.1) // 10% tax
})
},
{"category", "region", "amount", "amount_with_tax"}
}};
// 4. Aggregate: sum by category and region
// (grouped aggregation uses the hash_ kernel variants)
Declaration aggregate{"aggregate", {project}, AggregateNodeOptions{
{
{"hash_sum", nullptr, "amount", "total_sales"},
{"hash_sum", nullptr, "amount_with_tax", "total_with_tax"},
{"hash_count", nullptr, "amount", "transaction_count"}
},
{"category", "region"} // group by
}};
// 5. Order by total sales descending
Declaration order{"order_by", {aggregate}, OrderByNodeOptions{
arrow::compute::Ordering({
{arrow::compute::SortKey(
"total_sales",
arrow::compute::SortOrder::Descending
)}
})
}};
// Execute and return results
ARROW_ASSIGN_OR_RAISE(auto result, DeclarationToTable(order));
return result;
}
Substrait Integration
Acero can consume query plans in Substrait format:
#include <arrow/engine/substrait/api.h>
// A Substrait plan arrives as a serialized protobuf message
std::string substrait_plan = /* ... */;
auto buf = arrow::Buffer::FromString(substrait_plan);
// Convert to Acero declarations; the consumer factory (user-supplied)
// provides a SinkNodeConsumer for each top-level relation in the plan
ARROW_ASSIGN_OR_RAISE(auto declarations,
arrow::engine::DeserializePlans(*buf, consumer_factory));
// Execute each resulting plan
for (const auto& decl : declarations) {
ARROW_RETURN_NOT_OK(DeclarationToStatus(decl));
}
Substrait provides a standardized way to represent query plans, enabling interoperability with other engines and tools.
Performance Tips
- Filter early: Apply filters before expensive operations
- Project columns: Select only needed columns to reduce data volume
- Use datasets: Dataset API supports predicate pushdown and projection
- Batch size: Larger batches improve throughput but use more memory
- Parallel scan: Parquet datasets automatically parallelize scanning
- Hash joins: Faster than nested loop joins for equi-joins
- Thread pool: Increase thread pool size for CPU-intensive workloads
// Both setters return a Status
// Increase I/O thread pool capacity
ARROW_RETURN_NOT_OK(arrow::io::SetIOThreadPoolCapacity(16));
// Increase CPU thread pool capacity
ARROW_RETURN_NOT_OK(arrow::SetCpuThreadPoolCapacity(16));
Error Handling
Always check status and handle errors:
auto maybe_table = DeclarationToTable(plan);
if (!maybe_table.ok()) {
std::cerr << "Query failed: "
<< maybe_table.status().ToString()
<< std::endl;
return maybe_table.status();
}
auto table = *maybe_table;
// Use table...
Best Practices
- Initialize compute library: Call arrow::compute::Initialize() first
- Use DeclarationToReader: For large result sets
- Enable filter pushdown: Use the dataset API for file sources
- Profile queries: Use DeclarationToStatus for benchmarking
- Handle backpressure: The reader automatically applies backpressure
- Test incrementally: Build plans step by step
- Reuse plans: Declaration objects can be executed multiple times
For development and debugging, build Arrow with the CMake option ARROW_EXTRA_ERROR_CONTEXT=ON to get extra context (file and line information) in error Statuses.