Overview
Flight SQL extends Arrow Flight RPC with:
- SQL query execution: Execute queries and prepared statements
- Metadata retrieval: Browse catalogs, schemas, tables, and columns
- Prepared statements: Parameterized queries with type safety
- Transactions: ACID transaction support
- Bulk ingestion: High-performance data loading
- Substrait support: Execute logical query plans
Client Usage
- C++
- Python
- Java
- Go
#include <arrow/flight/sql/client.h>
#include <arrow/flight/types.h>
using namespace arrow;
using namespace arrow::flight;
using namespace arrow::flight::sql;
// Connect to a Flight SQL server over plain gRPC/TCP.
ARROW_ASSIGN_OR_RAISE(auto location,
                      Location::ForGrpcTcp("localhost", 31337));
ARROW_ASSIGN_OR_RAISE(auto client,
                      FlightClient::Connect(location));
// FlightClient::Connect yields a unique_ptr; ownership must be
// transferred into the SQL client (copying a unique_ptr does not compile).
auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));
// Execute a query. This returns a FlightInfo describing the result
// schema and the endpoint(s) to fetch data from — no rows yet.
FlightCallOptions options;
std::string query = "SELECT * FROM customers WHERE age > 30";
ARROW_ASSIGN_OR_RAISE(auto info,
                      sql_client->Execute(options, query));
// Fetch results: each endpoint carries a ticket redeemable via DoGet.
for (const auto& endpoint : info->endpoints()) {
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        sql_client->DoGet(options, endpoint.ticket));
  ARROW_ASSIGN_OR_RAISE(auto table, stream->ToTable());
  std::cout << "Retrieved " << table->num_rows()
            << " rows" << std::endl;
}
from pyarrow import flight
from pyarrow.flight import FlightClient
# NOTE(review): assumes a pyarrow.flight.sql submodule exposing
# FlightSQLClient — confirm against the installed pyarrow version.
import pyarrow.flight.sql as flight_sql
# Connect to the Flight SQL server at (host, port) over gRPC.
client = FlightClient(("localhost", 31337))
sql_client = flight_sql.FlightSQLClient(client)
# Execute a query; returns FlightInfo describing result endpoints
# (no rows are transferred yet).
query = "SELECT * FROM customers WHERE age > 30"
info = sql_client.execute(query)
# Fetch results by redeeming the first endpoint's ticket via do_get.
reader = sql_client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
print(f"Retrieved {len(table)} rows")
print(table.schema)
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
// Connect to a Flight SQL server over insecure gRPC.
BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder()
    .allocator(allocator)
    .location(Location.forGrpcInsecure("localhost", 31337))
    .build();
