The DataGen connector provides a Source implementation that generates input data for Flink pipelines. It is useful when developing locally or running demos without access to external systems such as Kafka. The connector is built-in and requires no additional dependencies.
How it works
DataGeneratorSource produces N records in parallel. It splits the index sequence 0 to N - 1 into sub-sequences, one per parallel source subtask. It then passes each Long index to a user-supplied GeneratorFunction<Long, T>, which maps the index to a record of type T.
The output order depends on parallelism. With parallelism 1, records are produced in index order. With higher parallelism, each subtask produces its sub-range in order but the overall interleaving is not deterministic.
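The sketch below makes the splitting concrete in plain Java, with no Flink dependency. It divides the index range [0, count) into one contiguous sub-range per subtask and generates each sub-range in index order. It assumes near-equal contiguous ranges. SubSequenceSketch, split, and generate are hypothetical names, and Flink's actual split assignment may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Plain-Java model (hypothetical) of splitting [0, count) into one range per subtask. */
final class SubSequenceSketch {

    /** Half-open index range [from, to) assigned to one subtask. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Splits [0, count) into `parallelism` contiguous ranges whose sizes differ by at most one. */
    static List<Range> split(long count, int parallelism) {
        List<Range> ranges = new ArrayList<>(parallelism);
        long base = count / parallelism;
        long remainder = count % parallelism;
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges each take one extra index.
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }

    /** Emits one range in index order, as a single subtask would. */
    static <T> List<T> generate(Range range, LongFunction<T> generator) {
        List<T> records = new ArrayList<>();
        for (long i = range.from; i < range.to; i++) {
            records.add(generator.apply(i));
        }
        return records;
    }
}
```

With count = 10 and parallelism 3, this model yields the ranges [0, 4), [4, 7), and [7, 10). Each range is emitted in order, but the interleaving of the three subtasks' outputs downstream depends on scheduling.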
Basic usage
The following example produces the records "Number: 0" through "Number: 999". They arrive in that order only when the source runs with parallelism 1:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Map each Long index to a String record.
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
long numberOfRecords = 1000;
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(generatorFunction, numberOfRecords, Types.STRING);
DataStreamSource<String> stream = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "Generator Source"
);
To generate structured objects, use a lambda that returns a POJO or a tuple:
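import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.TypeInformation;
// Order is a user-defined POJO with an (orderId, customerId, amount, timestamp) constructor.
// ThreadLocalRandom and System.currentTimeMillis() make this generator
// non-deterministic; see Exactly-once guarantees.
GeneratorFunction<Long, Order> orderGenerator = index -> new Order(
        index,                                             // orderId
        "customer-" + (index % 100),                       // customerId
        ThreadLocalRandom.current().nextDouble() * 1000,   // amount
        System.currentTimeMillis()                         // timestamp
);
DataGeneratorSource<Order> source =
        new DataGeneratorSource<>(orderGenerator, Long.MAX_VALUE, TypeInformation.of(Order.class));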
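The generator above assumes an Order type that the page does not define. The following is one possible shape for it, written to follow Flink's POJO rules. The class must be public (top-level, or a public static nested class). It needs a public no-argument constructor. Every field must be public or reachable through a getter and setter. The field names and the holder class Orders are hypothetical. Order is nested only so that the snippet compiles as a single file. In a project it would normally be a public top-level class in its own file.

```java
/** Hypothetical holder so the snippet compiles standalone; Order itself is the POJO. */
final class Orders {

    /** Hypothetical Order type matching the (orderId, customerId, amount, timestamp) constructor. */
    public static class Order {
        public long orderId;
        public String customerId;
        public double amount;
        public long timestamp;

        /** Public no-argument constructor, needed for Flink to treat the class as a POJO. */
        public Order() {}

        public Order(long orderId, String customerId, double amount, long timestamp) {
            this.orderId = orderId;
            this.customerId = customerId;
            this.amount = amount;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Order{orderId=" + orderId + ", customerId=" + customerId
                    + ", amount=" + amount + ", timestamp=" + timestamp + "}";
        }
    }
}
```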
Rate limiting
DataGeneratorSource has built-in support for rate limiting. The rate applies across all parallel subtasks combined.
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
DataGeneratorSource<String> source = new DataGeneratorSource<>(
        generatorFunction,
        Long.MAX_VALUE,
        RateLimiterStrategy.perSecond(100), // 100 records/second total
        Types.STRING
);
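The arithmetic below is a rough model of what "100 records/second total" means for each subtask. It assumes the total rate is divided evenly among subtasks and that each subtask spaces its records uniformly. Both are simplifying assumptions. RateSketch and its methods are hypothetical helpers, not Flink API.

```java
/** Back-of-the-envelope model (hypothetical) of a total rate spread over parallel subtasks. */
final class RateSketch {

    /** Assumes the total rate is divided evenly across subtasks. */
    static double perSubtaskRate(double totalRecordsPerSecond, int parallelism) {
        return totalRecordsPerSecond / parallelism;
    }

    /**
     * Earliest time (ms after start) at which one subtask, pacing uniformly at
     * `perSubtaskRate` records/second, may emit its k-th record (k starts at 0).
     */
    static long earliestEmitMillis(long k, double perSubtaskRate) {
        return (long) Math.ceil(k * 1000.0 / perSubtaskRate);
    }
}
```

Under this model, a total of 100 records per second with parallelism 4 gives each subtask about 25 records per second, or roughly one record every 40 ms.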
RateLimiterStrategy provides static factory methods for its built-in strategies, including:
| Strategy | Description |
|---|---|
| RateLimiterStrategy.perSecond(rate) | Limits throughput to the specified number of records per second across all subtasks. |
| RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint) | Emits at most the specified number of records between consecutive checkpoints. |
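The per-checkpoint strategy behaves like a budget. Emitted records use it up, and a completed checkpoint refills it. The plain-Java sketch below is a simplified, hypothetical model of that idea for a single subtask. Flink's own limiter makes the reader wait when the budget runs out. This model instead returns false.

```java
/**
 * Simplified model (hypothetical) of a per-checkpoint budget: at most
 * `capacityPerCycle` records may be emitted between two completed checkpoints.
 */
final class CheckpointGateSketch {
    private final int capacityPerCycle;
    private int remaining;

    CheckpointGateSketch(int capacityPerCycle) {
        if (capacityPerCycle <= 0) {
            throw new IllegalArgumentException("capacityPerCycle must be positive");
        }
        this.capacityPerCycle = capacityPerCycle;
        this.remaining = capacityPerCycle;
    }

    /** Returns true and consumes one unit of budget if a record may be emitted now. */
    boolean tryEmit() {
        if (remaining == 0) {
            return false; // gate closed until the next checkpoint completes
        }
        remaining--;
        return true;
    }

    /** Reopens the gate with a full budget. */
    void onCheckpointComplete() {
        remaining = capacityPerCycle;
    }
}
```

In this model the budget is refilled only by completed checkpoints. A job with checkpointing disabled would therefore stop emitting after the first cycle, so a per-checkpoint limit is meant for jobs that checkpoint.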
Boundedness
DataGeneratorSource is always bounded. Setting numberOfRecords to Long.MAX_VALUE makes it effectively unbounded in practice, because the end of the sequence is never reached.
For finite sequences, consider running the job in BATCH execution mode:
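import org.apache.flink.api.common.RuntimeExecutionMode;
// The source is finite (10,000 records), so the job can run in BATCH mode.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataGeneratorSource<String> source =
        new DataGeneratorSource<>(index -> "Record: " + index, 10_000, Types.STRING);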
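A quick calculation shows why Long.MAX_VALUE behaves as unbounded. The hypothetical helper below assumes a steady total emission rate. One million records per second is an arbitrary example rate. At that rate, exhausting 2^63 - 1 indices takes roughly 292,000 years, while 10,000 records at 100 records per second take 100 seconds.

```java
/** Rough estimate (hypothetical helper) of how long a generator takes to exhaust its index range. */
final class ExhaustionSketch {
    // Julian year: 365.25 days.
    private static final double SECONDS_PER_YEAR = 365.25 * 24 * 60 * 60;

    /** Seconds needed to emit `count` records at a steady total rate. */
    static double secondsToEmit(long count, double recordsPerSecond) {
        return count / recordsPerSecond;
    }

    /** Same estimate expressed in years. */
    static double yearsToEmit(long count, double recordsPerSecond) {
        return secondsToEmit(count, recordsPerSecond) / SECONDS_PER_YEAR;
    }
}
```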
Watermarks
You can produce deterministic watermarks directly from generated events by providing a custom WatermarkStrategy:
// Event is a user-defined POJO with an (id, timestamp) constructor and a
// getTimestamp() getter; the timestamp is derived from the index.
GeneratorFunction<Long, Event> generator = index -> new Event(index, index * 1000L);
DataGeneratorSource<Event> source = new DataGeneratorSource<>(
        generator,
        Long.MAX_VALUE,
        Types.POJO(Event.class)
);
DataStreamSource<Event> stream = env.fromSource(
        source,
        WatermarkStrategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> event.getTimestamp()),
        "Generator Source"
);
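The timestamps come from the index, so the watermark values are fully determined by which indices have been processed. The plain-Java model below mimics what a monotonous-timestamps strategy computes for one subtask. It tracks the largest timestamp seen so far and reports that value minus 1 ms. The class and method names are hypothetical. The model ignores periodic emission timing and idleness.

```java
/**
 * Model (hypothetical) of a monotonous-timestamp watermark generator: the
 * watermark trails the highest timestamp seen so far by 1 ms.
 */
final class MonotonousWatermarkSketch {
    private long maxTimestamp = Long.MIN_VALUE;

    /** Mirrors the example generator: index -> timestamp of index * 1000 ms. */
    static long timestampFor(long index) {
        return index * 1000L;
    }

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Long.MIN_VALUE until the first event; afterwards the max timestamp minus 1. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }
}
```

In this model, after indices 0 through 4 (timestamps 0 to 4000 ms), the watermark is 3999. The same indices always produce that value.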
Exactly-once guarantees
DataGeneratorSource can be used in jobs with at-least-once and end-to-end exactly-once processing guarantees. This requires the GeneratorFunction to be deterministic: the same Long index must always produce the same output record. When this holds, Flink can replay indices after a recovery and regenerate identical records.
The Order example in Basic usage calls ThreadLocalRandom and System.currentTimeMillis(). A GeneratorFunction like that, or one with any other non-deterministic call, produces different records for replayed indices than it did on the original attempt. Exactly-once results are then no longer guaranteed.
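One way to keep the Order example replay-safe is to derive every field from the index alone. The sketch below is a minimal plain-Java illustration. It replaces ThreadLocalRandom with a java.util.SplittableRandom seeded by the index, and it replaces the wall-clock time with a fixed base timestamp plus one second per index. DeterministicOrderFields and BASE_TIMESTAMP_MILLIS are hypothetical, and the base value is arbitrary.

```java
import java.util.SplittableRandom;

/**
 * Deterministic (hypothetical) replacements for the random amount and wall-clock
 * timestamp in the Order example: every value depends only on the index.
 */
final class DeterministicOrderFields {
    /** Hypothetical fixed start time (epoch millis) replacing System.currentTimeMillis(). */
    static final long BASE_TIMESTAMP_MILLIS = 1_700_000_000_000L;

    static String customerId(long index) {
        return "customer-" + (index % 100);
    }

    /** Pseudo-random amount in [0, 1000), seeded by the index so replays match. */
    static double amount(long index) {
        return new SplittableRandom(index).nextDouble() * 1000;
    }

    /** One second of event time per index, starting at BASE_TIMESTAMP_MILLIS. */
    static long timestamp(long index) {
        return BASE_TIMESTAMP_MILLIS + index * 1000L;
    }
}
```

In a Flink job, these helpers would be called from the GeneratorFunction, for example `index -> new Order(index, DeterministicOrderFields.customerId(index), DeterministicOrderFields.amount(index), DeterministicOrderFields.timestamp(index))`, assuming the four-argument Order constructor used earlier. Seeding a fresh SplittableRandom per index costs one small allocation per record. In exchange, each record is independent of processing order, which is what replay needs.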
Configuration summary
| Parameter | Type | Description |
|---|---|---|
| generatorFunction | GeneratorFunction<Long, T> | Maps a sequence index to a generated record. |
| count | long | Total number of records to generate. Use Long.MAX_VALUE for an effectively unbounded stream. |
| rateLimiterStrategy | RateLimiterStrategy | Optional. Controls the maximum emission rate. |
| outputTypeInfo | TypeInformation<T> | Type information for the generated records. |