Skip to main content
Arrow Flight SQL is a protocol for interacting with SQL databases using the Arrow in-memory format and the Flight RPC framework. It provides a standardized way to execute SQL queries, fetch metadata, and transfer query results efficiently.

Overview

Flight SQL extends Arrow Flight RPC with:
  • SQL query execution: Execute queries and prepared statements
  • Metadata retrieval: Browse catalogs, schemas, tables, and columns
  • Prepared statements: Parameterized queries with type safety
  • Transactions: Begin, commit, and roll back transactions and savepoints (ACID guarantees depend on the backing database)
  • Bulk ingestion: High-performance data loading
  • Substrait support: Execute logical query plans

Client Usage

#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include <arrow/flight/sql/client.h>
#include <arrow/flight/types.h>

using namespace arrow;
using namespace arrow::flight;
using namespace arrow::flight::sql;

// NOTE: ARROW_ASSIGN_OR_RAISE returns early on error, so this snippet must
// live inside a function that returns arrow::Status or arrow::Result<T>.

// Connect to Flight SQL server
ARROW_ASSIGN_OR_RAISE(auto location,
    Location::ForGrpcTcp("localhost", 31337));
ARROW_ASSIGN_OR_RAISE(auto client,
    FlightClient::Connect(location));

// FlightClient::Connect yields a std::unique_ptr<FlightClient>, whereas
// FlightSqlClient takes a std::shared_ptr<FlightClient>. A unique_ptr cannot
// be copied, so ownership has to be moved into the SQL client.
auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));

// Execute a query
FlightCallOptions options;
std::string query = "SELECT * FROM customers WHERE age > 30";

ARROW_ASSIGN_OR_RAISE(auto info,
    sql_client->Execute(options, query));

// Fetch results from each endpoint
for (const auto& endpoint : info->endpoints()) {
  ARROW_ASSIGN_OR_RAISE(auto stream,
      sql_client->DoGet(options, endpoint.ticket));

  // Drain the whole stream into a single in-memory table
  ARROW_ASSIGN_OR_RAISE(auto table, stream->ToTable());
  std::cout << "Retrieved " << table->num_rows()
            << " rows" << '\n';
}

Prepared Statements

Prepared statements provide type safety and better performance for repeated queries:
// Create prepared statement; each `?` is a positional parameter placeholder
std::string query = "SELECT * FROM orders WHERE customer_id = ? AND amount > ?";

ARROW_ASSIGN_OR_RAISE(auto prepared_statement,
    sql_client->Prepare(options, query));

// Bind parameters: one column per placeholder, in placeholder order
auto param_schema = arrow::schema({
    arrow::field("customer_id", arrow::int64()),
    arrow::field("amount", arrow::float64())
});

arrow::Int64Builder id_builder;
arrow::DoubleBuilder amount_builder;

ARROW_RETURN_NOT_OK(id_builder.Append(12345));
ARROW_RETURN_NOT_OK(amount_builder.Append(100.0));

ARROW_ASSIGN_OR_RAISE(auto id_array, id_builder.Finish());
ARROW_ASSIGN_OR_RAISE(auto amount_array, amount_builder.Finish());

// A single-row batch carries one set of parameter values
auto param_batch = RecordBatch::Make(
    param_schema, 1, {id_array, amount_array});

ARROW_RETURN_NOT_OK(prepared_statement->SetParameters(param_batch));

// Execute with parameters. Pass the same call options used for Prepare so
// per-call settings (headers, timeout) are not silently dropped.
ARROW_ASSIGN_OR_RAISE(auto info, prepared_statement->Execute(options));

// Read the results (sql_client->DoGet for each entry of info->endpoints())
// BEFORE closing: a server may tie the result tickets to the statement handle.

// Close when done
ARROW_RETURN_NOT_OK(prepared_statement->Close(options));

Metadata Operations

// List catalogs
ARROW_ASSIGN_OR_RAISE(auto catalogs_info,
    sql_client->GetCatalogs(options));
ARROW_ASSIGN_OR_RAISE(auto catalogs_stream,
    sql_client->DoGet(options, catalogs_info->endpoints()[0].ticket));
ARROW_ASSIGN_OR_RAISE(auto catalogs_table,
    catalogs_stream->ToTable());

// List schemas
std::string catalog = "production";
std::string schema_pattern = "%";

ARROW_ASSIGN_OR_RAISE(auto schemas_info,
    sql_client->GetDbSchemas(options, &catalog, &schema_pattern));

// List tables
std::string table_pattern = "customer%";
bool include_schema = true;
std::vector<std::string> table_types = {"TABLE", "VIEW"};

ARROW_ASSIGN_OR_RAISE(auto tables_info,
    sql_client->GetTables(options, &catalog, &schema_pattern,
                         &table_pattern, include_schema, &table_types));

ARROW_ASSIGN_OR_RAISE(auto tables_stream,
    sql_client->DoGet(options, tables_info->endpoints()[0].ticket));
ARROW_ASSIGN_OR_RAISE(auto tables_table,
    tables_stream->ToTable());

// Get table schema
TableRef table_ref;
table_ref.catalog = "production";
table_ref.db_schema = "public";
table_ref.table = "customers";