FlightSqlClient sqlClient = new FlightSqlClient(client);
// Execute a query; returns a FlightInfo describing result endpoints.
String query = "SELECT * FROM customers WHERE age > 30";
FlightInfo info = sqlClient.execute(query);
// Fetch results by redeeming the first endpoint's ticket.
Ticket ticket = info.getEndpoints().get(0).getTicket();
FlightStream stream = sqlClient.getStream(ticket);
while (stream.next()) {
    System.out.println("Rows: " + stream.getRoot().getRowCount());
}
import (
"context"
"github.com/apache/arrow-go/v18/arrow/flight"
"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
"google.golang.org/grpc"
)
// Connect to Flight SQL server
client, err := flightsql.NewClient(
"localhost:31337",
nil,
nil,
grpc.WithInsecure(),
)
if err != nil {
panic(err)
}
defer client.Close()
// Execute a query
ctx := context.Background()
query := "SELECT * FROM customers WHERE age > 30"
info, err := client.Execute(ctx, query)
if err != nil {
panic(err)
}
// Fetch results
reader, err := client.DoGet(ctx, info.Endpoint[0].Ticket)
if err != nil {
panic(err)
}
defer reader.Release()
for reader.Next() {
record := reader.Record()
fmt.Printf("Retrieved %d rows\n", record.NumRows())
}
Prepared Statements
Prepared statements provide type safety and better performance for repeated queries:
- C++
- Python
// Create a prepared statement; '?' placeholders are bound later.
std::string query = "SELECT * FROM orders WHERE customer_id = ? AND amount > ?";
ARROW_ASSIGN_OR_RAISE(auto prepared_statement,
sql_client->Prepare(options, query));
// Bind parameters: one schema field per placeholder, in order.
auto param_schema = arrow::schema({
arrow::field("customer_id", arrow::int64()),
arrow::field("amount", arrow::float64())
});
arrow::Int64Builder id_builder;
arrow::DoubleBuilder amount_builder;
ARROW_RETURN_NOT_OK(id_builder.Append(12345));
ARROW_RETURN_NOT_OK(amount_builder.Append(100.0));
ARROW_ASSIGN_OR_RAISE(auto id_array, id_builder.Finish());
ARROW_ASSIGN_OR_RAISE(auto amount_array, amount_builder.Finish());
// One row in the batch = one set of parameter values.
auto param_batch = RecordBatch::Make(
param_schema, 1, {id_array, amount_array});
ARROW_RETURN_NOT_OK(prepared_statement->SetParameters(param_batch));
// Execute with the bound parameters; returns a FlightInfo as usual.
ARROW_ASSIGN_OR_RAISE(auto info, prepared_statement->Execute());
// Close releases the server-side statement handle when done.
ARROW_RETURN_NOT_OK(prepared_statement->Close());
# Create a prepared statement; '?' placeholders are bound later.
query = "SELECT * FROM orders WHERE customer_id = ? AND amount > ?"
prepared = sql_client.prepare(query)
# Bind parameters: one array per placeholder, one row per parameter set.
import pyarrow as pa
params = pa.record_batch(
    [
        pa.array([12345], type=pa.int64()),
        pa.array([100.0], type=pa.float64())
    ],
    names=['customer_id', 'amount']
)
prepared.set_parameters(params)
# Execute with the bound parameters; returns a FlightInfo as usual.
info = prepared.execute()
reader = sql_client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
# Close releases the server-side statement handle when done.
prepared.close()
Metadata Operations
- C++
- Python
// List catalogs. Metadata calls return a FlightInfo; the actual
// metadata rows are fetched with DoGet like any query result.
ARROW_ASSIGN_OR_RAISE(auto catalogs_info,
sql_client->GetCatalogs(options));
ARROW_ASSIGN_OR_RAISE(auto catalogs_stream,
sql_client->DoGet(options, catalogs_info->endpoints()[0].ticket));
ARROW_ASSIGN_OR_RAISE(auto catalogs_table,
catalogs_stream->ToTable());
// List schemas; "%" is a SQL LIKE wildcard matching any schema name.
std::string catalog = "production";
std::string schema_pattern = "%";
ARROW_ASSIGN_OR_RAISE(auto schemas_info,
sql_client->GetDbSchemas(options, &catalog, &schema_pattern));
// List tables filtered by name pattern and table type;
// include_schema=true also returns each table's Arrow schema.
std::string table_pattern = "customer%";
bool include_schema = true;
std::vector<std::string> table_types = {"TABLE", "VIEW"};
ARROW_ASSIGN_OR_RAISE(auto tables_info,
sql_client->GetTables(options, &catalog, &schema_pattern,
&table_pattern, include_schema, &table_types));
ARROW_ASSIGN_OR_RAISE(auto tables_stream,
sql_client->DoGet(options, tables_info->endpoints()[0].ticket));
ARROW_ASSIGN_OR_RAISE(auto tables_table,
tables_stream->ToTable());
// Look up primary keys for one fully-qualified table reference.
TableRef table_ref;
table_ref.catalog = "production";
table_ref.db_schema = "public";
table_ref.table = "customers";
ARROW_ASSIGN_OR_RAISE(auto primary_keys_info,
sql_client->GetPrimaryKeys(options, table_ref));
# List catalogs. Metadata calls return a FlightInfo; the actual
# metadata rows are fetched with do_get like any query result.
catalogs_info = sql_client.get_catalogs()
reader = sql_client.do_get(catalogs_info.endpoints[0].ticket)
catalogs = reader.read_all()
print(catalogs)
# List schemas; "%" is a SQL LIKE wildcard matching any schema name.
schemas_info = sql_client.get_db_schemas(
    catalog="production",
    db_schema_filter_pattern="%"
)
# List tables filtered by name pattern and table type;
# include_schema=True also returns each table's Arrow schema.
tables_info = sql_client.get_tables(
    catalog="production",
    db_schema_filter_pattern="public",
    table_name_filter_pattern="customer%",
    include_schema=True,
    table_types=["TABLE", "VIEW"]
)
reader = sql_client.do_get(tables_info.endpoints[0].ticket)
tables = reader.read_all()
print(tables)
# Look up primary keys for one fully-qualified table reference.
pk_info = sql_client.get_primary_keys(
    catalog="production",
    db_schema="public",
    table="customers"
)
Transactions
- C++
- Python
// Begin a server-side transaction; the handle is passed to
// subsequent calls so they execute within it.
ARROW_ASSIGN_OR_RAISE(auto transaction,
sql_client->BeginTransaction(options));
// Execute an update within the transaction; returns rows affected.
std::string insert_query =
"INSERT INTO orders (customer_id, amount) VALUES (123, 500.00)";
ARROW_ASSIGN_OR_RAISE(auto rows_affected,
sql_client->ExecuteUpdate(options, insert_query, *transaction));
std::cout << "Inserted " << rows_affected << " rows" << std::endl;
// Create a named savepoint to allow partial rollback.
ARROW_ASSIGN_OR_RAISE(auto savepoint,
sql_client->BeginSavepoint(options, *transaction, "sp1"));
// More operations...
// Commit makes all changes in the transaction durable.
ARROW_RETURN_NOT_OK(sql_client->Commit(options, *transaction));
// Or rollback to discard them instead:
// ARROW_RETURN_NOT_OK(sql_client->Rollback(options, *transaction));
# Begin a server-side transaction; the handle is passed to
# subsequent calls so they execute within it.
transaction = sql_client.begin_transaction()
# Execute an update within the transaction; returns rows affected.
insert_query = "INSERT INTO orders (customer_id, amount) VALUES (123, 500.00)"
rows_affected = sql_client.execute_update(insert_query, transaction)
print(f"Inserted {rows_affected} rows")
# Create a named savepoint to allow partial rollback.
savepoint = sql_client.begin_savepoint(transaction, "sp1")
# More operations...
# Commit makes all changes in the transaction durable.
sql_client.commit(transaction)
# Or rollback to discard them instead:
# sql_client.rollback(transaction)
Bulk Ingestion
Flight SQL provides efficient bulk data loading:
- C++
- Python
// Define the schema of the data to be ingested.
auto schema = arrow::schema({
arrow::field("id", arrow::int64()),
arrow::field("name", arrow::utf8()),
arrow::field("value", arrow::float64())
});
// Create record batches conforming to the schema.
std::vector<std::shared_ptr<RecordBatch>> batches;
// ... populate batches ...
ARROW_ASSIGN_OR_RAISE(auto reader,
RecordBatchReader::Make(batches, schema));
// Ingest data. kCreate asks the server to create the target table
// if it does not already exist.
// NOTE(review): the exact enum path for kCreate may differ between
// Arrow versions — confirm against the installed flight/sql headers.
TableDefinitionOptions table_def_options;
table_def_options.if_not_exists =
TableDefinitionOptions::kCreate;
std::string table = "new_data";
std::optional<std::string> db_schema = "public";
std::optional<std::string> catalog = "production";
bool temporary = false;  // persist the table beyond this session
ARROW_ASSIGN_OR_RAISE(auto rows_ingested,
sql_client->ExecuteIngest(
options, reader, table_def_options,
table, db_schema, catalog, temporary));
std::cout << "Ingested " << rows_ingested << " rows" << std::endl;
import pyarrow as pa
# Define the schema of the data to be ingested.
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64())
])
# Build an in-memory table conforming to the schema.
data = [
    pa.array([1, 2, 3]),
    pa.array(['Alice', 'Bob', 'Charlie']),
    pa.array([10.5, 20.3, 30.1])
]
table = pa.table(data, schema=schema)
# Ingest data. 'create' asks the server to create the target table
# if it does not already exist. The table is streamed via a reader.
rows_ingested = sql_client.execute_ingest(
    table_name="new_data",
    reader=table.to_reader(),
    table_definition_options={
        'if_not_exists': 'create'
    },
    schema="public",
    catalog="production"
)
print(f"Ingested {rows_ingested} rows")
Update Queries
Execute INSERT, UPDATE, DELETE statements:
- C++
- Python
// Execute an UPDATE; ExecuteUpdate returns the affected row count
// rather than a result stream.
std::string update_query =
"UPDATE customers SET status = 'active' WHERE last_login > '2024-01-01'";
ARROW_ASSIGN_OR_RAISE(auto rows_affected,
sql_client->ExecuteUpdate(options, update_query));
std::cout << "Updated " << rows_affected << " rows" << std::endl;
// DELETE works the same way, reusing the rows_affected variable.
std::string delete_query =
"DELETE FROM logs WHERE created_at < '2023-01-01'";
ARROW_ASSIGN_OR_RAISE(rows_affected,
sql_client->ExecuteUpdate(options, delete_query));
std::cout << "Deleted " << rows_affected << " rows" << std::endl;
# Execute an UPDATE; execute_update returns the affected row count
# rather than a result stream.
update_query = "UPDATE customers SET status = 'active' WHERE last_login > '2024-01-01'"
rows_affected = sql_client.execute_update(update_query)
print(f"Updated {rows_affected} rows")
# DELETE works the same way.
delete_query = "DELETE FROM logs WHERE created_at < '2023-01-01'"
rows_affected = sql_client.execute_update(delete_query)
print(f"Deleted {rows_affected} rows")
Server Implementation
- C++
- Python
#include <arrow/flight/sql/server.h>
// Minimal Flight SQL server. GetFlightInfoStatement plans a query and
// advertises where its results can be fetched; DoGetStatement streams
// the actual result data for a previously issued ticket.
class MyFlightSqlServer : public FlightSqlServerBase {
 public:
  // Handles ad-hoc statement execution: plan the query (no data is
  // fetched yet) and return a FlightInfo with the result schema and
  // the endpoint(s) the client should read from.
  arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoStatement(
      const ServerCallContext& context,
      const StatementQuery& command,
      const FlightDescriptor& descriptor) override {
    // The SQL text arrives as command.query; a real server would parse
    // and plan it here, deriving the result schema from the plan.
    auto schema = arrow::schema({
        arrow::field("result", arrow::int64())
    });
    ARROW_ASSIGN_OR_RAISE(auto location,
                          Location::ForGrpcTcp("localhost", 31337));
    FlightEndpoint endpoint;
    // The ticket is an opaque token the client echoes back via DoGet;
    // it must carry enough state for DoGetStatement to find the results.
    endpoint.ticket.ticket = "query-result-123";
    endpoint.locations.push_back(location);
    // -1/-1: total records and total bytes unknown at planning time.
    ARROW_ASSIGN_OR_RAISE(auto info,
                          FlightInfo::Make(*schema, descriptor, {endpoint}, -1, -1));
    return std::make_unique<FlightInfo>(std::move(info));
  }

