The JSON format reads and writes JSON-encoded data based on a JSON schema that is derived automatically from the Flink table schema. It acts as a serialization schema for sinks and as a deserialization schema for sources, and supports both append-only streams and, when combined with a connector such as Upsert Kafka, retract and upsert streams.

Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>
For SQL connectors, use the fat JAR:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-json</artifactId>
  <version>${flink.version}</version>
</dependency>

Usage with Kafka

CREATE TABLE user_behavior (
  user_id     BIGINT,
  item_id     BIGINT,
  category_id BIGINT,
  behavior    STRING,
  ts          TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id'          = 'json-consumer',
  'format'                       = 'json',
  'json.fail-on-missing-field'   = 'false',
  'json.ignore-parse-errors'     = 'true'
)
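Because the format can participate in upsert streams, it can also serve as the key and value format of the Upsert Kafka connector, which requires a declared primary key. A minimal sketch; the topic name `user_behavior_agg` and the aggregate column are illustrative:

```sql
CREATE TABLE user_behavior_agg (
  user_id BIGINT,
  cnt     BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'                    = 'upsert-kafka',
  'topic'                        = 'user_behavior_agg',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format'                   = 'json',   -- serializes the primary key
  'value.format'                 = 'json'    -- serializes the remaining columns
)
```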

Usage with Filesystem

When using JSON with the Filesystem connector, each line in the file must be a complete, valid JSON object (newline-delimited JSON / JSONL). Standard multi-line JSON arrays are not supported for filesystem writes.
CREATE TABLE events_log (
  event_id STRING,
  event_ts TIMESTAMP(3),
  payload  ROW<action STRING, value DOUBLE>,
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path'      = '/data/events/',
  'format'    = 'json'
)
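Rows written to this table are serialized as one JSON object per line, grouped into files under one directory per `dt` partition. A sketch of populating the table, assuming a hypothetical source table `raw_events` with compatible columns:

```sql
INSERT INTO events_log
SELECT
  event_id,
  event_ts,
  payload,
  DATE_FORMAT(event_ts, 'yyyy-MM-dd') AS dt  -- derive the partition column
FROM raw_events
```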

Nested types

Map nested JSON objects to Flink ROW types:
CREATE TABLE orders (
  order_id  BIGINT,
  customer  ROW<
    id    BIGINT,
    name  STRING,
    email STRING
  >,
  items     ARRAY<ROW<product_id INT, quantity INT, price DECIMAL(10,2)>>,
  metadata  MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic'     = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json'
)
Access nested fields in queries:
SELECT order_id, customer.name, customer.email
FROM orders
WHERE customer.id > 1000;
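Array elements and map values can be addressed directly as well; note that Flink SQL arrays are 1-indexed. `UNNEST` flattens the `items` array into one row per element:

```sql
-- First item of each order (arrays are 1-indexed) and a metadata lookup
SELECT order_id, items[1].product_id, metadata['source']
FROM orders;

-- One row per order item
SELECT o.order_id, i.product_id, i.quantity, i.price
FROM orders AS o
CROSS JOIN UNNEST(o.items) AS i (product_id, quantity, price);
```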

Top-level JSON arrays

Flink’s JSON format supports reading top-level JSON arrays. The following two inputs both produce two rows (123, "a") and (456, "b") for a table with columns (col1 BIGINT, col2 VARCHAR):
[{"col1": 123, "col2": "a"}, {"col1": 456, "col2": "b"}]
{"col1": 123, "col2": "a"}
{"col1": 456, "col2": "b"}

Format options

Option | Required | Default | Description
------ | -------- | ------- | -----------
format | Yes | (none) | Must be 'json'.
json.fail-on-missing-field | No | false | Throw an error if an expected field is missing from the input JSON.
json.ignore-parse-errors | No | false | Skip rows that fail to parse instead of failing the job. Affected fields are set to null.
json.timestamp-format.standard | No | 'SQL' | Timestamp format. 'SQL': yyyy-MM-dd HH:mm:ss.s{precision}. 'ISO-8601': yyyy-MM-ddTHH:mm:ss.s{precision}.
json.map-null-key.mode | No | 'FAIL' | How to handle null map keys during serialization. 'FAIL': throw an exception. 'DROP': skip the entry. 'LITERAL': replace the key with the string specified in json.map-null-key.literal.
json.map-null-key.literal | No | 'null' | Replacement string when json.map-null-key.mode is 'LITERAL'.
json.encode.decimal-as-plain-number | No | false | Write decimals as plain numbers instead of scientific notation (e.g., 0.000000027 instead of 2.7E-8).
json.encode.ignore-null-fields | No | false | Omit null fields from the serialized JSON output.
decode.json-parser.enabled | No | true | Use the Jackson JsonParser (streaming) API for parsing, which is faster and uses less memory than the JsonNode approach. Disable it if you encounter compatibility issues.
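The serialization options can be combined on a sink table. A sketch, assuming a hypothetical output topic `orders_out`:

```sql
CREATE TABLE orders_out (
  order_id BIGINT,
  amount   DECIMAL(20, 10),
  tags     MAP<STRING, STRING>
) WITH (
  'connector'                           = 'kafka',
  'topic'                               = 'orders_out',
  'properties.bootstrap.servers'        = 'localhost:9092',
  'format'                              = 'json',
  'json.map-null-key.mode'              = 'LITERAL',
  'json.map-null-key.literal'           = 'unknown',  -- null map keys become "unknown"
  'json.encode.decimal-as-plain-number' = 'true',     -- e.g. 0.0000000001 instead of 1E-10
  'json.encode.ignore-null-fields'      = 'true'      -- drop null fields from the output
)
```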

Data type mapping

Flink SQL type | JSON type
-------------- | ---------
CHAR / VARCHAR / STRING | string
BOOLEAN | boolean
BINARY / VARBINARY | string (base64-encoded)
DECIMAL | number
TINYINT / SMALLINT / INT / BIGINT | number
FLOAT / DOUBLE | number
DATE | string with format date
TIME | string with format time
TIMESTAMP | string with format date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE | string with format date-time (UTC)
INTERVAL | number
ARRAY | array
MAP / MULTISET | object
ROW | object
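For example, the JSON document {"name": "a", "amount": 12.50, "raw": "aGVsbG8=", "created": "2024-01-15"} could be read with a schema like the following (the table and topic names are illustrative):

```sql
CREATE TABLE typed_example (
  name    STRING,          -- JSON string
  amount  DECIMAL(10, 2),  -- JSON number
  raw     BYTES,           -- base64-encoded JSON string
  created DATE             -- JSON string with format date
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'typed_example',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'                       = 'json'
)
```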

Timestamp format examples

-- Input / output: "2024-01-15 12:30:00.123"
CREATE TABLE events (
  id INT,
  ts TIMESTAMP(3)
) WITH (
  'connector'                    = 'kafka',
  'topic'                        = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'                       = 'json'
  -- json.timestamp-format.standard defaults to 'SQL'
)
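To read or write ISO-8601 timestamps instead, override the default explicitly (the topic name is illustrative):

```sql
-- Input / output: "2024-01-15T12:30:00.123"
CREATE TABLE events_iso (
  id INT,
  ts TIMESTAMP(3)
) WITH (
  'connector'                      = 'kafka',
  'topic'                          = 'events_iso',
  'properties.bootstrap.servers'   = 'localhost:9092',
  'format'                         = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```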
