Skip to main content
The Table API is a language-integrated relational API for Java, Scala, and Python. Instead of writing SQL strings, you compose queries from type-safe operator calls with IDE support for autocompletion and compile-time validation. Table API queries run identically on streaming and batch inputs. The Table API and SQL share the same planner, so you can freely mix both styles in a single program.

Getting started

Assume a registered Orders table with schema (a STRING, b BIGINT, c BIGINT, rowtime TIMESTAMP(3)).
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.*;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

// scan a registered table
Table orders = tEnv.from("Orders");

// group and count
Table counts = orders
    .groupBy($("a"))
    .select($("a"), $("b").count().as("cnt"));

counts.execute().print();

Scan, projection, and filter

from / from_path

Scans a registered table. Equivalent to SQL FROM.
Table orders = tableEnv.from("Orders");

fromValues

Creates an inline table from literal values. Equivalent to SQL VALUES.
Table table = tEnv.fromValues(
    row(1, "Alice"),
    row(2, "Bob")
);

// with explicit schema
Table typed = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING())
    ),
    row(1, "Alice"),
    row(2, "Bob")
);

select

Projects columns, renames them, or computes expressions. Equivalent to SQL SELECT.
Table orders = tableEnv.from("Orders");
Table result = orders.select($("a"), $("c").as("d"));

// wildcard — select all columns
Table all = orders.select($("*"));

where / filter

Filters out rows that do not satisfy the predicate; only rows for which the predicate evaluates to true are kept. Equivalent to SQL WHERE.
Table orders = tableEnv.from("Orders");
Table result = orders.where($("a").isEqual("red"));

// or using filter()
Table result2 = orders.filter($("b").isGreater(100));

Column operations

Table orders = tableEnv.from("Orders");

// add a new column
Table withDesc = orders.addColumns(concat($("c"), " (confirmed)").as("desc"));

// add or overwrite
Table updated = orders.addOrReplaceColumns(concat($("c"), " (v2)").as("c"));

// drop columns
Table dropped = orders.dropColumns($("b"), $("c"));

// rename columns
Table renamed = orders.renameColumns($("b").as("amount"), $("c").as("category"));

Aggregations

GroupBy aggregation

Table orders = tableEnv.from("Orders");
Table result = orders
    .groupBy($("a"))
    .select($("a"), $("b").sum().as("total"));
GroupBy aggregation on a streaming table produces an updating result. You need a sink that supports upserts (such as JDBC or the upsert-Kafka connector) to write the result.

GroupBy window aggregation

Window aggregations compute results over finite time windows. Use Tumble, Slide, or Session window types:
import org.apache.flink.table.api.Tumble;
import static org.apache.flink.table.api.Expressions.*;

Table orders = tableEnv.from("Orders");
Table result = orders
    .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w"))
    .groupBy($("a"), $("w"))
    .select(
        $("a"),
        $("w").start(),
        $("w").end(),
        $("b").sum().as("total")
    );

Over window aggregation

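Over windows compute an aggregate for each input row over a range of neighboring rows within its partition; in the example below, the range runs from the start of the partition up to and including the current row. Equivalent to SQL OVER.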
import org.apache.flink.table.api.Over;
import static org.apache.flink.table.api.Expressions.*;

Table orders = tableEnv.from("Orders");
Table result = orders
    .window(
        Over
            .partitionBy($("a"))
            .orderBy($("rowtime"))
            .preceding(UNBOUNDED_RANGE)
            .following(CURRENT_RANGE)
            .as("w")
    )
    .select(
        $("a"),
        $("b").avg().over($("w")).as("running_avg"),
        $("b").max().over($("w")).as("running_max")
    );

Distinct aggregation

Table orders = tableEnv.from("Orders");
Table result = orders
    .groupBy($("a"))
    .select($("a"), $("b").sum().distinct().as("distinct_sum"));

Joins

Inner join

Table left  = tableEnv.from("Orders").select($("a"), $("b"));
Table right = tableEnv.from("Customers").select($("d"), $("e"));

Table result = left.join(right)
    .where($("a").isEqual($("d")))
    .select($("a"), $("b"), $("e"));

Outer join

Table leftOuter  = left.leftOuterJoin(right, $("a").isEqual($("d")))
                       .select($("a"), $("b"), $("e"));
Table rightOuter = left.rightOuterJoin(right, $("a").isEqual($("d")))
                       .select($("a"), $("b"), $("e"));
Table fullOuter  = left.fullOuterJoin(right, $("a").isEqual($("d")))
                       .select($("a"), $("b"), $("e"));

Interval join

Interval joins bound streaming joins by a time condition, allowing Flink to clear state for rows that can no longer match:
Table left  = tableEnv.from("Orders").select($("a"), $("b"), $("ltime"));
Table right = tableEnv.from("Shipments").select($("d"), $("e"), $("rtime"));

Table result = left.join(right)
    .where(
        and(
            $("a").isEqual($("d")),
            $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())),
            $("ltime").isLess($("rtime").plus(lit(10).minutes()))
        )
    )
    .select($("a"), $("b"), $("e"), $("ltime"));

Join with a table function (lateral join)

Join each row with the output of a table function:
tableEnv.createTemporarySystemFunction("split", MySplitUDTF.class);

Table orders = tableEnv.from("Orders");
Table result = orders
    .joinLateral(call("split", $("c")).as("word", "len", "score"))
    .select($("a"), $("b"), $("word"), $("len"));

// preserve unmatched outer rows
Table result2 = orders
    .leftOuterJoinLateral(call("split", $("c")).as("word", "len", "score"))
    .select($("a"), $("b"), $("word"), $("len"));

Set operations

Table left  = tableEnv.from("orders_eu");
Table right = tableEnv.from("orders_us");

// remove duplicates
Table union       = left.union(right);

// keep duplicates
Table unionAll    = left.unionAll(right);

// rows present in both (batch only)
Table intersect   = left.intersect(right);

// rows in left but not right (batch only)
Table minus       = left.minus(right);

// filter by membership in a subquery
Table result      = left.select($("a"), $("b"))
                        .where($("a").in(right));

Ordering and limiting

// sort ascending
Table sorted = orders.orderBy($("b").asc());

// first 5 rows
Table top5 = orders.orderBy($("b").desc()).fetch(5);

// skip 10, return the next 5
Table paged = orders.orderBy($("b").asc()).offset(10).fetch(5);
On an unbounded streaming table, orderBy must either sort on a time attribute or be followed by a fetch that bounds the result.

Writing results

Table orders = tableEnv.from("Orders");

// insert into a registered sink table
orders.insertInto("OrderArchive").execute();

// equivalently in Python
# orders.execute_insert("OrderArchive")

Complex example: filter, normalize, window average

Table orders = tEnv.from("Orders");

Table result = orders
    .filter(
        and(
            $("a").isNotNull(),
            $("b").isNotNull(),
            $("c").isNotNull()
        )
    )
    .select($("a").lowerCase().as("a"), $("b"), $("rowtime"))
    .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow"))
    .groupBy($("hourlyWindow"), $("a"))
    .select(
        $("a"),
        $("hourlyWindow").end().as("hour"),
        $("b").avg().as("avgBillingAmount")
    );

Build docs developers (and LLMs) love