  // Streams the record batches for a ticket issued above.
  arrow::Result<std::unique_ptr<FlightDataStream>> DoGetStatement(
      const ServerCallContext& context,
      const StatementQueryTicket& command) override {
    // In production: resolve the ticket, execute the query, and stream
    // the results from the database.
    std::vector<std::shared_ptr<RecordBatch>> batches;
    // ... populate batches from database ...
    if (batches.empty()) {
      // Guard: the original example dereferenced batches[0] without
      // checking, which is undefined behavior on an empty result set.
      return arrow::Status::Invalid("no result batches available for ticket");
    }
    auto schema = batches[0]->schema();
    ARROW_ASSIGN_OR_RAISE(auto reader,
                          RecordBatchReader::Make(batches, schema));
    return std::make_unique<RecordBatchStream>(reader);
  }
};
import pyarrow as pa
from pyarrow import flight
from pyarrow.flight import FlightServerBase
import pyarrow.flight.sql as flight_sql


# Minimal Flight SQL server: get_flight_info_statement plans a query
# and advertises where results can be fetched; do_get_statement streams
# the actual data for a previously issued ticket.
class MyFlightSqlServer(flight_sql.FlightSqlServerBase):
    def get_flight_info_statement(self, context, command, descriptor):
        # The SQL text arrives as command.query; a real server would
        # parse and plan it here (no data is fetched yet).
        query = command.query
        # Derive the result schema from the (planned) query.
        schema = pa.schema([('result', pa.int64())])
        # The ticket is an opaque token the client echoes back in
        # do_get; it must identify the planned query's results.
        endpoint = flight.FlightEndpoint(
            ticket=b"query-result-123",
            locations=[flight.Location.for_grpc_tcp("localhost", 31337)]
        )
        # -1/-1: total records and total bytes unknown at planning time.
        return flight.FlightInfo(
            schema=schema,
            descriptor=descriptor,
            endpoints=[endpoint],
            total_records=-1,
            total_bytes=-1
        )

    def do_get_statement(self, context, ticket):
        # In production: resolve the ticket, execute the query, and
        # stream the real results.
        schema = pa.schema([('result', pa.int64())])
        data = [pa.array([1, 2, 3, 4, 5])]
        batch = pa.record_batch(data, schema=schema)
        return flight.RecordBatchStream(
            pa.Table.from_batches([batch])
        )


# Start the server listening on all interfaces.
server = MyFlightSqlServer(
    location=flight.Location.for_grpc_tcp("0.0.0.0", 31337)
)
server.serve()
Related Topics
- Arrow Flight RPC - Underlying RPC protocol
- IPC Streaming - Data serialization format
- C Data Interface - Zero-copy integration