The DataStream API is Flink’s core API for building stateful, event-driven stream processing applications. It gives you fine-grained control over state, time, and processing logic, making it ideal for complex use cases like fraud detection, anomaly detection, and real-time enrichment.

What you’ll build

In this tutorial you will build a fraud detection system that processes a stream of credit card transactions and emits an alert whenever it detects a suspicious pattern: a small transaction (under $1.00) immediately followed by a large one (over $500.00) on the same account.
Transactions (source) → keyBy(accountId) → FraudDetector (KeyedProcessFunction) → Alerts (sink)
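The phrase "immediately followed by" has a precise meaning, and a tiny model of the rule for a single account makes it concrete. The sketch below is plain Java with no Flink involved. It takes one account's amounts in arrival order and returns the positions that would trigger an alert. The class `FraudRuleSketch` and the method `alertPositions` are hypothetical names used only for illustration. The thresholds mirror the tutorial's.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free sketch of the fraud rule for ONE account.
 * Input: that account's transaction amounts in arrival order.
 * Output: indexes of the transactions that would raise an alert.
 */
final class FraudRuleSketch {

    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    static List<Integer> alertPositions(List<Double> amounts) {
        List<Integer> alerts = new ArrayList<>();
        boolean lastWasSmall = false;
        for (int i = 0; i < amounts.size(); i++) {
            double amount = amounts.get(i);
            // A large transaction only counts if the transaction right before it was small.
            if (lastWasSmall && amount > LARGE_AMOUNT) {
                alerts.add(i);
            }
            // Only the most recent transaction matters, so the flag is recomputed every time.
            lastWasSmall = amount < SMALL_AMOUNT;
        }
        return alerts;
    }
}
```

Here are some inputs and their results:

- Amounts 0.50, 20.00, 600.00 produce no alert, because the 20.00 transaction breaks the pattern.
- Amounts 0.50, 0.20, 700.00 alert on the last transaction, because the second small transaction keeps the flag set.
- Exactly 500.00 does not count as large, because the rule is strictly "over".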
You’ll learn how to:
  • Set up a StreamExecutionEnvironment
  • Partition a stream by key with keyBy
  • Implement stateful logic with KeyedProcessFunction
  • Use ValueState for per-key fault-tolerant state
  • Run a Flink job from an IDE or the command line

Prerequisites

  • Java 11, 17, or 21
  • Maven 3.8.6 or later
  • An IDE (IntelliJ IDEA recommended)

Steps

1

Create the project

Generate a skeleton project using the Flink Maven archetype:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.20.0 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=frauddetection \
    -DinteractiveMode=false
Maven creates a frauddetection/ directory containing FraudDetectionJob.java, FraudDetector.java, and a pre-configured pom.xml with the flink-streaming-java and flink-walkthrough-common dependencies.
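Released versions of the archetype are published to Maven Central. If you are running a SNAPSHOT version of Flink, pass the corresponding snapshot version number in -DarchetypeVersion. Snapshot builds are not published as releases, so Maven must be able to fetch them from a snapshot repository.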
IntelliJ IDEA note: If you see java.lang.NoClassDefFoundError when running in the IDE, go to Run → Edit Configurations → Modify options and select Include dependencies with ‘Provided’ scope.
2

Set up the execution environment

The StreamExecutionEnvironment is the entry point for every DataStream application. It lets you configure execution properties and create sources.
FraudDetectionJob.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3

Create a source

Sources ingest data from external systems into Flink. The walkthrough uses generated transaction data so you can run the example without any external infrastructure.
The archetype includes a TransactionSource backed by a DataGeneratorSource that produces an infinite stream of credit card transactions. Each Transaction has an accountId, timestamp, and amount.
FraudDetectionJob.java
DataStream<Transaction> transactions = env
    .fromSource(
        TransactionSource.unbounded(),
        WatermarkStrategy.noWatermarks(),
        "transactions");
fromSource takes three arguments: the source connector, a watermark strategy (this job uses processing time so watermarks are not needed), and a display name used in the Web UI.
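A generator source such as DataGeneratorSource builds each record by applying a generator function to a sequence index. In Flink's datagen connector, that function is a `GeneratorFunction` from a `Long` index to a record.

The plain-Java sketch below imitates that idea. It uses a `LongFunction` in place of Flink's interface. `SketchTransaction` is a hypothetical stand-in for the walkthrough's `Transaction` entity. The account count, amounts, and one-second spacing are made up and do not reproduce the archetype's data.

```java
import java.util.function.LongFunction;

/** Hypothetical stand-in for the walkthrough's Transaction entity (not the real class). */
final class SketchTransaction {
    final long accountId;
    final long timestamp;
    final double amount;

    SketchTransaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }
}

/** Index-to-record generator in the spirit of a generator function (illustrative values only). */
final class SketchTransactionGenerator {
    static final int ACCOUNTS = 5;
    static final long START_MILLIS = 1_700_000_000_000L;
    static final double[] AMOUNTS = {188.23, 0.42, 712.50, 34.10, 0.99, 501.00};

    /** Each record depends only on its index, so regenerating index n always yields the same record. */
    static final LongFunction<SketchTransaction> GENERATOR = index -> new SketchTransaction(
            1 + index % ACCOUNTS,                     // round-robin over accounts 1..5
            START_MILLIS + index * 1_000L,            // one transaction per second
            AMOUNTS[(int) (index % AMOUNTS.length)]); // cycle through a fixed list of amounts
}
```

For example, `GENERATOR.apply(7L)` yields account 3, amount 0.42, and a timestamp 7 seconds after `START_MILLIS`.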
4

Partition by key and apply the fraud detector

To detect fraud per account, all transactions for the same accountId must be routed to the same operator instance. keyBy achieves this by partitioning the stream. The process() call applies your FraudDetector function in a keyed context, meaning each invocation sees state scoped to the current account.
FraudDetectionJob.java
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector())
    .name("fraud-detector");
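keyBy guarantees that every record with the same key reaches the same parallel instance. Roughly, it works in two steps:

1. Flink assigns each key to a key group by hashing `key.hashCode()` (with a murmur hash), modulo the maximum parallelism.
2. Flink gives each parallel subtask a contiguous range of key groups.

The plain-Java sketch below reproduces that two-step shape. `KeyRoutingSketch` is a hypothetical class, and its bit mixing is a stand-in for Flink's hash. It therefore shows the idea, not the exact subtask a given account lands on.

```java
/**
 * Simplified, stdlib-only model of keyBy routing:
 * step 1 maps a key to a key group, step 2 maps the key group to a subtask.
 * The mixing function is illustrative, not Flink's murmur hash.
 */
final class KeyRoutingSketch {

    /** Step 1: key -> key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // cheap bit mixing, stand-in only
        return Math.floorMod(h, maxParallelism);
    }

    /** Step 2: key group -> subtask; each subtask owns a contiguous range of key groups. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Same key in, same subtask out: this is what lets per-key state live in one place. */
    static int route(Object key, int maxParallelism, int parallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

As an example of step 2, take a maximum parallelism of 128 and a parallelism of 4. Key groups 0 to 31 land on subtask 0, and key groups 96 to 127 land on subtask 3.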
5

Write results to a sink

A sink writes results to an external system. For this tutorial, the sink logs the alerts to the console.
FraudDetectionJob.java
alerts
    .addSink(new AlertSink())
    .name("send-alerts");

env.execute("Fraud Detection");
AlertSink (provided by the archetype) logs each Alert at INFO level. env.execute() triggers the actual job execution. The DataStream calls before it only describe the pipeline, so without this call the job is defined but never run.
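The deferred execution that makes env.execute() necessary can be modeled with a small class. `LazyPipelineSketch` below is hypothetical and is not Flink's API. Its `map()` only records a step, and the source is read and transformed only when `execute()` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Toy model of deferred execution (hypothetical, not Flink's API):
 * map() records a step; execute() runs the recorded steps over the source.
 */
final class LazyPipelineSketch {
    private final List<Integer> source;
    private final List<UnaryOperator<Integer>> steps = new ArrayList<>();
    private final List<Integer> sink = new ArrayList<>();

    LazyPipelineSketch(List<Integer> source) {
        this.source = source;
    }

    /** Records a transformation; nothing is computed yet. */
    LazyPipelineSketch map(UnaryOperator<Integer> step) {
        steps.add(step);
        return this;
    }

    /** Pushes every source element through the recorded steps into the sink. */
    void execute() {
        for (Integer value : source) {
            Integer current = value;
            for (UnaryOperator<Integer> step : steps) {
                current = step.apply(current);
            }
            sink.add(current);
        }
    }

    List<Integer> sinkContents() {
        return sink;
    }
}
```

As a usage example, build the pipeline with `new LazyPipelineSketch(List.of(1, 2, 3)).map(x -> x * 10)`. The sink stays empty at this point, and the mapping function has not been called once. After `execute()`, the sink holds 10, 20, and 30.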
6

Implement the FraudDetector

The FraudDetector extends KeyedProcessFunction, which provides access to per-key state and timers. The business rule is to flag an account when a small transaction (< $1.00) occurs, then emit an alert if the *next* transaction for that account is large (> $500.00).
The flag must be stored in managed state, not in a plain Java field, because each parallel FraudDetector instance processes transactions for many accounts. A plain field would be shared across all of those keys.
FraudDetector.java
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;

    private transient ValueState<Boolean> flagState;

    @Override
    public void open(OpenContext openContext) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        // Get the flag for this account (null if never set)
        Boolean lastTransactionWasSmall = flagState.value();

        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // Clear the flag regardless — pattern is complete or broken
            flagState.clear();
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            flagState.update(true);
        }
    }
}
The state handle is obtained in open(), which runs once per parallel instance before any elements are processed. ValueState is always scoped to the current key (the account ID), so each account has its own independent flag. ValueState offers three methods:
  • value() — read current value, returns null if not set
  • update(value) — write a new value
  • clear() — delete the value for the current key
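You can see why the flag must be keyed by running the same detection logic twice over interleaved transactions. The first run uses per-key state and the second uses one shared field.

In the sketch, `KeyedValueStateSketch` is a hypothetical HashMap-based model of the per-key scoping only. In Flink, the runtime sets the current key before calling processElement, and the state lives in the configured state backend with checkpointing. This model has neither. `KeyedStateDemo` is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of keyed ValueState: one slot per key, selected by the current key. */
final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> slots = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }    // Flink's runtime does this per element
    V value() { return slots.get(currentKey); }        // null if never set for this key
    void update(V value) { slots.put(currentKey, value); }
    void clear() { slots.remove(currentKey); }
}

final class KeyedStateDemo {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    /** FraudDetector's logic with per-key state; returns the alerted account ids. */
    static List<Long> detectWithKeyedState(long[] accounts, double[] amounts) {
        KeyedValueStateSketch<Long, Boolean> flagState = new KeyedValueStateSketch<>();
        List<Long> alerts = new ArrayList<>();
        for (int i = 0; i < accounts.length; i++) {
            flagState.setCurrentKey(accounts[i]);
            if (flagState.value() != null) {
                if (amounts[i] > LARGE_AMOUNT) {
                    alerts.add(accounts[i]);
                }
                flagState.clear();
            }
            if (amounts[i] < SMALL_AMOUNT) {
                flagState.update(true);
            }
        }
        return alerts;
    }

    /** Same logic with a single shared field: the mistake described above. */
    static List<Long> detectWithSharedField(long[] accounts, double[] amounts) {
        Boolean flag = null;
        List<Long> alerts = new ArrayList<>();
        for (int i = 0; i < accounts.length; i++) {
            if (flag != null) {
                if (amounts[i] > LARGE_AMOUNT) {
                    alerts.add(accounts[i]);
                }
                flag = null;
            }
            if (amounts[i] < SMALL_AMOUNT) {
                flag = true;
            }
        }
        return alerts;
    }
}
```

The two versions disagree in both directions:

- **Missed alert.** Take accounts 1, 2, 1 with amounts 0.50, 10.00, 900.00. The keyed version alerts for account 1. The shared field misses that alert because account 2's transaction clears the flag.
- **False alert.** Take accounts 1, 2 with amounts 0.50, 900.00. The shared field wrongly alerts for account 2. The keyed version raises no alert.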
7

Run the application

Run the FraudDetectionJob class from your IDE. You should see alerts logged by AlertSink:
2024-01-01 14:22:06,220 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2024-01-01 14:22:11,383 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
2024-01-01 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
The job runs continuously because TransactionSource.unbounded() generates transactions indefinitely. Stop it manually when done.

Complete programs

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.sink.AlertSink;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .fromSource(
                TransactionSource.unbounded(),
                WatermarkStrategy.noWatermarks(),
                "transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}

What’s next

  • Add timers — The current detector does not enforce a time constraint between the two transactions. See Event-Driven Applications to learn how to add a 1-minute timer so the fraud pattern expires.
  • DataStream API reference — Explore all operators, state backends, and connector APIs in the DataStream API documentation.
  • Table API Quickstart — Try the higher-level Table API for aggregation pipelines.
  • SQL Quickstart — Query streaming data interactively in the SQL Client.
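The timer extension makes the fraud pattern expire after one minute. In Flink, it relies on a KeyedProcessFunction timer.

The Flink-free approximation below swaps in a different technique. It stores the time of the small transaction and ignores the flag if the next transaction for that account arrives more than one minute later. That check uses the transactions' own timestamps. A real timer can also clear state when no further transaction ever arrives, and this loop cannot.

`TimedFraudSketch` and its parallel-array inputs are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical time-bounded variant of the rule; accounts[i], timestamps[i], amounts[i] form one transaction. */
final class TimedFraudSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE_MILLIS = 60_000L;

    static List<Long> detect(long[] accounts, long[] timestamps, double[] amounts) {
        // Per-account timestamp of the last small transaction (stand-in for keyed state).
        Map<Long, Long> smallAt = new HashMap<>();
        List<Long> alerts = new ArrayList<>();
        for (int i = 0; i < accounts.length; i++) {
            Long flaggedAt = smallAt.remove(accounts[i]); // read and clear the flag in one step
            boolean stillValid = flaggedAt != null && timestamps[i] - flaggedAt <= ONE_MINUTE_MILLIS;
            if (stillValid && amounts[i] > LARGE_AMOUNT) {
                alerts.add(accounts[i]);
            }
            if (amounts[i] < SMALL_AMOUNT) {
                smallAt.put(accounts[i], timestamps[i]);
            }
        }
        return alerts;
    }
}
```

For example, a 0.50 transaction followed 30 seconds later by an 800.00 transaction on the same account raises an alert. The same pair 90 seconds apart does not.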
