Flink SQL lets you query streaming data using standard ANSI SQL 2011 syntax. If you have used any SQL database, you can start writing Flink queries immediately — no Java or Python required. The key difference from a traditional database is that Flink SQL queries run continuously: they process new rows as they arrive and update their results in real time. A query against an unbounded stream never terminates on its own.

Prerequisites

  • Basic knowledge of SQL (SELECT, GROUP BY, CREATE TABLE)
  • A running Flink cluster — follow the Local Installation guide first
The Docker option in Local Installation is the easiest path to a working SQL Client, since the SQL Client service is included in the docker-compose.yml.

Steps

1

Start the SQL Client

The SQL Client is an interactive CLI that submits SQL statements to your Flink cluster and displays results.
docker compose run sql-client
You should see the Flink SQL Client banner and a prompt:
                                   ▒▒▓▓▓▓▓▓▓▓▓▓▓▓▓▒▒
                               ▒▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▒
                            ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓
...
Flink SQL>
To exit at any time, type exit; and press Enter.
2

Run your first query

Start with a simple expression to confirm the SQL Client is working:
SELECT 'Hello World';
The result is displayed in a result table in the terminal:
+-------------+
|      EXPR$0 |
+-------------+
| Hello World |
+-------------+
1 row in set
You can also query the current timestamp:
SELECT CURRENT_TIMESTAMP;
To see all available built-in functions:
SHOW FUNCTIONS;
3

Create a source table

Flink does not store data locally. Instead, queries operate on top of external tables backed by connectors. Source tables produce rows from an external system (Kafka, filesystem, DataGen, etc.).
Create a source table using the DataGen connector, which generates random rows automatically, so no external system is required:
CREATE TABLE employee_information (
    emp_id   INT,
    name     STRING,
    dept_id  INT
) WITH (
    'connector'              = 'datagen',
    'rows-per-second'        = '10',
    'fields.emp_id.kind'     = 'sequence',
    'fields.emp_id.start'    = '1',
    'fields.emp_id.end'      = '1000000',
    'fields.name.length'     = '8',
    'fields.dept_id.min'     = '1',
    'fields.dept_id.max'     = '5'
);
This creates a table that produces 10 new employee records per second with:
  • emp_id: sequential integers from 1 to 1,000,000
  • name: random 8-character strings
  • dept_id: random integers between 1 and 5
Confirm the table was created:
SHOW TABLES;
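To check the column definitions as well as the table's existence, a DESCRIBE statement can follow SHOW TABLES. This is a small sketch that assumes the employee_information table from this step is registered in the current catalog and database:

```sql
-- Lists each column of the table with its data type and nullability.
DESCRIBE employee_information;
```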
4

Query the streaming source

Run a simple continuous query to see rows as they are generated:
SELECT * FROM employee_information WHERE dept_id = 1;
The result table updates in real time as new rows arrive. Press Q to exit the result view and return to the SQL prompt.
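This query runs continuously and keeps producing results until you press Q. For the purposes of this tutorial, employee_information behaves like an unbounded stream: because emp_id uses a sequence generator, the source only finishes once the sequence reaches 1,000,000, which takes more than a day at 10 rows per second.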
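To see the contrast with a bounded input, one option is a DataGen table with a fixed row count. The sketch below assumes the DataGen connector's 'number-of-rows' option; the table name employee_sample is hypothetical and not part of this guide's setup.

```sql
-- Hypothetical bounded variant of employee_information.
-- 'number-of-rows' caps the total number of generated rows.
CREATE TABLE employee_sample (
    emp_id   INT,
    name     STRING,
    dept_id  INT
) WITH (
    'connector'          = 'datagen',
    'number-of-rows'     = '100',
    'fields.name.length' = '8',
    'fields.dept_id.min' = '1',
    'fields.dept_id.max' = '5'
);

-- The underlying job should finish once all 100 rows have been read.
SELECT * FROM employee_sample WHERE dept_id = 1;
```

The same query text is used in both cases; only the boundedness of the source decides whether the job ever completes.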
5

Write a continuous aggregation

Compute a running count of employees per department. This query maintains state across all incoming rows and continuously updates the result:
SELECT
    dept_id,
    COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
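As new rows arrive, the emp_count values update. In the default table result mode, the SQL Client shows only the latest count per department. In changelog mode you will see the row kinds +I (insert), -U (update before, which retracts the old value), and +U (update after, which carries the new value), reflecting how the aggregation result changes over time.
Press Q to exit the result view.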
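The row kinds are only visible when the SQL Client displays results as a changelog. A minimal sketch of switching modes, assuming the SQL Client's 'sql-client.execution.result-mode' setting is available in your Flink version:

```sql
-- Show every change record instead of a materialized result table.
SET 'sql-client.execution.result-mode' = 'changelog';

SELECT
    dept_id,
    COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;

-- Switch back to the materialized table view afterwards.
SET 'sql-client.execution.result-mode' = 'table';
```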
6

Create a sink table and persist results

The interactive result view is read-only. To persist query results for reporting or downstream consumption, write them to a sink table using INSERT INTO.
First, create the sink table. This example uses the print connector, which writes each result row to the TaskManager’s standard output and is suitable for development and testing:
CREATE TABLE department_counts (
    dept_id    INT,
    emp_count  BIGINT
) WITH (
    'connector' = 'print'
);
Now submit the aggregation as a persistent streaming job:
INSERT INTO department_counts
SELECT
    dept_id,
    COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
Flink responds with a job ID and runs the query as a background job:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: a3f79a59e8e1e2f3b4c5d6e7f8091011
To see the output rows written by the print connector:
docker compose logs -f taskmanager
You should see output like:
+I[1, 1]
+I[3, 1]
+I[2, 1]
-U[1, 1]
+U[1, 2]
-U[3, 1]
+U[3, 2]
Each line is one changelog record written by the print connector. +I means a new row was inserted, -U is a retract of the previous value, and +U is the updated value.
7

Use a production-grade sink connector

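The print connector is only for development. In production you would write results to an external system. Because the aggregation emits updates, the sink must accept a changelog. The plain kafka connector is append-only, so this example uses the upsert-kafka connector keyed on dept_id:
CREATE TABLE department_counts_kafka (
    dept_id    INT,
    emp_count  BIGINT,
    PRIMARY KEY (dept_id) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'department-counts',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);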
Kafka and JDBC connectors require the corresponding JAR files to be present in Flink’s lib/ directory. The filesystem connector ships with Flink, although some formats need additional JARs. Download the connector JARs from the Flink downloads page or add them to your Docker image.
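For a relational database sink, a JDBC table could look like the sketch below. The table name department_counts_jdbc, the PostgreSQL URL, and the credentials are hypothetical placeholders, and the target table is assumed to already exist in the database. Declaring a primary key lets the JDBC connector write in upsert mode, so each department keeps a single, continuously updated row.

```sql
-- Hypothetical JDBC sink; replace the URL and credentials with your own.
CREATE TABLE department_counts_jdbc (
    dept_id    INT,
    emp_count  BIGINT,
    PRIMARY KEY (dept_id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:postgresql://localhost:5432/reports',
    'table-name' = 'department_counts',
    'username'   = 'flink',
    'password'   = 'change-me'
);

INSERT INTO department_counts_jdbc
SELECT
    dept_id,
    COUNT(*) AS emp_count
FROM employee_information
GROUP BY dept_id;
```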
8

Manage jobs

Use SHOW JOBS to list all running and completed jobs:
SHOW JOBS;
Output:
+----------------------------------+----------+---------+----------------------------+
|                           job id | job name |  status |                 start time |
+----------------------------------+----------+---------+----------------------------+
| a3f79a59e8e1e2f3b4c5d6e7f8091011 |  default | RUNNING | 2024-01-01T09:00:00.000000 |
+----------------------------------+----------+---------+----------------------------+
Stop a running job:
STOP JOB 'a3f79a59e8e1e2f3b4c5d6e7f8091011';

Key concepts recap

  • Source table: Reads from an external system (Kafka, filesystem, DataGen). Produces an unbounded or bounded stream of rows.
  • Sink table: Writes results to an external system. Referenced in INSERT INTO.
  • Continuous query: A SQL query that runs indefinitely, processing new rows as they arrive.
  • Dynamic table: Flink’s internal abstraction for a table whose content changes over time. Every streaming source is a dynamic table.
  • Changelog: The stream of row insertions, updates, and deletions that represents changes to a dynamic table.
  • Watermark: A timestamp marker that tells Flink how far event time has progressed, enabling late data handling and window closing.

Common DDL patterns

Create a Kafka source with JSON format and event-time watermarks

CREATE TABLE orders (
    order_id   BIGINT,
    customer   STRING,
    amount     DECIMAL(10, 2),
    order_ts   TIMESTAMP(3),
    WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECOND
) WITH (
    'connector'                        = 'kafka',
    'topic'                            = 'orders',
    'properties.bootstrap.servers'     = 'localhost:9092',
    'properties.group.id'              = 'flink-sql-group',
    'scan.startup.mode'                = 'earliest-offset',
    'format'                           = 'json',
    'json.timestamp-format.standard'   = 'ISO-8601'
);

Tumbling window aggregation

SELECT
    TUMBLE_START(order_ts, INTERVAL '1' HOUR)  AS window_start,
    customer,
    SUM(amount)                                AS total_spend,
    COUNT(*)                                   AS order_count
FROM orders
GROUP BY TUMBLE(order_ts, INTERVAL '1' HOUR), customer;
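Flink also supports windowing table-valued functions (TVFs), which the Flink documentation recommends over the older group window syntax used above. As a sketch, the same hourly aggregation over the orders table might be written like this:

```sql
-- TUMBLE as a table-valued function: it adds window_start and window_end
-- columns to every row, which are then used as grouping keys.
SELECT
    window_start,
    window_end,
    customer,
    SUM(amount) AS total_spend,
    COUNT(*)    AS order_count
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_ts), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, customer;
```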

Sliding window aggregation

SELECT
    HOP_START(order_ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)  AS window_start,
    HOP_END(order_ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)    AS window_end,
    customer,
    SUM(amount) AS total_spend
FROM orders
GROUP BY HOP(order_ts, INTERVAL '5' MINUTE, INTERVAL '1' HOUR), customer;

Join two streams (event-time temporal join)

SELECT
    o.order_id,
    o.customer,
    c.email
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.order_ts AS c
    ON o.customer = c.name;
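The query above is an event-time temporal join, and it references a customers table that this page does not define. For such a join, Flink expects the right-hand table to be a versioned table with a primary key and a watermark, and the join condition must cover that primary key. A minimal sketch of a matching definition follows; the update_ts column, topic name, and connector choice are assumptions made for illustration.

```sql
-- Hypothetical versioned table for the temporal join.
-- The primary key (name) matches the join condition o.customer = c.name.
CREATE TABLE customers (
    name       STRING,
    email      STRING,
    update_ts  TIMESTAMP(3),
    WATERMARK FOR update_ts AS update_ts - INTERVAL '5' SECOND,
    PRIMARY KEY (name) NOT ENFORCED
) WITH (
    'connector'                    = 'upsert-kafka',
    'topic'                        = 'customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format'                   = 'json',
    'value.format'                 = 'json'
);
```

With this definition, each order is matched against the customer record that was valid at the order's order_ts rather than against the latest record.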

What’s next

  • SQL reference — Complete DDL and DML syntax, all supported operators, and configuration options in the SQL reference documentation.
  • Built-in functions — String, math, date/time, aggregation, and table-generating functions in the built-in functions reference.
  • Connectors — Connect to Kafka, JDBC, Elasticsearch, Hive, and more in the connectors documentation.
  • Dynamic tables — Understand how Flink maps streams to tables in dynamic tables concepts.
  • Table API Quickstart — Build the same kind of aggregation programmatically using the Table API.
