The DataGen connector provides a Source implementation that generates input data for Flink pipelines. It is useful when developing locally or running demos without access to external systems such as Kafka. The connector is built-in and requires no additional dependencies.

How it works

DataGeneratorSource produces N records in parallel. It splits the index sequence 0 to N - 1 into sub-sequences, one per parallel source subtask, and passes each Long index to a user-supplied GeneratorFunction<Long, T> that maps it to a record. The output order therefore depends on the parallelism. Each subtask emits its own sub-sequence in index order, so with parallelism 1 the records appear in index order; with higher parallelism, the interleaving of records across subtasks is not deterministic.
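The splitting described above can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's implementation: it assumes contiguous, nearly equal index ranges (the first subtasks absorb any remainder), it uses java.util.function.LongFunction in place of GeneratorFunction, and the class name SubtaskRangesSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical model (not Flink's code) of dividing N indices among parallel subtasks.
final class SubtaskRangesSketch {

    // Returns one inclusive [from, to] index range per subtask, together covering 0..count-1.
    static List<long[]> ranges(long count, int parallelism) {
        List<long[]> result = new ArrayList<>();
        long base = count / parallelism;      // every subtask gets at least this many indices
        long remainder = count % parallelism; // the first `remainder` subtasks get one extra
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < remainder ? 1 : 0);
            result.add(new long[] {from, from + size - 1});
            from += size;
        }
        return result;
    }

    // Emits one subtask's records strictly in index order.
    static <T> List<T> produce(long[] range, LongFunction<T> generator) {
        List<T> out = new ArrayList<>();
        for (long index = range[0]; index <= range[1]; index++) {
            out.add(generator.apply(index));
        }
        return out;
    }

    public static void main(String[] args) {
        // 10 records over 3 subtasks: ranges 0..3, 4..6 and 7..9.
        for (long[] range : ranges(10, 3)) {
            System.out.println(produce(range, index -> "Number: " + index));
        }
    }
}
```

With 10 records and 3 subtasks the model yields the ranges 0-3, 4-6 and 7-9. Each list is ordered, but in a running job the subtasks emit concurrently, which is where the non-deterministic interleaving comes from.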

Basic usage

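The following example produces ["Number: 0", "Number: 1", ..., "Number: 999"] (in that order when the source parallelism is 1):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Maps each Long index to the record emitted for it.
GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
long numberOfRecords = 1000;

DataGeneratorSource<String> source =
    new DataGeneratorSource<>(generatorFunction, numberOfRecords, Types.STRING);

DataStreamSource<String> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "Generator Source"
);
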
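To generate structured objects, use a lambda that returns a POJO or a tuple. In this example, Order is a user-defined POJO:
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Note: ThreadLocalRandom and System.currentTimeMillis() make this generator
// non-deterministic (see "Exactly-once guarantees" below).
GeneratorFunction<Long, Order> orderGenerator = index -> new Order(
    index,                                           // orderId
    "customer-" + (index % 100),                     // customerId
    ThreadLocalRandom.current().nextDouble() * 1000, // amount
    System.currentTimeMillis()                       // timestamp
);

DataGeneratorSource<Order> source =
    new DataGeneratorSource<>(orderGenerator, Long.MAX_VALUE, TypeInformation.of(Order.class));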
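The page does not define Order itself. A minimal sketch of such a class follows, assuming four fields that mirror the constructor call in the example; the field names are hypothetical. It follows Flink's POJO rules (public class, public no-argument constructor, public fields or public getters and setters); a class that breaks these rules is still usable, but Flink then falls back to generic serialization.

```java
// Hypothetical Order POJO matching the constructor used in the example above.
// Flink's POJO rules: a public class with a public no-argument constructor whose
// fields are public (or reachable through public getters and setters).
public class Order {
    public long orderId;
    public String customerId;
    public double amount;
    public long timestamp;

    // Required so Flink's POJO serializer can instantiate the class.
    public Order() {}

    public Order(long orderId, String customerId, double amount, long timestamp) {
        this.orderId = orderId;
        this.customerId = customerId;
        this.amount = amount;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Order{" + orderId + ", " + customerId + ", " + amount + ", " + timestamp + "}";
    }
}
```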

Rate limiting

DataGeneratorSource has built-in support for rate limiting. The rate applies across all parallel subtasks combined.
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;

DataGeneratorSource<String> source = new DataGeneratorSource<>(
    generatorFunction,
    Long.MAX_VALUE,
    RateLimiterStrategy.perSecond(100), // 100 records/second total
    Types.STRING
);
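To illustrate what a job-wide rate means for a single subtask, the sketch below splits the total rate evenly across subtasks and paces one subtask so that its k-th record is released no earlier than k / rate seconds after start. The even split and the pacing scheme are assumptions made for illustration; Flink's own RateLimiter implementations may differ in detail, for example in how they handle bursts. PacedEmitterSketch is a hypothetical name.

```java
// Hypothetical pacer (not Flink's implementation): the job-wide rate is split evenly
// across subtasks, and each subtask releases its k-th record no earlier than k / rate seconds.
final class PacedEmitterSketch {
    private final double rate; // records per second for this subtask
    private final long startNanos = System.nanoTime();
    private long released = 0;

    PacedEmitterSketch(double totalRecordsPerSecond, int parallelism) {
        this.rate = totalRecordsPerSecond / parallelism; // assumed even split
    }

    double perSubtaskRate() {
        return rate;
    }

    // Blocks until this subtask may emit its next record.
    void acquire() throws InterruptedException {
        long earliest = startNanos + (long) (released * 1e9 / rate);
        long now;
        while ((now = System.nanoTime()) < earliest) {
            long waitNanos = earliest - now;
            Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
        }
        released++;
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 records/s in total over 4 subtasks means 25 records/s for this subtask.
        PacedEmitterSketch pacer = new PacedEmitterSketch(100, 4);
        long t0 = System.nanoTime();
        for (int i = 0; i < 6; i++) {
            pacer.acquire(); // records 0..5 become eligible at 0, 40, 80, ..., 200 ms
        }
        System.out.printf("6 records took %d ms%n", (System.nanoTime() - t0) / 1_000_000);
    }
}
```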

Flink provides the following built-in RateLimiterStrategy factory methods:

| Strategy | Description |
|---|---|
| RateLimiterStrategy.perSecond(rate) | Limits throughput to the specified number of records per second across all subtasks. |
| RateLimiterStrategy.perCheckpoint(recordsPerCheckpoint) | Emits at most the specified number of records, across all subtasks, between consecutive checkpoints. |
| RateLimiterStrategy.noOp() | Applies no rate limiting. |
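The per-checkpoint strategy behaves more like a gate than a pace. The following is a minimal model, assuming the budget is divided evenly among subtasks and each subtask's counter resets once per checkpoint. CheckpointGateSketch and its methods are hypothetical, and the model is simplified: a real source reader would wait for the checkpoint rather than return false.

```java
// Hypothetical per-subtask model (not Flink's implementation) of a per-checkpoint budget.
final class CheckpointGateSketch {
    private final int budget;
    private int emittedSinceCheckpoint = 0;

    CheckpointGateSketch(int totalRecordsPerCheckpoint, int parallelism) {
        // Assumption: the job-wide budget is divided evenly (integer division) among subtasks.
        this.budget = totalRecordsPerCheckpoint / parallelism;
        if (budget <= 0) {
            throw new IllegalArgumentException("records per checkpoint must be at least the parallelism");
        }
    }

    // Returns true if this subtask may emit another record in the current checkpoint interval.
    boolean tryEmit() {
        if (emittedSinceCheckpoint >= budget) {
            return false; // gate closed until the next checkpoint
        }
        emittedSinceCheckpoint++;
        return true;
    }

    // Called once per checkpoint; reopens the gate for the next interval.
    void onCheckpoint() {
        emittedSinceCheckpoint = 0;
    }

    public static void main(String[] args) {
        CheckpointGateSketch gate = new CheckpointGateSketch(6, 2); // 3 records per subtask
        int emitted = 0;
        for (int i = 0; i < 5; i++) {
            if (gate.tryEmit()) emitted++;
        }
        System.out.println("emitted before checkpoint: " + emitted); // prints 3
        gate.onCheckpoint();
        System.out.println("after checkpoint: " + gate.tryEmit());   // prints true
    }
}
```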

Boundedness

DataGeneratorSource is always bounded. In practice, however, setting numberOfRecords to Long.MAX_VALUE makes it effectively unbounded, because the end of the sequence is never reached. For finite sequences, consider running the job in BATCH execution mode:
import org.apache.flink.api.common.RuntimeExecutionMode;

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// A finite source: exactly 10,000 records, after which the job can finish.
DataGeneratorSource<String> source =
    new DataGeneratorSource<>(index -> "Record: " + index, 10_000, Types.STRING);
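The claim that Long.MAX_VALUE is effectively unbounded can be checked with simple arithmetic. The sketch assumes a hypothetical job-wide rate of one million records per second; EffectivelyUnboundedSketch is an illustrative name.

```java
// Back-of-the-envelope check of "effectively unbounded": how long would it take to emit
// Long.MAX_VALUE records at an assumed (hypothetical) job-wide rate?
final class EffectivelyUnboundedSketch {
    static final double SECONDS_PER_YEAR = 365.25 * 24 * 3600; // Julian year

    static double yearsToExhaust(long count, double recordsPerSecond) {
        return count / recordsPerSecond / SECONDS_PER_YEAR;
    }

    public static void main(String[] args) {
        // Assumption: a generous 1,000,000 records per second for the whole job.
        double years = yearsToExhaust(Long.MAX_VALUE, 1_000_000);
        System.out.printf("about %,.0f years%n", years); // roughly 292,000 years
    }
}
```

At that assumed rate the sequence would take roughly 292,000 years to exhaust, so in practice the bounded source never finishes.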

Watermarks

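Because the generator can derive each event's timestamp from its index, you can produce deterministic event-time timestamps and watermarks by pairing the source with a WatermarkStrategy that reads that timestamp. Each subtask emits its indices in increasing order, so timestamps increase monotonically within each subtask and forMonotonousTimestamps() is a valid choice. Event is a user-defined POJO with a getTimestamp() accessor: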
GeneratorFunction<Long, Event> generator = index -> new Event(index, index * 1000L);

DataGeneratorSource<Event> source = new DataGeneratorSource<>(
    generator,
    Long.MAX_VALUE,
    Types.POJO(Event.class)
);

DataStreamSource<Event> stream = env.fromSource(
    source,
    WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withTimestampAssigner((event, ts) -> event.getTimestamp()),
    "Generator Source"
);
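Why forMonotonousTimestamps() fits can be checked with a small plain-Java model. It reuses the index * 1000L timestamps from the example and assumes each subtask holds a contiguous index range. It models a monotonous watermark as the highest timestamp seen minus 1 ms, which, as we understand it, matches the zero-delay case of Flink's bounded-out-of-orderness generator, and it ignores that Flink emits watermarks periodically rather than after every record. MonotonousWatermarkSketch is a hypothetical name.

```java
import java.util.List;

// Hypothetical model (not Flink code) of monotonous watermarks for index-derived timestamps.
final class MonotonousWatermarkSketch {

    // Same derivation as the Event example: event time = index * 1000 ms.
    static long timestampOf(long index) {
        return index * 1000L;
    }

    // Watermark held by a subtask after it has emitted indices from..to in order,
    // modeled as (highest timestamp seen) - 1.
    static long watermarkAfter(long from, long to) {
        long maxTimestamp = Long.MIN_VALUE;
        for (long index = from; index <= to; index++) {
            long ts = timestampOf(index);
            if (ts < maxTimestamp) {
                throw new IllegalStateException("timestamps are not monotonous");
            }
            maxTimestamp = ts;
        }
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    // A downstream operator's event-time clock is the minimum watermark over its inputs.
    static long downstreamWatermark(List<long[]> emittedRanges) {
        long min = Long.MAX_VALUE;
        for (long[] range : emittedRanges) {
            min = Math.min(min, watermarkAfter(range[0], range[1]));
        }
        return min;
    }

    public static void main(String[] args) {
        // Subtask 0 has emitted indices 0..4, subtask 1 has emitted 5..7.
        List<long[]> progress = List.of(new long[] {0, 4}, new long[] {5, 7});
        System.out.println(downstreamWatermark(progress)); // prints 3999
    }
}
```

Downstream, event time advances only as fast as the slowest subtask: 3999 here, even though subtask 1 has reached 6999.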

Exactly-once guarantees

DataGeneratorSource supports at-least-once and end-to-end exactly-once processing guarantees, provided that the GeneratorFunction is deterministic: the same Long index must always produce the same output record. When this holds, Flink can replay the same indices after a recovery and produce identical records. End-to-end exactly-once delivery also requires a sink that supports it.
If your GeneratorFunction uses ThreadLocalRandom, System.currentTimeMillis(), or any other non-deterministic call, the source cannot provide exactly-once guarantees because re-executed indices will produce different records.
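One way to make the Order generator from Basic usage deterministic is to derive every field from the index alone. The sketch below is an assumption, not a prescribed Flink pattern: it seeds java.util.Random with the index (Random's algorithm is specified, so a given seed yields the same values on any JVM) and replaces the wall clock with a fixed, hypothetical base time. DeterministicOrderFields and its methods are illustrative names.

```java
import java.util.Random;

// Hypothetical deterministic replacements for the random and clock calls in the Order generator.
final class DeterministicOrderFields {
    // Assumed fixed base time for demo data; any constant works.
    static final long BASE_TIME_MILLIS = 1_700_000_000_000L;

    // java.util.Random specifies its algorithm, so the same seed yields the same
    // value on every JVM; seeding with the index makes the amount a function of the index.
    static double amountFor(long index) {
        return new Random(index).nextDouble() * 1000;
    }

    // Replaces System.currentTimeMillis(): one simulated second per index.
    static long timestampFor(long index) {
        return BASE_TIME_MILLIS + index * 1000L;
    }

    static String customerFor(long index) {
        return "customer-" + (index % 100);
    }

    public static void main(String[] args) {
        // Calling twice with the same index, as a replay after recovery would, gives identical values.
        System.out.println(amountFor(42) == amountFor(42)); // prints true
        System.out.println(customerFor(142) + " at " + timestampFor(142));
    }
}
```

In the job, the generator would then become index -> new Order(index, DeterministicOrderFields.customerFor(index), DeterministicOrderFields.amountFor(index), DeterministicOrderFields.timestampFor(index)). Creating a Random per record costs an allocation; any other function that depends only on the index would serve equally well.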

Configuration summary

| Parameter | Type | Description |
|---|---|---|
| generatorFunction | GeneratorFunction<Long, T> | Maps a sequence index to a generated record. |
| count | long | Total number of records to generate. Use Long.MAX_VALUE for an effectively unbounded stream. |
| rateLimiterStrategy | RateLimiterStrategy | Optional. Controls the maximum emission rate. |
| outputTypeInfo | TypeInformation<T> | Type information for the generated records. |
