What you’ll build
In this tutorial you will build a spend report that aggregates credit card transaction amounts by account ID and hour. Along the way you will:
- Create a TableEnvironment for streaming execution
- Define a source table using TableDescriptor with the DataGen connector
- Apply select, groupBy, and sum operations using the Table API
- Write a User-Defined Function (UDF) to extend built-in functionality
- Use tumbling windows for time-based aggregation
- Test your streaming logic in batch mode
Prerequisites
- Java 11, 17, or 21
- Maven 3.8.6 or later
- An IDE (IntelliJ IDEA recommended)
Steps
Create the project
Generate a skeleton project using the Flink Table API Maven archetype. Maven creates a spendreport/ directory containing SpendReport.java, SpendReportTest.java, and a pom.xml with the required Flink Table API dependencies.
If you are running a SNAPSHOT version of Flink, use the corresponding snapshot version in -DarchetypeVersion.
Create the TableEnvironment
The TableEnvironment is the entry point for all Table API and SQL operations. You configure it with EnvironmentSettings to choose between streaming and batch execution.
SpendReport.java
Define the source table
Register a source table backed by the DataGen connector. DataGen generates random rows at a configurable rate, so no external system is needed.
The transactions table has three columns:
- accountId: a random BIGINT between 1 and 5
- amount: a random BIGINT between 1 and 1000
- transactionTime: a TIMESTAMP(3) with a watermark for event-time processing
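To make the watermark on transactionTime concrete, here is a plain-Java sketch with no Flink dependency. It assumes the watermark equals the largest transactionTime seen so far minus 5 seconds, which matches the declaration transactionTime - INTERVAL '5' SECOND. It also treats an event-time window [start, end) as complete once the watermark reaches the window's last millisecond. The class name WatermarkSketch is hypothetical. Flink tracks watermarks internally, and this code only models that behavior.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model of a bounded-delay watermark:
 * watermark = (largest event timestamp seen so far) - maxDelay.
 */
final class WatermarkSketch {
    private final Duration maxDelay;
    private Instant maxSeen = null; // null until the first event arrives

    WatermarkSketch(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    /** Records an event timestamp; an older (out-of-order) event never moves the watermark back. */
    void onEvent(Instant transactionTime) {
        if (maxSeen == null || transactionTime.isAfter(maxSeen)) {
            maxSeen = transactionTime;
        }
    }

    /** Current watermark, or null before any event has been seen. */
    Instant currentWatermark() {
        return maxSeen == null ? null : maxSeen.minus(maxDelay);
    }

    /**
     * A watermark W asserts that no more events with timestamp <= W are expected.
     * Window [start, end) covers timestamps up to end - 1 ms, so it is complete once W >= end - 1 ms.
     */
    boolean canClose(Instant windowEnd) {
        Instant wm = currentWatermark();
        return wm != null && !wm.isBefore(windowEnd.minusMillis(1));
    }
}
```

An event that arrives out of order, with a timestamp older than the current maximum, does not pull the watermark backwards. Only newer events advance it.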
SpendReport.java
The expression transactionTime - INTERVAL '5' SECOND in the watermark declaration tells Flink that events may arrive up to 5 seconds late. Flink uses the watermark to determine when event-time windows can be closed.
Write the report query
The report groups transactions by accountId and by the hour in which they occurred, then sums the transaction amounts. The built-in floor function rounds a timestamp down to the nearest hour boundary.
SpendReport.java
Call it from main():
SpendReport.java
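The grouping logic is easy to model outside Flink. The plain-Java sketch below uses hypothetical Transaction and SpendReportSketch classes and has no Flink dependency. It floors each transactionTime to the hour with LocalDateTime.truncatedTo and sums amount per (accountId, hour) pair, which is the result the select, groupBy, and sum chain describes. It works on an in-memory list rather than a table.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory row shaped like the transactions table. */
final class Transaction {
    final long accountId;
    final long amount;
    final LocalDateTime transactionTime;

    Transaction(long accountId, long amount, LocalDateTime transactionTime) {
        this.accountId = accountId;
        this.amount = amount;
        this.transactionTime = transactionTime;
    }
}

/** Plain-Java model of: floor to hour, then group by (accountId, hour), then sum(amount). */
final class SpendReportSketch {
    /** Rounds a timestamp down to the start of its hour, e.g. 09:59:59 becomes 09:00. */
    static LocalDateTime floorToHour(LocalDateTime ts) {
        return ts.truncatedTo(ChronoUnit.HOURS);
    }

    /** Returns accountId -> (hour -> total amount); TreeMaps keep keys sorted for readable output. */
    static Map<Long, Map<LocalDateTime, Long>> report(List<Transaction> transactions) {
        Map<Long, Map<LocalDateTime, Long>> result = new TreeMap<>();
        for (Transaction t : transactions) {
            result.computeIfAbsent(t.accountId, id -> new TreeMap<>())
                  .merge(floorToHour(t.transactionTime), t.amount, Long::sum);
        }
        return result;
    }
}
```

Two transactions for the same account at 09:00:00 and 09:59:59 fall into the same 09:00 bucket. A transaction at 10:00:00 starts a new bucket.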
Test in batch mode
One of Flink’s key properties is that streaming and batch programs share the same semantics. You can test your report() logic with a fixed, finite dataset in batch mode and then deploy it unchanged against a live stream.
SpendReportTest.java
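The shared-semantics idea can be illustrated by analogy with a grouped sum in plain Java. In the sketch below (hypothetical names, no Flink dependency), batchTotals aggregates a finite list in one pass. StreamingTotals instead updates a running total as each row arrives. Summation does not depend on arrival order, so both produce the same per-account totals once every row has been consumed. This is only an analogy and does not describe how Flink's batch or streaming runtimes work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical minimal row holding only the columns the sum needs. */
final class Row {
    final long accountId;
    final long amount;

    Row(long accountId, long amount) {
        this.accountId = accountId;
        this.amount = amount;
    }
}

final class SameSemanticsSketch {
    /** Batch style: the complete, finite input is available up front. */
    static Map<Long, Long> batchTotals(List<Row> rows) {
        Map<Long, Long> totals = new HashMap<>();
        for (Row r : rows) {
            totals.merge(r.accountId, r.amount, Long::sum);
        }
        return totals;
    }

    /** Streaming style: rows arrive one at a time and a running total is kept per account. */
    static final class StreamingTotals {
        private final Map<Long, Long> state = new HashMap<>();

        /** Applies one row and returns the updated total for its account. */
        long onRow(Row r) {
            return state.merge(r.accountId, r.amount, Long::sum);
        }

        /** Copy of the current totals, comparable to the batch result after all rows are applied. */
        Map<Long, Long> snapshot() {
            return new HashMap<>(state);
        }
    }
}
```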
(Optional) Write a User-Defined Function
Flink has many built-in functions, but you can extend the Table API with custom scalar functions (UDFs) when needed. Here is how to implement the floor-to-hour logic as a UDF:
MyFloor.java
Use the UDF in your report:
SpendReport.java
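The core of a floor-to-hour UDF is a single java.time call. The sketch below keeps only that logic, in a hypothetical MyFloorSketch class. In Flink the class would extend ScalarFunction, and Flink would call its public eval method. That base class is omitted here so the sketch compiles without Flink on the classpath. The sketch assumes TIMESTAMP(3) values arrive as java.time.LocalDateTime, which is Flink's default conversion class for that type. It passes null inputs through unchanged.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/**
 * Plain-Java stand-in for the floor-to-hour UDF logic.
 * In Flink this class would extend ScalarFunction; that is omitted here.
 */
final class MyFloorSketch {
    /** Rounds a timestamp down to the start of its hour; returns null for a null input. */
    public LocalDateTime eval(LocalDateTime timestamp) {
        return timestamp == null ? null : timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}
```

Handling null explicitly matters because SQL columns can be NULL. Without the check, truncatedTo would throw a NullPointerException.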
(Optional) Use tumbling windows
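A tumbling window divides event time into fixed-size, non-overlapping intervals, so each timestamp falls into exactly one window. The plain-Java sketch below computes the window that contains a given timestamp. It has no Flink dependency, and the TumblingWindowSketch name is hypothetical. It assumes windows are aligned to the Unix epoch with no offset, and it interprets LocalDateTime values in UTC for the arithmetic.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Plain-Java model of tumbling-window assignment, assuming epoch-aligned windows with no offset. */
final class TumblingWindowSketch {
    /** Start (inclusive) of the window of length sizeMillis that contains timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod (not %) keeps the result correct for timestamps before the epoch
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Same rule for a LocalDateTime, converted to epoch millis in UTC and back. */
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long millis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long start = windowStart(millis, size.toMillis());
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(start), ZoneOffset.UTC);
    }

    /** End (exclusive) of the window beginning at windowStart. */
    static LocalDateTime windowEnd(LocalDateTime windowStart, Duration size) {
        return windowStart.plus(size);
    }
}
```

With a 10-second size, 2024-01-01 01:23:47 falls into the window [01:23:40, 01:23:50). With a one-hour size, the same rule gives the floor-to-hour bucket, which is why a tumbling window can replace the floor call.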
For finer-grained time bucketing, replace the floor call with a tumbling window. Windows are first-class citizens in Flink’s runtime and enable additional optimizations compared to a manual floor approach.
SpendReport.java
This creates 10-second tumbling windows. A transaction with transactionTime = 2024-01-01 01:23:47 is placed in the 2024-01-01 01:23:40 window.
Complete programs
What’s next
- Table API reference — Explore all operations, window types, and join semantics in the Table API documentation.
- User-Defined Functions — Learn how to write scalar, table, and aggregate UDFs in the UDF guide.
- SQL Quickstart — Run the same kind of aggregation query interactively from the SQL Client.
- DataStream API Quickstart — Build a stateful fraud detection system with the lower-level DataStream API.

