Skip to main content
Arrow Flight is a general-purpose client-server framework for high-performance transport of large datasets over network interfaces. Built on top of gRPC and the Arrow columnar format, Flight enables efficient data transfer with minimal serialization overhead.

Overview

Flight provides a set of RPC methods for:
  • Data retrieval: Download datasets from a server (DoGet)
  • Data upload: Upload datasets to a server (DoPut)
  • Bidirectional streaming: Exchange data in both directions (DoExchange)
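  • Metadata operations: List available datasets (ListFlights), get dataset info and schemas (GetFlightInfo, GetSchema), and list or execute custom actions (ListActions, DoAction)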
  • Authentication: Secure connections with multiple auth mechanisms

Core Concepts

FlightDescriptor

A FlightDescriptor identifies a dataset or command:
// Identifies a dataset, either by a named path or by an opaque command.
message FlightDescriptor {
  enum DescriptorType {
    // proto3 requires the first enum value to be zero; it also acts as the
    // default when the field is unset.
    UNKNOWN = 0;
    PATH = 1;  // Named dataset path
    CMD = 2;   // Opaque command
  }
  // Selects which of the two fields below is meaningful.
  DescriptorType type = 1;
  // Opaque command bytes, used when type is CMD.
  bytes cmd = 2;
  // Path components, used when type is PATH.
  repeated string path = 3;
}

FlightInfo

FlightInfo describes how to access a dataset, including endpoints and schema:
// Describes how to access a dataset: its schema, the descriptor it belongs
// to, and the endpoints that serve it.
message FlightInfo {
  // Dataset schema carried as raw bytes (the encoding is not shown in this
  // excerpt).
  bytes schema = 1;
  // Descriptor associated with this flight.
  FlightDescriptor flight_descriptor = 2;
  // Endpoints from which the data can be retrieved; see FlightEndpoint.
  repeated FlightEndpoint endpoint = 3;
  // Record and byte totals. The example server on this page passes -1 for
  // both, presumably meaning "unknown" -- confirm against the Flight spec.
  int64 total_records = 4;
  int64 total_bytes = 5;
}

FlightEndpoint

A FlightEndpoint specifies where data can be retrieved (its locations) and the ticket to present when retrieving it:
// One place from which (part of) a dataset can be retrieved.
message FlightEndpoint {
  // Ticket the client passes to DoGet to fetch this endpoint's data.
  Ticket ticket = 1;
  // Locations associated with this endpoint.
  repeated Location location = 2;
  // Expiration timestamp for this endpoint. NOTE(review): presumably the
  // ticket should not be relied on after this time -- confirm against the
  // Flight spec.
  google.protobuf.Timestamp expiration_time = 3;
}

Client Usage

#include <arrow/flight/client.h>
#include <arrow/flight/types.h>

using namespace arrow;
using namespace arrow::flight;

// Connect to Flight server
ARROW_ASSIGN_OR_RAISE(auto location, 
    Location::ForGrpcTcp("localhost", 8815));
ARROW_ASSIGN_OR_RAISE(auto client, 
    FlightClient::Connect(location));

// Get dataset information
FlightDescriptor descriptor;
descriptor.type = FlightDescriptor::PATH;
descriptor.path = {"dataset", "table1"};

ARROW_ASSIGN_OR_RAISE(auto info, 
    client->GetFlightInfo(descriptor));

// Retrieve data from the first endpoint
auto endpoint = info->endpoints()[0];
ARROW_ASSIGN_OR_RAISE(auto stream, 
    client->DoGet(endpoint.ticket));

// Read all batches
ARROW_ASSIGN_OR_RAISE(auto table, 
    stream->ToTable());

std::cout << "Rows: " << table->num_rows() << std::endl;

Server Implementation

#include <arrow/flight/server.h>
#include <arrow/flight/types.h>
#include <arrow/table.h>

class MyFlightServer : public FlightServerBase {
 public:
  Status GetFlightInfo(const ServerCallContext& context,
                      const FlightDescriptor& request,
                      std::unique_ptr<FlightInfo>* info) override {
    // Build schema for the dataset
    auto schema = arrow::schema({
        arrow::field("id", arrow::int64()),
        arrow::field("name", arrow::utf8())
    });
    
    // Create location and ticket
    ARROW_ASSIGN_OR_RAISE(auto location, 
        Location::ForGrpcTcp("localhost", 8815));
    
    FlightEndpoint endpoint;
    endpoint.ticket.ticket = "ticket-123";
    endpoint.locations.push_back(location);
    
    // Build FlightInfo
    ARROW_ASSIGN_OR_RAISE(*info,
        FlightInfo::Make(*schema, request, {endpoint}, -1, -1));
    
    return Status::OK();
  }
  
  Status DoGet(const ServerCallContext& context,
              const Ticket& request,
              std::unique_ptr<FlightDataStream>* stream) override {
    // Create sample data
    auto schema = arrow::schema({
        arrow::field("id", arrow::int64()),
        arrow::field("name", arrow::utf8())
    });
    
    // Build record batch (simplified)
    // In production, fetch from database or storage
    std::shared_ptr<RecordBatch> batch;
    // ... create batch ...
    
    // Create reader from batches
    std::vector<std::shared_ptr<RecordBatch>> batches = {batch};
    ARROW_ASSIGN_OR_RAISE(auto reader,
        RecordBatchReader::Make(batches, schema));
    
    *stream = std::make_unique<RecordBatchStream>(reader);
    return Status::OK();
  }
};

// Start server
int main() {
  ARROW_ASSIGN_OR_RAISE(auto location, 
      Location::ForGrpcTcp("0.0.0.0", 8815));
  
  FlightServerOptions options(location);
  auto server = std::make_unique<MyFlightServer>();
  
  ARROW_RETURN_NOT_OK(server->Init(options));
  std::cout << "Server listening on port " 
            << server->port() << std::endl;
  
  return server->Serve();
}

Uploading Data (DoPut)

// Client: Upload data to server
// NOTE: the ARROW_* macros return early on failure, so this snippet must live
// inside a function that returns arrow::Status (or arrow::Result<T>).
FlightDescriptor descriptor;
descriptor.type = FlightDescriptor::CMD;
descriptor.cmd = "insert_data";

auto schema = arrow::schema({
    arrow::field("id", arrow::int64()),
    arrow::field("value", arrow::float64())
});

ARROW_ASSIGN_OR_RAISE(auto do_put_result,
    client->DoPut(descriptor, schema));

// Write batches
std::shared_ptr<RecordBatch> batch;
// ... create batch ... (it must be non-null and match `schema` before the
// write below dereferences it)

ARROW_RETURN_NOT_OK(do_put_result.writer->WriteRecordBatch(*batch));
// Half-close the write side only; the read side stays open for the response.
ARROW_RETURN_NOT_OK(do_put_result.writer->DoneWriting());

// Read server response metadata. This has to happen before Close(): closing
// finishes the call, after which server metadata can no longer be read.
std::shared_ptr<Buffer> metadata;
ARROW_RETURN_NOT_OK(do_put_result.reader->ReadMetadata(&metadata));

// Finish the call and surface the server's final status.
ARROW_RETURN_NOT_OK(do_put_result.writer->Close());

Authentication

// Basic authentication
// NOTE: the ARROW_* macros return early on failure, so this snippet must live
// inside a function that returns arrow::Status (or arrow::Result<T>).
FlightCallOptions options;
std::string username = "user";
std::string password = "password";

// On success the result is a ready-to-send header pair:
//   first  = header name  ("authorization")
//   second = header value (already prefixed with "Bearer ")
ARROW_ASSIGN_OR_RAISE(auto result,
    client->AuthenticateBasicToken(options, username, password));

// Use token in subsequent calls. Forward the pair unchanged: rebuilding the
// header from result.first would send the header *name* as the token.
FlightCallOptions authenticated_options;
authenticated_options.headers.push_back(result);

ARROW_ASSIGN_OR_RAISE(auto info,
    client->GetFlightInfo(authenticated_options, descriptor));

Actions

Actions allow custom operations beyond standard data transfer:
// List available actions
ARROW_ASSIGN_OR_RAISE(auto actions, client->ListActions());

for (const auto& action : actions) {
  std::cout << "Action: " << action.type 
            << " - " << action.description << std::endl;
}

// Execute a custom action
Action action;
action.type = "drop_dataset";
action.body = Buffer::FromString("{\"name\": \"old_data\"}");

ARROW_ASSIGN_OR_RAISE(auto results, client->DoAction(action));

// Process results
std::unique_ptr<Result> result;
while (true) {
  ARROW_ASSIGN_OR_RAISE(result, results->Next());
  if (!result) break;
  // Process result.body
}

Performance Best Practices

  1. Batch Size: Use appropriate batch sizes (typically 64K-1M rows)
  2. Parallel Endpoints: Leverage multiple endpoints for parallel data transfer
  3. Compression: Enable IPC compression for network-bound workloads
  4. Connection Pooling: Reuse client connections when possible
  5. Streaming: Use DoExchange for bidirectional streaming workloads

Build docs developers (and LLMs) love