ARROW_ASSIGN_OR_RAISE(auto primary_keys_info,
    sql_client->GetPrimaryKeys(options, table_ref));

Transactions

// Begin transaction. BeginTransaction yields a Transaction *value* (not a
// pointer), so it is passed to later calls directly, without dereferencing.
ARROW_ASSIGN_OR_RAISE(auto transaction,
    sql_client->BeginTransaction(options));

// Execute queries within transaction
std::string insert_query =
    "INSERT INTO orders (customer_id, amount) VALUES (123, 500.00)";

ARROW_ASSIGN_OR_RAISE(auto rows_affected,
    sql_client->ExecuteUpdate(options, insert_query, transaction));

std::cout << "Inserted " << rows_affected << " rows" << '\n';

// Create savepoint
ARROW_ASSIGN_OR_RAISE(auto savepoint,
    sql_client->BeginSavepoint(options, transaction, "sp1"));

// More operations...

// Undo everything since the savepoint, or release it once it is not needed:
// ARROW_RETURN_NOT_OK(sql_client->Rollback(options, savepoint));
// ARROW_RETURN_NOT_OK(sql_client->Release(options, savepoint));

// Commit transaction
ARROW_RETURN_NOT_OK(sql_client->Commit(options, transaction));

// Or rollback
// ARROW_RETURN_NOT_OK(sql_client->Rollback(options, transaction));

Bulk Ingestion

Flight SQL provides efficient bulk data loading:
// Prepare data for ingestion
auto schema = arrow::schema({
    arrow::field("id", arrow::int64()),
    arrow::field("name", arrow::utf8()),
    arrow::field("value", arrow::float64())
});

// Create record batches
std::vector<std::shared_ptr<RecordBatch>> batches;
// ... populate batches ...

// The schema is passed explicitly so the reader is valid even with no batches
ARROW_ASSIGN_OR_RAISE(auto reader,
    RecordBatchReader::Make(batches, schema));

// Ingest data. Both policies are set explicitly: create the table when it is
// missing, append to it when it already exists.
TableDefinitionOptions table_def_options;
table_def_options.if_not_exist =
    TableDefinitionOptionsTableNotExistOption::kCreate;
table_def_options.if_exists =
    TableDefinitionOptionsTableExistsOption::kAppend;

std::string table = "new_data";
std::optional<std::string> db_schema = "public";
std::optional<std::string> catalog = "production";
bool temporary = false;

ARROW_ASSIGN_OR_RAISE(auto rows_ingested,
    sql_client->ExecuteIngest(
        options, reader, table_def_options,
        table, db_schema, catalog, temporary));

std::cout << "Ingested " << rows_ingested << " rows" << '\n';

Update Queries

Execute INSERT, UPDATE, and DELETE statements:
// SQL text for both data-modifying statements
std::string update_sql{
    "UPDATE customers SET status = 'active' WHERE last_login > '2024-01-01'"};
std::string delete_sql{
    "DELETE FROM logs WHERE created_at < '2023-01-01'"};

// Run the UPDATE; the result is the number of rows the server reports as changed
ARROW_ASSIGN_OR_RAISE(auto affected,
    sql_client->ExecuteUpdate(options, update_sql));
std::cout << "Updated " << affected << " rows" << std::endl;

// Run the DELETE, reusing the same counter variable
ARROW_ASSIGN_OR_RAISE(affected,
    sql_client->ExecuteUpdate(options, delete_sql));
std::cout << "Deleted " << affected << " rows" << std::endl;

Server Implementation

#include <arrow/flight/sql/server.h>

class MyFlightSqlServer : public FlightSqlServerBase {
 public:
  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
      const ServerCallContext& context,
      const StatementQuery& command,
      const FlightDescriptor& descriptor) override {
    
    // Parse SQL query from command.query
    // Execute query planning (don't fetch data yet)
    // Return FlightInfo with schema and endpoints
    
    auto schema = arrow::schema({
        arrow::field("result", arrow::int64())
    });
    
    ARROW_ASSIGN_OR_RAISE(auto location,
        Location::ForGrpcTcp("localhost", 31337));
    
    FlightEndpoint endpoint;
    endpoint.ticket.ticket = "query-result-123";
    endpoint.locations.push_back(location);
    
    ARROW_ASSIGN_OR_RAISE(auto info,
        FlightInfo::Make(*schema, descriptor, {endpoint}, -1, -1));
    
    return std::unique_ptr<FlightInfo>(new FlightInfo(std::move(info)));
  }
  
  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
      const ServerCallContext& context,
      const StatementQueryTicket& command) override {
    
    // Fetch actual data for the ticket
    // Return stream of record batches
    
    // In production: execute query and stream results
    std::vector<std::shared_ptr<RecordBatch>> batches;
    // ... populate batches from database ...
    
    auto schema = batches[0]->schema();
    ARROW_ASSIGN_OR_RAISE(auto reader,
        RecordBatchReader::Make(batches, schema));
    
    return std::unique_ptr<FlightDataStream>(
        new RecordBatchStream(reader));
  }
};

Build docs developers (and LLMs) love