Beam SQL allows you to use SQL queries to process data in your Apache Beam pipelines. This page demonstrates how to query PCollections using standard SQL syntax. In the Python SDK, SqlTransform is a cross-language transform backed by Beam's Java SQL engine, so a Java runtime must be available when the pipeline runs.

Basic SQL Transform

Query a PCollection using SQL syntax.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Define schema-aware data
with beam.Pipeline() as p:
    # Create PCollection with schema
    data = (
        p
        | 'Create' >> beam.Create([
            beam.Row(name='Alice', age=30, city='NYC'),
            beam.Row(name='Bob', age=25, city='SF'),
            beam.Row(name='Charlie', age=35, city='NYC'),
        ])
    )
    
    # Query with SQL
    results = (
        data
        | 'SQL Query' >> SqlTransform("""
            SELECT name, age
            FROM PCOLLECTION
            WHERE age > 25
            ORDER BY age DESC
        """)
    )
    
    results | 'Print' >> beam.Map(print)
Key Points:
  • Use beam.Row to create schema-aware elements
  • Reference the PCollection as PCOLLECTION in SQL
  • Standard SQL syntax (SELECT, WHERE, ORDER BY, etc.)
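Because SqlTransform in the Python SDK runs through a Java expansion service, a quick way to sanity-check a query's logic is to run it against the same rows with Python's built-in sqlite3 module. This is only an illustrative stand-in: Beam SQL uses the Calcite dialect, not SQLite, but simple SELECT/WHERE/ORDER BY clauses behave the same way:

```python
import sqlite3

# In-memory table standing in for PCOLLECTION above (illustrative only;
# Beam executes the real query with Calcite, not SQLite).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pcollection (name TEXT, age INTEGER, city TEXT)")
conn.executemany(
    "INSERT INTO pcollection VALUES (?, ?, ?)",
    [("Alice", 30, "NYC"), ("Bob", 25, "SF"), ("Charlie", 35, "NYC")],
)

rows = conn.execute("""
    SELECT name, age
    FROM pcollection
    WHERE age > 25
    ORDER BY age DESC
""").fetchall()
print(rows)  # [('Charlie', 35), ('Alice', 30)]
```

As expected, Bob (age 25) is filtered out and the remaining rows come back in descending age order.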

Streaming SQL with Windowing

Based on sdks/python/apache_beam/examples/sql_taxi.py, lines 39-78:
import json
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
from apache_beam.options.pipeline_options import PipelineOptions

def run_streaming_sql(output_topic, pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args, 
        save_main_session=True, 
        streaming=True
    )
    
    with beam.Pipeline(options=pipeline_options) as p:
        results = (
            p
            # Read from PubSub
            | beam.io.ReadFromPubSub(
                topic='projects/pubsub-public-data/topics/taxirides-realtime',
                timestamp_attribute="ts"
            ).with_output_types(bytes)
            
            # Parse JSON and create schema
            | 'Parse JSON' >> beam.Map(json.loads)
            | 'Create Row' >> beam.Map(
                lambda x: beam.Row(
                    ride_status=str(x['ride_status']),
                    passenger_count=int(x['passenger_count'])
                )
            )
            
            # Apply windowing before SQL
            | '15s Windows' >> beam.WindowInto(beam.window.FixedWindows(15))
            
            # SQL aggregation within windows
            | SqlTransform("""
                SELECT
                    ride_status,
                    COUNT(*) AS num_rides,
                    SUM(passenger_count) AS total_passengers
                FROM PCOLLECTION
                WHERE NOT ride_status = 'enroute'
                GROUP BY ride_status
            """)
            
            # Access window parameters
            | 'Add Window Info' >> beam.Map(
                lambda row, window=beam.DoFn.WindowParam: {
                    'ride_status': row.ride_status,
                    'num_rides': row.num_rides,
                    'total_passengers': row.total_passengers,
                    'window_start': window.start.to_rfc3339(),
                    'window_end': window.end.to_rfc3339()
                }
            )
            
            # Write results
            | 'To JSON' >> beam.Map(json.dumps)
            | 'Encode' >> beam.Map(lambda s: s.encode('utf-8'))
            | beam.io.WriteToPubSub(topic=output_topic)
        )
Streaming SQL Features:
  • SQL computes results within existing windows
  • Use windowing before SQL transform
  • Access window metadata with beam.DoFn.WindowParam
  • Combine with streaming sources like PubSub

SQL Aggregations

Perform complex aggregations using SQL.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Sample sales data (snippet assumes an active pipeline,
# e.g. inside `with beam.Pipeline() as p:`)
sales = p | beam.Create([
    beam.Row(product='Laptop', category='Electronics', amount=1200, quantity=1),
    beam.Row(product='Mouse', category='Electronics', amount=25, quantity=2),
    beam.Row(product='Desk', category='Furniture', amount=300, quantity=1),
    beam.Row(product='Chair', category='Furniture', amount=150, quantity=4),
    beam.Row(product='Keyboard', category='Electronics', amount=75, quantity=1),
])

# Aggregate by category
results = sales | SqlTransform("""
    SELECT
        category,
        COUNT(*) as product_count,
        SUM(amount * quantity) as total_revenue,
        AVG(amount) as avg_price,
        MAX(amount) as max_price,
        MIN(amount) as min_price
    FROM PCOLLECTION
    GROUP BY category
    ORDER BY total_revenue DESC
""")
Supported Aggregations:
  • COUNT(*), COUNT(column)
  • SUM(column), AVG(column)
  • MIN(column), MAX(column)
  • STDDEV(column), VAR(column)
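The aggregation query above can be spot-checked locally before running it in a pipeline. The sketch below uses Python's built-in sqlite3 as a stand-in engine (Beam executes the real query with Calcite, but COUNT, SUM with an expression, GROUP BY, and ORDER BY behave identically for this data):

```python
import sqlite3

# Mirror the sales rows from the Beam example in an in-memory table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pcollection "
    "(product TEXT, category TEXT, amount REAL, quantity INTEGER)"
)
conn.executemany(
    "INSERT INTO pcollection VALUES (?, ?, ?, ?)",
    [
        ("Laptop", "Electronics", 1200, 1),
        ("Mouse", "Electronics", 25, 2),
        ("Desk", "Furniture", 300, 1),
        ("Chair", "Furniture", 150, 4),
        ("Keyboard", "Electronics", 75, 1),
    ],
)

rows = conn.execute("""
    SELECT
        category,
        COUNT(*) AS product_count,
        SUM(amount * quantity) AS total_revenue
    FROM pcollection
    GROUP BY category
    ORDER BY total_revenue DESC
""").fetchall()
print(rows)  # [('Electronics', 3, 1325.0), ('Furniture', 2, 900.0)]
```

Electronics totals 1200 + 50 + 75 = 1325, Furniture 300 + 600 = 900, so Electronics sorts first.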

Joining PCollections

Join multiple PCollections using SQL.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Orders PCollection
orders = p | 'Orders' >> beam.Create([
    beam.Row(order_id=1, customer_id=101, amount=250),
    beam.Row(order_id=2, customer_id=102, amount=150),
    beam.Row(order_id=3, customer_id=101, amount=300),
])

# Customers PCollection
customers = p | 'Customers' >> beam.Create([
    beam.Row(customer_id=101, name='Alice', tier='Gold'),
    beam.Row(customer_id=102, name='Bob', tier='Silver'),
])

# Join using SQL
results = (
    {'orders': orders, 'customers': customers}
    | SqlTransform("""
        SELECT
            c.name,
            c.tier,
            COUNT(*) as order_count,
            SUM(o.amount) as total_spent
        FROM orders o
        INNER JOIN customers c
            ON o.customer_id = c.customer_id
        GROUP BY c.name, c.tier
    """)
)
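The join semantics can likewise be verified on the same sample rows with sqlite3 before involving a pipeline (illustrative only; an ORDER BY is added here, which the Beam example does not have, purely to make the check deterministic):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, amount INTEGER)"
)
conn.execute("CREATE TABLE customers (customer_id INTEGER, name TEXT, tier TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(1, 101, 250), (2, 102, 150), (3, 101, 300)])
conn.executemany("INSERT INTO customers VALUES (?, ?, ?)",
                 [(101, "Alice", "Gold"), (102, "Bob", "Silver")])

rows = conn.execute("""
    SELECT
        c.name,
        c.tier,
        COUNT(*) AS order_count,
        SUM(o.amount) AS total_spent
    FROM orders o
    INNER JOIN customers c
        ON o.customer_id = c.customer_id
    GROUP BY c.name, c.tier
    ORDER BY total_spent DESC  -- added for a deterministic check
""").fetchall()
print(rows)  # [('Alice', 'Gold', 2, 550), ('Bob', 'Silver', 1, 150)]
```

Alice's two orders (250 + 300) aggregate to 550, matching what the Beam join produces for the same inputs.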

Data Catalog Integration

Query external tables using Google Cloud Data Catalog (Java SDK). Based on sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java, lines 61-83:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogPipelineOptions;
import org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogTableProvider;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DataCatalogExample {
    public static void main(String[] args) {
        DataCatalogPipelineOptions options = 
            PipelineOptionsFactory.fromArgs(args).as(DataCatalogPipelineOptions.class);
        
        Pipeline pipeline = Pipeline.create(options);
        
        // Use Data Catalog as table provider
        try (DataCatalogTableProvider tableProvider = 
                DataCatalogTableProvider.create(options)) {
            
            pipeline
                .apply("SQL Query",
                    SqlTransform.query(options.getQueryString())
                        .withDefaultTableProvider("datacatalog", tableProvider))
                // rowsToStrings() is a helper in the referenced example
                // that formats each Row as a String
                .apply("Convert to Strings", rowsToStrings())
                .apply("Write output", TextIO.write().to(options.getOutputFilePrefix()));
            
            pipeline.run().waitUntilFinish();
        }
    }
}
Example Query:
SELECT
    product_name,
    SUM(quantity) as total_quantity,
    AVG(price) as avg_price
FROM `project.dataset.sales_table`
WHERE date >= '2024-01-01'
GROUP BY product_name
ORDER BY total_quantity DESC
LIMIT 10

Filtering and Transformations

Use SQL for complex filtering and data transformations.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# User events data
events = p | beam.Create([
    beam.Row(user_id=1, event='login', timestamp=1640000000, value=None),
    beam.Row(user_id=1, event='purchase', timestamp=1640000100, value=99.99),
    beam.Row(user_id=2, event='login', timestamp=1640000050, value=None),
    beam.Row(user_id=2, event='view', timestamp=1640000150, value=None),
    beam.Row(user_id=1, event='logout', timestamp=1640000200, value=None),
])

# Complex filtering and transformations
results = events | SqlTransform("""
    SELECT
        user_id,
        event,
        CAST(timestamp AS TIMESTAMP) as event_time,
        COALESCE(value, 0.0) as event_value,
        CASE
            WHEN event = 'purchase' THEN 'conversion'
            WHEN event = 'view' THEN 'engagement'
            ELSE 'other'
        END as event_category
    FROM PCOLLECTION
    WHERE event IN ('login', 'purchase', 'view')
        AND (value IS NULL OR value > 0)
""")
SQL Features:
  • CASE statements for conditional logic
  • COALESCE for null handling
  • CAST for type conversions
  • IN, LIKE for filtering
  • Date/time functions
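The CASE, COALESCE, and IN constructs above are standard SQL, so their behavior can be previewed locally with sqlite3 (an illustrative stand-in; the CAST-to-TIMESTAMP step is omitted here because timestamp handling differs between SQLite and Beam's Calcite dialect):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pcollection (user_id INTEGER, event TEXT, value REAL)")
conn.executemany(
    "INSERT INTO pcollection VALUES (?, ?, ?)",
    [(1, "login", None), (1, "purchase", 99.99), (2, "view", None)],
)

rows = conn.execute("""
    SELECT
        user_id,
        COALESCE(value, 0.0) AS event_value,
        CASE
            WHEN event = 'purchase' THEN 'conversion'
            WHEN event = 'view' THEN 'engagement'
            ELSE 'other'
        END AS event_category
    FROM pcollection
    WHERE event IN ('login', 'purchase', 'view')
    ORDER BY user_id, event  -- deterministic output for the check
""").fetchall()
print(rows)  # [(1, 0.0, 'other'), (1, 99.99, 'conversion'), (2, 0.0, 'engagement')]
```

COALESCE replaces the NULL values with 0.0, and the CASE expression buckets each event into a category.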

Window Functions

Use SQL window functions for advanced analytics.
from apache_beam.transforms.sql import SqlTransform

# Calculate running totals and rankings
results = sales | SqlTransform("""
    SELECT
        product,
        category,
        amount,
        SUM(amount) OVER (
            PARTITION BY category 
            ORDER BY amount DESC
        ) as running_total,
        ROW_NUMBER() OVER (
            PARTITION BY category 
            ORDER BY amount DESC
        ) as rank_in_category,
        AVG(amount) OVER (
            PARTITION BY category
        ) as category_avg
    FROM PCOLLECTION
""")
Window Functions:
  • ROW_NUMBER(), RANK(), DENSE_RANK()
  • LAG(), LEAD()
  • Aggregate functions with OVER clause
  • PARTITION BY and ORDER BY
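Window functions are also plain SQL, so the running-total and ranking logic can be checked locally with sqlite3 (version 3.25 or newer, which every recent Python build bundles). This is a stand-in for Calcite, using a trimmed copy of the sales rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pcollection (product TEXT, category TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO pcollection VALUES (?, ?, ?)",
    [("Laptop", "Electronics", 1200), ("Mouse", "Electronics", 25),
     ("Desk", "Furniture", 300), ("Chair", "Furniture", 150)],
)

rows = conn.execute("""
    SELECT
        product,
        SUM(amount) OVER (
            PARTITION BY category ORDER BY amount DESC
        ) AS running_total,
        ROW_NUMBER() OVER (
            PARTITION BY category ORDER BY amount DESC
        ) AS rank_in_category
    FROM pcollection
    ORDER BY category, rank_in_category
""").fetchall()
print(rows)
# [('Laptop', 1200.0, 1), ('Mouse', 1225.0, 2),
#  ('Desk', 300.0, 1), ('Chair', 450.0, 2)]
```

Within each category the running total accumulates in descending amount order, and ROW_NUMBER() ranks the products independently per partition.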

Creating Schema-Aware PCollections

Different ways to create PCollections with schemas.
import apache_beam as beam
from apache_beam import coders

# Method 1: Using beam.Row
data1 = p | beam.Create([
    beam.Row(name='Alice', age=30),
    beam.Row(name='Bob', age=25),
])

# Method 2: From dictionaries with schema inference
data2 = (
    p
    | beam.Create([
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 25},
    ])
    | beam.Map(lambda d: beam.Row(**d))
)

# Method 3: Register a NamedTuple with RowCoder for an explicit schema
from typing import NamedTuple

class Person(NamedTuple):
    name: str
    age: int

coders.registry.register_coder(Person, coders.RowCoder)

data3 = (
    p
    | beam.Create(
        [Person('Alice', 30), Person('Bob', 25)]
    ).with_output_types(Person)
)

Best Practices

Use Schemas

  • Define clear schemas for your data
  • Use beam.Row for schema-aware elements
  • Leverage type inference when possible

Optimize Queries

  • Filter early to reduce data volume
  • Use appropriate indexes in external sources
  • Consider query complexity vs. native transforms

Window Before SQL

  • Apply windowing before SQL transforms
  • SQL operates within window boundaries
  • Access window metadata in post-processing

Test SQL Queries

  • Validate SQL syntax before running
  • Test with sample data locally
  • Monitor query performance

SQL Dialect Support

Beam SQL supports multiple SQL dialects:
  • Calcite SQL (default): Standard SQL with extensions
  • ZetaSQL: Google Standard SQL dialect
# Specify ZetaSQL dialect
from apache_beam.transforms.sql import SqlTransform

results = data | SqlTransform(
    query="""SELECT name, age FROM PCOLLECTION""",
    dialect="zetasql"
)

Related Pages

  • Beam SQL Overview: complete SQL reference and capabilities
  • Schema Guide: working with schemas in Beam
  • Joins: alternative joining methods
  • Data Catalog: using external table metadata
