The Table API is a relational, declarative API for building both streaming and batch data pipelines. It provides a SQL-inspired fluent DSL in Java and Python, with the same query semantics whether you run on an infinite stream or a finite batch dataset.

What you’ll build

In this tutorial you will build a spend report that aggregates credit card transaction amounts by account ID and hour:
Transactions (DataGen source, 100 rows/sec) → hourly GROUP BY → Console sink
You’ll learn how to:
  • Create a TableEnvironment for streaming execution
  • Define a source table using TableDescriptor with the DataGen connector
  • Apply select, groupBy, and sum operations using the Table API
  • Write a User-Defined Function (UDF) to extend built-in functionality
  • Use tumbling windows for time-based aggregation
  • Test your streaming logic in batch mode

Prerequisites

  • Java 11, 17, or 21
  • Maven 3.8.6 or later
  • An IDE (IntelliJ IDEA recommended)

Steps

1. Create the project

Generate a skeleton project using the Flink Table API Maven archetype:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-table-java \
    -DarchetypeVersion=1.20.0 \
    -DgroupId=spendreport \
    -DartifactId=spendreport \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
Maven creates a spendreport/ directory with SpendReport.java, SpendReportTest.java, and a pom.xml containing the required Flink Table API dependencies.
If you are running a SNAPSHOT version of Flink, use the corresponding snapshot version in -DarchetypeVersion.
2. Create the TableEnvironment

The TableEnvironment is the entry point for all Table API and SQL operations. You configure it with EnvironmentSettings to choose between streaming and batch execution.
SpendReport.java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
3. Define the source table

Register a source table backed by the DataGen connector. DataGen generates random rows at a configurable rate, so no external system is needed. The transactions table has three columns:
  • accountId — a random BIGINT between 1 and 5
  • amount — a random BIGINT between 1 and 1000
  • transactionTime — a TIMESTAMP(3) with a watermark for event-time processing
SpendReport.java
import org.apache.flink.table.api.*;

tEnv.createTemporaryTable("transactions",
    TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
            .column("accountId", DataTypes.BIGINT())
            .column("amount", DataTypes.BIGINT())
            .column("transactionTime", DataTypes.TIMESTAMP(3))
            .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
            .build())
        .option("rows-per-second", "100")
        .option("fields.accountId.min", "1")
        .option("fields.accountId.max", "5")
        .option("fields.amount.min", "1")
        .option("fields.amount.max", "1000")
        .build());
The watermark declaration transactionTime - INTERVAL '5' SECOND tells Flink that events may arrive up to 5 seconds late. Flink uses this to determine when to close event-time windows.
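Flink's runtime tracks watermarks internally, but the bounded-out-of-orderness idea behind this declaration can be sketched in plain Java. The class and method names below are illustrative, not part of Flink's API:

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkSketch {

    // Illustrative only: a bounded-out-of-orderness watermark trails the
    // maximum event time seen so far by the configured delay (5 seconds here).
    static Instant watermark(Instant maxSeenEventTime, Duration delay) {
        return maxSeenEventTime.minus(delay);
    }

    public static void main(String[] args) {
        Instant maxSeen = Instant.parse("2024-01-01T09:00:10Z");
        Instant wm = watermark(maxSeen, Duration.ofSeconds(5));
        // The watermark asserts that no event older than 09:00:05 is still
        // expected, so any event-time window ending at or before 09:00:05
        // may be closed and its result emitted.
        System.out.println(wm); // 2024-01-01T09:00:05Z
    }
}
```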
4. Write the report query

The report groups transactions by accountId and by the hour in which they occurred, then sums the transaction amounts. The built-in floor function rounds a timestamp down to the nearest hour boundary.
SpendReport.java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import static org.apache.flink.table.api.Expressions.$;

public static Table report(Table transactions) {
    return transactions
        .select(
            $("accountId"),
            $("transactionTime").floor(TimeIntervalUnit.HOUR).as("logTs"),
            $("amount"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs"),
            $("amount").sum().as("amount"));
}
Call it from main():
SpendReport.java
Table transactions = tEnv.from("transactions");
Table result = report(transactions);
result.execute().print();
5. Test in batch mode

One of Flink’s key properties is that streaming and batch programs share the same semantics. You can test your report() logic with a fixed, finite dataset in batch mode and then deploy it unchanged against a live stream.
SpendReportTest.java
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import java.time.LocalDateTime;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

Table transactions = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("accountId", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.BIGINT()),
        DataTypes.FIELD("transactionTime", DataTypes.TIMESTAMP(3))
    ),
    Row.of(1L, 188L, LocalDateTime.of(2024, 1, 1, 9, 0, 0)),
    Row.of(1L, 374L, LocalDateTime.of(2024, 1, 1, 9, 30, 0)),
    Row.of(2L, 200L, LocalDateTime.of(2024, 1, 1, 9, 15, 0)),
    Row.of(1L, 600L, LocalDateTime.of(2024, 1, 1, 10, 0, 0))
);

Table result = SpendReport.report(transactions);
// account 1 at 9am: 188 + 374 = 562
// account 2 at 9am: 200
// account 1 at 10am: 600
result.execute().print();
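The expected figures in the comments can be cross-checked independently of Flink with plain Java streams. This sketch (class name ExpectedTotals is hypothetical) mirrors the grouping and summing that report() performs:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExpectedTotals {

    static final class Tx {
        final long accountId; final long amount; final LocalDateTime time;
        Tx(long accountId, long amount, LocalDateTime time) {
            this.accountId = accountId; this.amount = amount; this.time = time;
        }
    }

    // Group by (accountId, hour) and sum amounts, mirroring report().
    static Map<String, Long> totals(List<Tx> txs) {
        return txs.stream().collect(Collectors.groupingBy(
            tx -> tx.accountId + "@" + tx.time.truncatedTo(ChronoUnit.HOURS),
            Collectors.summingLong(tx -> tx.amount)));
    }

    public static void main(String[] args) {
        List<Tx> txs = List.of(
            new Tx(1, 188, LocalDateTime.of(2024, 1, 1, 9, 0)),
            new Tx(1, 374, LocalDateTime.of(2024, 1, 1, 9, 30)),
            new Tx(2, 200, LocalDateTime.of(2024, 1, 1, 9, 15)),
            new Tx(1, 600, LocalDateTime.of(2024, 1, 1, 10, 0)));
        // account 1 at 9am -> 562, account 2 at 9am -> 200, account 1 at 10am -> 600
        System.out.println(totals(txs));
    }
}
```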
6. (Optional) Write a User-Defined Function

Flink has many built-in functions, but you can extend the Table API with custom scalar functions (UDFs) when needed. Here is how to implement the floor-to-hour logic as a UDF:
MyFloor.java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFloor extends ScalarFunction {

    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {
        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}
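The truncation itself is plain java.time, so eval's behavior can be checked outside Flink (class name MyFloorCheck is illustrative):

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class MyFloorCheck {

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2024, 1, 1, 9, 30, 45);
        // Same logic as MyFloor.eval: drop minutes, seconds, and fractions,
        // keeping the hour boundary.
        LocalDateTime floored = ts.truncatedTo(ChronoUnit.HOURS);
        System.out.println(floored); // 2024-01-01T09:00
    }
}
```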
Use the UDF in your report:
SpendReport.java
import static org.apache.flink.table.api.Expressions.call;

public static Table report(Table transactions) {
    return transactions
        .select(
            $("accountId"),
            call(MyFloor.class, $("transactionTime")).as("logTs"),
            $("amount"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs"),
            $("amount").sum().as("amount"));
}
7. (Optional) Use tumbling windows

For finer-grained time bucketing, replace the floor call with a tumbling window. Windows are first-class citizens in Flink’s runtime and enable additional optimizations compared to a manual floor approach.
SpendReport.java
import org.apache.flink.table.api.Tumble;
import static org.apache.flink.table.api.Expressions.lit;

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(10).seconds()).on($("transactionTime")).as("logTs"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs").start().as("logTs"),
            $("amount").sum().as("amount"));
}
This creates 10-second tumbling windows. A transaction with transactionTime = 2024-01-01 01:23:47 is placed in the 2024-01-01 01:23:40 window.
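Flink assigns windows internally, but the window-start arithmetic can be sketched in plain Java (class and method names are hypothetical): a tumbling window's start is the event time floored to a multiple of the window size.

```java
import java.time.Duration;
import java.time.Instant;

public class TumbleSketch {

    // Illustrative only: floor the event time down to the nearest
    // multiple of the window size to find its tumbling window's start.
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(eventTime.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(startMs);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2024-01-01T01:23:47Z");
        // Matches the example above: 01:23:47 falls in the 01:23:40 window.
        System.out.println(windowStart(t, Duration.ofSeconds(10))); // 2024-01-01T01:23:40Z
    }
}
```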
8. Run the application

Run the SpendReport class from your IDE. Results are printed to the console as they are computed. Since the DataGen source is unbounded, the job runs continuously until you stop it.

Complete program

import org.apache.flink.table.api.*;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import static org.apache.flink.table.api.Expressions.$;

public class SpendReport {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.createTemporaryTable("transactions",
            TableDescriptor.forConnector("datagen")
                .schema(Schema.newBuilder()
                    .column("accountId", DataTypes.BIGINT())
                    .column("amount", DataTypes.BIGINT())
                    .column("transactionTime", DataTypes.TIMESTAMP(3))
                    .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
                    .build())
                .option("rows-per-second", "100")
                .option("fields.accountId.min", "1")
                .option("fields.accountId.max", "5")
                .option("fields.amount.min", "1")
                .option("fields.amount.max", "1000")
                .build());

        Table transactions = tEnv.from("transactions");
        Table result = report(transactions);
        result.execute().print();
    }

    public static Table report(Table transactions) {
        return transactions
            .select(
                $("accountId"),
                $("transactionTime").floor(TimeIntervalUnit.HOUR).as("logTs"),
                $("amount"))
            .groupBy($("accountId"), $("logTs"))
            .select(
                $("accountId"),
                $("logTs"),
                $("amount").sum().as("amount"));
    }
}

What’s next

  • Table API reference — Explore all operations, window types, and join semantics in the Table API documentation.
  • User-Defined Functions — Learn how to write scalar, table, and aggregate UDFs in the UDF guide.
  • SQL Quickstart — Run the same kind of aggregation query interactively from the SQL Client.
  • DataStream API Quickstart — Build a stateful fraud detection system with the lower-level DataStream API.
