The SQL Client provides an interactive command-line interface for writing and executing SQL queries against a Flink cluster. You can run streaming and batch SQL queries, manage jobs, and inspect metadata — all without writing any Java or Scala code.
## Starting the SQL Client
The SQL Client is bundled with the Flink distribution. Before starting, make sure you have a running Flink cluster:
### Embedded mode (default)

Start the SQL Client in embedded mode, where it runs as a local process:

```bash
./bin/sql-client.sh
```

Or explicitly:

```bash
./bin/sql-client.sh embedded
```

### Gateway mode

Connect to a remote SQL Gateway instead of embedding a local session:

```bash
./bin/sql-client.sh gateway --endpoint <gateway-host>:<port>
```

You can also specify a full URL and provide custom HTTP headers:

```bash
export FLINK_REST_CLIENT_HEADERS="Cookie:myauthcookie=foobar"
./bin/sql-client.sh gateway --endpoint https://your-sql-gateway.example.com/sql
```
## Running SQL queries
Once inside the SQL Client shell, you can execute SQL statements directly:
```sql
-- Set result display mode and runtime mode
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';

-- Run a simple aggregation
SELECT
  name,
  COUNT(*) AS cnt
FROM
  (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;
```
### Result modes
The SQL Client supports three result display modes:
| Mode | Description |
|---|---|
| `tableau` | Shows results as a formatted table printed to the terminal (default for batch queries) |
| `changelog` | Shows a changelog stream with `+I`, `-U`, `+U`, `-D` row-kind markers |
| `table` | Interactive table view that materializes and refreshes results for streaming queries |
```sql
SET 'sql-client.execution.result-mode' = 'changelog';
```
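To see what the changelog markers mean in practice, consider re-running the earlier aggregation in streaming mode. The exact column layout of the output varies by Flink version, but the sequence of row kinds follows the pattern sketched in the comments below: each updated aggregate retracts the old row (`-U`) and emits the new one (`+U`).

```sql
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

SELECT name, COUNT(*) AS cnt
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
GROUP BY name;

-- Illustrative changelog (row kinds only; formatting is version-dependent):
--   +I (Bob, 1)     first 'Bob' arrives, count inserted
--   +I (Alice, 1)
--   +I (Greg, 1)
--   -U (Bob, 1)     second 'Bob' retracts the old count...
--   +U (Bob, 2)     ...and emits the updated one
```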
## Creating tables and views
Define source and sink tables using DDL:
```sql
-- Create a source table using the DataGen connector
CREATE TABLE orders (
  order_id BIGINT,
  item STRING,
  amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

-- Create a print sink for debugging
CREATE TABLE print_sink (
  item STRING,
  total DECIMAL(10, 2)
) WITH (
  'connector' = 'print'
);

-- Insert streaming aggregation results
INSERT INTO print_sink
SELECT item, SUM(amount) AS total
FROM orders
GROUP BY item;
```
## Running SQL scripts
Run a SQL script file in batch mode — useful for CI pipelines and automation:
```bash
./bin/sql-client.sh -f path/to/my-script.sql
```
The script can contain multiple statements separated by semicolons:
```sql
SET 'execution.runtime-mode' = 'streaming';
SET 'parallelism.default' = '4';

CREATE TABLE kafka_source (
  user_id BIGINT,
  page STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-sql-client',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE pageview_counts (
  page STRING,
  view_count BIGINT,
  window_end TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

INSERT INTO pageview_counts
SELECT
  page,
  COUNT(*) AS view_count,
  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end
FROM kafka_source
GROUP BY page, TUMBLE(ts, INTERVAL '1' MINUTE);
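When a script feeds several sinks, each standalone `INSERT INTO` is submitted as its own job. On recent Flink versions you can wrap the inserts in a statement set so they run as a single job that shares the source scan. A minimal sketch reusing the tables above (`raw_archive` is a hypothetical second sink, not defined in this script):

```sql
EXECUTE STATEMENT SET
BEGIN
  -- Windowed aggregation into the print sink defined above
  INSERT INTO pageview_counts
  SELECT page, COUNT(*), TUMBLE_END(ts, INTERVAL '1' MINUTE)
  FROM kafka_source
  GROUP BY page, TUMBLE(ts, INTERVAL '1' MINUTE);

  -- Hypothetical archival sink, reading the same source only once
  INSERT INTO raw_archive
  SELECT user_id, page, ts FROM kafka_source;
END;
```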
## Managing jobs
Use SQL statements to manage running jobs:
```sql
-- List all running jobs
SHOW JOBS;

-- Stop a job with a savepoint
STOP JOB '<job-id>' WITH SAVEPOINT;

-- Cancel a job immediately
STOP JOB '<job-id>';
```
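After stopping a job with a savepoint, you can resume a later `INSERT` job from that savepoint by setting the savepoint path in the session before submitting (the path below is a placeholder for the one printed by `STOP JOB ... WITH SAVEPOINT`; newer Flink releases may prefer a different option key):

```sql
-- Resume the next submitted job from a savepoint
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-xxxx';

INSERT INTO print_sink
SELECT item, SUM(amount) AS total
FROM orders
GROUP BY item;
```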
Browse catalogs, databases, and tables:
```sql
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;

-- Describe a table schema
DESCRIBE orders;

-- Show the DDL used to create a table
SHOW CREATE TABLE orders;
```
## Configuration
Set configuration options at the beginning of your SQL session or script:
```sql
-- Checkpointing
SET 'execution.checkpointing.interval' = '30s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- Parallelism
SET 'parallelism.default' = '8';

-- State backend
SET 'state.backend' = 'rocksdb';
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
```
You can also pass initial configuration via the `-D` flag when starting the SQL Client:
```bash
./bin/sql-client.sh -Dparallelism.default=4 -Dexecution.checkpointing.interval=30s
```
## SQL Client startup options
| Option | Description |
|---|---|
| `-f <file>` | Execute a SQL script file and exit |
| `-i <file>` | Initialize the environment with a SQL file before entering interactive mode |
| `-D <key>=<value>` | Set a configuration property |
| `-j <jar file>` | Add a JAR file to the class path (may be repeated) |
| `--endpoint <address>` (gateway) | Address of the SQL Gateway to connect to |
| `--help` | Print help message |
The SQL Client interactive shell supports tab completion for SQL keywords, table names, and column names. Press Tab to autocomplete.
Use `./bin/sql-client.sh -i init.sql -f job.sql` to run an initialization script before executing a job script. This lets you keep DDL statements (`CREATE TABLE`, `SET` configuration) separate from DML statements (`INSERT INTO`).
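As a sketch of that split (file names and table definitions here are illustrative, not part of any Flink distribution):

```sql
-- init.sql: session configuration and DDL, reusable across jobs
SET 'execution.runtime-mode' = 'streaming';
SET 'parallelism.default' = '2';

CREATE TABLE events (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE debug_sink (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'print'
);
```

```sql
-- job.sql: only the DML that actually submits a job
INSERT INTO debug_sink
SELECT id, payload FROM events;
```

Swapping in a different `job.sql` then reuses the same environment without repeating the DDL.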