The Table API is a language-integrated relational API for Java, Scala, and Python. Instead of writing SQL strings, you compose queries from operator calls, so IDEs can offer autocompletion and, in Java and Scala, the compiler catches malformed calls before the job runs. The same Table API query runs on streaming or batch input without modification, although a few operators are restricted to one mode.
The Table API and SQL share the same planner, so you can freely mix both styles in a single program.
Getting started
Assume a registered Orders table with schema (a STRING, b BIGINT, c BIGINT, rowtime TIMESTAMP(3)).
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
// scan a registered table
Table orders = tEnv.from("Orders");
// group and count
Table counts = orders
.groupBy($("a"))
.select($("a"), $("b").count().as("cnt"));
counts.execute().print();
Scan, projection, and filter
from / from_path
Scans a registered table. Equivalent to SQL FROM.
Table orders = tableEnv.from("Orders");
fromValues
Creates an inline table from literal values. Equivalent to SQL VALUES.
Table table = tEnv.fromValues(
row(1, "Alice"),
row(2, "Bob")
);
// with explicit schema
Table typed = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING())
),
row(1, "Alice"),
row(2, "Bob")
);
select
Projects columns, renames them, or computes expressions. Equivalent to SQL SELECT.
Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));
// wildcard — select all columns
Table all = orders.select($("*"));
where / filter
Keeps only the rows that satisfy the predicate. Equivalent to SQL WHERE.
Table orders = tableEnv.from("Orders");
Table result = orders.where($("a").isEqual("red"));
// or using filter()
Table result2 = orders.filter($("b").isGreater(100));
Column operations
Table orders = tableEnv.from("Orders");
// add a new column (concat takes string arguments, so use the STRING column a)
Table withDesc = orders.addColumns(concat($("a"), " (confirmed)").as("desc"));
// add or overwrite
Table updated = orders.addOrReplaceColumns(concat($("a"), " (v2)").as("a"));
// drop columns
Table dropped = orders.dropColumns($("b"), $("c"));
// rename columns
Table renamed = orders.renameColumns($("b").as("amount"), $("c").as("category"));
Aggregations
GroupBy aggregation
Table orders = tableEnv.from("Orders");
Table result = orders
.groupBy($("a"))
.select($("a"), $("b").sum().as("total"));
GroupBy aggregation on a streaming table produces an updating result: each incoming row can change a total that was already emitted. To write the result, you need a sink that supports upserts, such as the JDBC or upsert-kafka connector.
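To make "updating result" concrete, here is a minimal plain-Java model of a grouped sum that records every change it emits. It is a sketch with no Flink dependency, not Flink's implementation: the class and method names are hypothetical, and the `+I`, `-U`, and `+U` prefixes follow the short names of Flink's insert, update-before, and update-after row kinds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of an updating GROUP BY sum; hypothetical helper, not Flink code. */
final class GroupBySumChangelog {

    /** Feeds (key, amount) rows one by one and returns the changes emitted along the way. */
    static List<String> changelog(String[] keys, long[] amounts) {
        Map<String, Long> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Long previous = totals.get(keys[i]);
            long updated = (previous == null ? 0L : previous) + amounts[i];
            if (previous == null) {
                // first result for this key
                out.add("+I(" + keys[i] + ", " + updated + ")");
            } else {
                // retract the old total, then emit the new one
                out.add("-U(" + keys[i] + ", " + previous + ")");
                out.add("+U(" + keys[i] + ", " + updated + ")");
            }
            totals.put(keys[i], updated);
        }
        return out;
    }

    public static void main(String[] args) {
        changelog(new String[] {"x", "y", "x"}, new long[] {10, 5, 7})
                .forEach(System.out::println);
    }
}
```

The second row for key `x` does not append a new result; it replaces the earlier total, which is why an append-only sink cannot accept this output.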
GroupBy window aggregation
Window aggregations compute results over finite time windows. Use Tumble, Slide, or Session window types:
import org.apache.flink.table.api.Tumble;
import static org.apache.flink.table.api.Expressions.*;
Table orders = tableEnv.from("Orders");
Table result = orders
.window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w"))
.groupBy($("a"), $("w"))
.select(
$("a"),
$("w").start(),
$("w").end(),
$("b").sum().as("total")
);
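The window bounds returned by start() and end() can be reasoned about with simple arithmetic. The following plain-Java sketch assumes tumbling windows aligned to the epoch with no offset and treats the end as an exclusive upper bound; the class and method names are hypothetical and the code does not use Flink.

```java
/** Plain-Java sketch of tumbling window bounds; hypothetical helper, not Flink code. */
final class TumbleBounds {

    /** Inclusive start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the tumbling window that contains the timestamp. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 43_650_000L; // 12:07:30 expressed as milliseconds since midnight
        System.out.println(windowStart(ts, fiveMinutes)); // 12:05:00 in the same unit
        System.out.println(windowEnd(ts, fiveMinutes));   // 12:10:00 in the same unit
    }
}
```

A row whose timestamp falls exactly on a boundary belongs to the window that starts there, not to the one that ends there.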
Over window aggregation
Over windows compute aggregates for each row over a range of surrounding rows. Equivalent to SQL OVER.
import org.apache.flink.table.api.Over;
import static org.apache.flink.table.api.Expressions.*;
Table orders = tableEnv.from("Orders");
Table result = orders
.window(
Over
.partitionBy($("a"))
.orderBy($("rowtime"))
.preceding(UNBOUNDED_RANGE)
.following(CURRENT_RANGE)
.as("w")
)
.select(
$("a"),
$("b").avg().over($("w")).as("running_avg"),
$("b").max().over($("w")).as("running_max")
);
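The frame from UNBOUNDED_RANGE to CURRENT_RANGE covers every row of the partition whose order key is less than or equal to that of the current row, so rows with the same timestamp see the same aggregate. The plain-Java sketch below models that frame for a single partition. It uses a running sum for brevity, and the class and method names are hypothetical; it is an illustration of the semantics, not how Flink evaluates the window.

```java
import java.util.Arrays;

/** Plain-Java sketch of an unbounded-preceding RANGE frame; hypothetical helper, not Flink code. */
final class OverRangeSketch {

    /**
     * rows holds {timestamp, value} pairs from one partition.
     * Returns, for each row, the sum over all rows with timestamp <= the row's timestamp.
     */
    static long[] runningSum(long[][] rows) {
        long[] result = new long[rows.length];
        for (int i = 0; i < rows.length; i++) {
            long sum = 0;
            for (long[] other : rows) {
                if (other[0] <= rows[i][0]) {
                    sum += other[1]; // row lies inside the frame of row i
                }
            }
            result[i] = sum;
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] rows = {{1, 10}, {2, 20}, {2, 60}, {3, 40}};
        System.out.println(Arrays.toString(runningSum(rows)));
    }
}
```

The two rows with timestamp 2 both receive 90, because each lies inside the other's frame.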
Distinct aggregation
Table orders = tableEnv.from("Orders");
Table result = orders
.groupBy($("a"))
.select($("a"), $("b").sum().distinct().as("distinct_sum"));
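A distinct aggregate deduplicates the input values of a group before aggregating them. As a rough plain-Java analogy (hypothetical class name, no Flink dependency), compare a plain sum with a distinct sum over the values of one group:

```java
import java.util.Arrays;

/** Plain-Java analogy for sum() versus sum().distinct(); hypothetical helper, not Flink code. */
final class DistinctSumSketch {

    /** Sums every value, duplicates included. */
    static long sum(long[] values) {
        return Arrays.stream(values).sum();
    }

    /** Sums each distinct value once. */
    static long distinctSum(long[] values) {
        return Arrays.stream(values).distinct().sum();
    }

    public static void main(String[] args) {
        long[] group = {10, 10, 20};
        System.out.println(sum(group));         // 40
        System.out.println(distinctSum(group)); // 30
    }
}
```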
Joins
Inner join
Table left = tableEnv.from("Orders").select($("a"), $("b"));
Table right = tableEnv.from("Customers").select($("d"), $("e"));
Table result = left.join(right)
.where($("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Outer join
Table leftOuter = left.leftOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table rightOuter = left.rightOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Table fullOuter = left.fullOuterJoin(right, $("a").isEqual($("d")))
.select($("a"), $("b"), $("e"));
Interval join
Interval joins bound streaming joins by a time condition, allowing Flink to clear state for rows that can no longer match:
Table left = tableEnv.from("Orders").select($("a"), $("b"), $("rowtime").as("ltime"));
Table right = tableEnv.from("Shipments").select($("d"), $("e"), $("rtime"));
Table result = left.join(right)
.where(
and(
$("a").isEqual($("d")),
$("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())),
$("ltime").isLess($("rtime").plus(lit(10).minutes()))
)
)
.select($("a"), $("b"), $("e"), $("ltime"));
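The time condition above accepts a pair of rows when ltime lies in the half-open interval from rtime minus 5 minutes (inclusive) to rtime plus 10 minutes (exclusive). The plain-Java sketch below checks only that time predicate, leaving out the key equality; the class and method names are hypothetical and no Flink classes are involved.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java check of the interval-join time bounds; hypothetical helper, not Flink code. */
final class IntervalJoinPredicate {

    /** True when rtime - 5 min <= ltime < rtime + 10 min. */
    static boolean matches(Instant ltime, Instant rtime) {
        Instant lower = rtime.minus(Duration.ofMinutes(5)); // inclusive bound
        Instant upper = rtime.plus(Duration.ofMinutes(10)); // exclusive bound
        return !ltime.isBefore(lower) && ltime.isBefore(upper);
    }

    public static void main(String[] args) {
        Instant rtime = Instant.parse("2024-01-01T12:00:00Z");
        System.out.println(matches(Instant.parse("2024-01-01T11:55:00Z"), rtime)); // true
        System.out.println(matches(Instant.parse("2024-01-01T12:10:00Z"), rtime)); // false
    }
}
```

Because the interval is bounded on both sides, a row only needs to be kept until no matching row from the other side can still arrive, which is what allows state to be cleaned up.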
Join with a table function (lateral join)
Join each row with the output of a table function:
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);
Table orders = tableEnv.from("Orders");
Table result = orders
.joinLateral(call("split", $("c")).as("word", "len", "score"))
.select($("a"), $("b"), $("word"), $("len"));
// preserve unmatched outer rows
Table result2 = orders
.leftOuterJoinLateral(call("split", $("c")).as("word", "len", "score"))
.select($("a"), $("b"), $("word"), $("len"));
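The difference between joinLateral and leftOuterJoinLateral shows up only for rows where the table function emits nothing. The plain-Java sketch below models both variants with an ordinary function that returns a list; the class, the method names, and the word-splitting function are hypothetical stand-ins and are not the MySplitUDTF used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of inner versus left outer lateral joins; hypothetical helper, not Flink code. */
final class LateralJoinSketch {

    /** Pairs each row with every value the function produces for it. */
    static List<String> join(
            List<String> rows, Function<String, List<String>> fn, boolean keepUnmatched) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            List<String> produced = fn.apply(row);
            if (produced.isEmpty() && keepUnmatched) {
                // left outer variant: keep the row and pad the function side with null
                out.add("[" + row + "] -> null");
            }
            for (String value : produced) {
                out.add("[" + row + "] -> " + value);
            }
        }
        return out;
    }

    /** Stand-in table function: splits on whitespace, emits nothing for blank input. */
    static List<String> splitWords(String text) {
        String trimmed = text.trim();
        return trimmed.isEmpty() ? List.of() : Arrays.asList(trimmed.split("\\s+"));
    }

    public static void main(String[] args) {
        List<String> rows = List.of("hello world", "");
        System.out.println(join(rows, LateralJoinSketch::splitWords, false));
        System.out.println(join(rows, LateralJoinSketch::splitWords, true));
    }
}
```

With keepUnmatched set to false, the blank row disappears from the result; with true, it survives with a null on the function side.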
Set operations
Table left = tableEnv.from("orders_eu");
Table right = tableEnv.from("orders_us");
// remove duplicates
Table union = left.union(right);
// keep duplicates
Table unionAll = left.unionAll(right);
// rows present in both (batch only)
Table intersect = left.intersect(right);
// rows in left but not right (batch only)
Table minus = left.minus(right);
// filter by membership in a subquery; the subquery table must have exactly one column
Table result = left.select($("a"), $("b"))
.where($("a").in(right.select($("a"))));
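How the set operations treat duplicates is easiest to see on small inputs. The plain-Java sketch below models union, unionAll, intersect, and minus on lists of strings, treating each string as one row. The class and method names are hypothetical, and the code illustrates the semantics only; it does not use Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of the set operations; hypothetical helper, not Flink code. */
final class SetOpsSketch {

    /** Concatenates both inputs and keeps duplicates. */
    static List<String> unionAll(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    /** Concatenates both inputs and returns each row once. */
    static List<String> union(List<String> left, List<String> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    /** Rows present in both inputs, each returned once. */
    static List<String> intersect(List<String> left, List<String> right) {
        Set<String> out = new LinkedHashSet<>(left);
        out.retainAll(right);
        return new ArrayList<>(out);
    }

    /** Rows of the left input that are absent from the right, each returned once. */
    static List<String> minus(List<String> left, List<String> right) {
        Set<String> out = new LinkedHashSet<>(left);
        out.removeAll(right);
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "a", "b");
        List<String> right = List.of("b", "c");
        System.out.println(unionAll(left, right));  // [a, a, b, b, c]
        System.out.println(union(left, right));     // [a, b, c]
        System.out.println(intersect(left, right)); // [b]
        System.out.println(minus(left, right));     // [a]
    }
}
```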
Ordering and limiting
// sort ascending
Table sorted = orders.orderBy($("b").asc());
// first 5 rows
Table top5 = orders.orderBy($("b").desc()).fetch(5);
// skip 10, return the next 5
Table paged = orders.orderBy($("b").asc()).offset(10).fetch(5);
On an unbounded streaming table, orderBy requires either sorting on a time attribute or a subsequent fetch that bounds the result.
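On bounded input, offset and fetch behave like skipping and limiting a sorted list. The plain-Java sketch below shows that analogy with a hypothetical helper; it does not use Flink and says nothing about how the planner executes the sort.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java analogy for orderBy(...).offset(n).fetch(m); hypothetical helper, not Flink code. */
final class OffsetFetchSketch {

    /** Sorts ascending, skips the first offset values, and returns at most fetch values. */
    static List<Long> page(List<Long> values, int offset, int fetch) {
        return values.stream()
                .sorted()
                .skip(offset)
                .limit(fetch)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(page(List.of(50L, 10L, 40L, 20L, 30L), 1, 2)); // [20, 30]
    }
}
```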
Writing results
Table orders = tableEnv.from("Orders");
// insert into a registered sink table
orders.insertInto("OrderArchive").execute();
// Python equivalent: orders.execute_insert("OrderArchive")
Complex example: filter, normalize, window average
Table orders = tEnv.from("Orders");
Table result = orders
.filter(
and(
$("a").isNotNull(),
$("b").isNotNull(),
$("c").isNotNull()
)
)
.select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
.window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
.groupBy($("hourlyWindow"), $("a"))
.select(
$("a"),
$("hourlyWindow").end().as("hour"),
$("b").avg().as("avgBillingAmount")
);