A data source is the starting point of every DataStream program. It produces elements that flow through the rest of the pipeline.
Flink provides two source APIs:
- **FLIP-27 Source API (recommended):** A unified, connector-friendly API introduced in Flink 1.12. It separates the responsibilities of split discovery (`SplitEnumerator`) from data reading (`SourceReader`) and supports both bounded and unbounded sources with first-class watermark support.
- **Legacy SourceFunction API:** The older `SourceFunction` / `RichSourceFunction` / `ParallelSourceFunction` interfaces, still supported but not recommended for new development.
## Built-in sources
These are available directly on `StreamExecutionEnvironment` and require no connector dependencies.
### Collection sources
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From varargs
DataStream<String> stream1 = env.fromData("hello", "world", "flink");

// From a Java Collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> stream2 = env.fromData(numbers);

// From an iterator (non-parallel, parallelism = 1)
Iterator<Long> it = ...;
DataStream<Long> stream3 = env.fromCollection(it, Long.class);
```
Collection sources always run with parallelism 1 because iterators are not splittable. Use `fromSequence` or `FileSource` for parallel ingestion.
### Sequence source

Generates a sequence of `Long` values in a given range, in parallel:
```java
// Generates numbers 1 through 1000, split across parallel tasks
DataStream<Long> seq = env.fromSequence(1, 1000);
```
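To see why a numeric range parallelizes easily where an iterator does not, here is a plain-Java sketch of carving a range into contiguous per-subtask chunks. This is purely illustrative (class and method names are made up for this example; it is not Flink's internal assignment logic), but it captures the idea: a range can be divided up front, an iterator cannot.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {
    /** Splits the inclusive range [from, to] into contiguous chunks, one per subtask. */
    static List<long[]> split(long from, long to, int parallelism) {
        List<long[]> chunks = new ArrayList<>();
        long total = to - from + 1;
        long base = total / parallelism;      // minimum chunk size
        long remainder = total % parallelism; // the first `remainder` chunks get one extra element
        long start = from;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < remainder ? 1 : 0);
            chunks.add(new long[] {start, start + size - 1});
            start += size;
        }
        return chunks;
    }

    public static void main(String[] args) {
        for (long[] c : split(1, 1000, 4)) {
            System.out.println(c[0] + ".." + c[1]); // prints 1..250, 251..500, 501..750, 751..1000
        }
    }
}
```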
### Socket source

Reads newline-delimited text from a TCP socket. Useful for quick experiments and debugging:
```java
DataStream<String> socket = env.socketTextStream("localhost", 9999);
```
## FileSource (FLIP-27)

`FileSource` is Flink’s recommended way to read from files. It implements the FLIP-27 Source API and supports both batch-style (process once) and continuous (monitor for new files) modes.
### Reading a directory of text files
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<String> source = FileSource
    .forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("file:///data/logs/"),
        new Path("s3://my-bucket/input/")
    )
    .build();

DataStream<String> lines = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "log-files"
);
```
### Continuous file monitoring

To monitor a directory for new files and process them as they appear, use `monitorContinuously`:
```java
import java.time.Duration;

FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/incoming/"))
    .monitorContinuously(Duration.ofSeconds(10)) // check for new files every 10 seconds
    .build();

DataStream<String> lines = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "continuous-files"
);
```
`FileSource` accepts any `BulkFormat` or `StreamFormat`. For CSV, use `CsvReaderFormat`:
```java
import org.apache.flink.formats.csv.CsvReaderFormat;

CsvReaderFormat<MyRecord> csvFormat = CsvReaderFormat.forPojo(MyRecord.class);

FileSource<MyRecord> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("file:///data/records.csv"))
    .build();

DataStream<MyRecord> records = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "csv-source"
);
```
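`MyRecord` is assumed above; `forPojo` expects a standard Java POJO whose fields line up with the CSV columns. A minimal hypothetical shape (the field names and column layout are invented for this sketch):

```java
// Hypothetical POJO matching a CSV file with columns: id,name,score
public class MyRecord {
    private long id;
    private String name;
    private double score;

    public MyRecord() {} // no-arg constructor required for POJO mapping

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getScore() { return score; }
    public void setScore(double score) { this.score = score; }
}
```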
## FLIP-27 Source API

For connector authors or advanced use cases, the FLIP-27 API provides full control over split discovery and record reading.
### Core interfaces
| Interface | Responsibility |
|---|---|
| `Source<T, SplitT, EnumChkT>` | Entry point; creates the enumerator and reader |
| `SplitEnumerator<SplitT, EnumChkT>` | Runs on the JobManager; discovers splits and assigns them to readers |
| `SourceReader<T, SplitT>` | Runs on TaskManagers; reads records from assigned splits |
| `SourceSplit` | Represents a unit of parallelizable work (e.g., a file or a Kafka partition) |
### Registering a FLIP-27 source
```java
Source<MyEvent, ?, ?> mySource = new MyCustomSource(...);

DataStream<MyEvent> stream = env.fromSource(
    mySource,
    WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp()),
    "my-source"
);
```
### Watermark integration

FLIP-27 sources integrate watermarks at the split level: when a reader consumes multiple splits in parallel, Flink tracks a watermark per split and emits the minimum across them. This is more accurate than applying a watermark strategy after the source, where events from different splits have already been interleaved into one stream:
```java
// Preferred: watermarks applied at the source, tracked per split
DataStream<MyEvent> stream = env.fromSource(
    mySource,
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((e, ts) -> e.eventTime()),
    "my-source"
);

// Less accurate: watermarks applied after the splits have been merged
DataStream<MyEvent> merged = env.fromSource(mySource, WatermarkStrategy.noWatermarks(), "src")
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((e, ts) -> e.eventTime())
    );
```
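The "minimum across splits" rule behind that accuracy claim can be sketched in plain Java (illustrative only; class and method names are invented and this is not Flink's internal implementation): the combined watermark is the minimum of the per-split watermarks, so one lagging split holds the whole source's watermark back rather than letting its events arrive late.

```java
import java.util.Arrays;

public class WatermarkMergeSketch {
    /** Combined watermark = minimum of the per-split watermarks. */
    static long combine(long[] perSplitWatermarks) {
        return Arrays.stream(perSplitWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Three splits; the slowest one pins the combined watermark at 950.
        System.out.println(combine(new long[] {1_000, 950, 1_200})); // prints 950
    }
}
```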
## Using connector sources

Most production sources come from Flink connectors (Kafka, Kinesis, Pulsar, etc.) and implement the FLIP-27 API. Add the connector dependency to your project and use the connector’s builder:
```java
import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("kafka-broker:9092")
    .setTopics("events")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> events = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)),
    "kafka-events"
);
```
## Legacy SourceFunction API

The older `SourceFunction` API is still available for backward compatibility but should not be used in new code.
```java
// Non-parallel source (parallelism = 1)
public class MyLegacySource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("record-" + System.currentTimeMillis());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Parallel source (one instance runs per parallel subtask)
public class MyParallelSource extends RichParallelSourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        int partition = getRuntimeContext().getIndexOfThisSubtask();
        long i = partition;
        while (running) {
            ctx.collect(i);
            i += getRuntimeContext().getNumberOfParallelSubtasks();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

env.addSource(new MyParallelSource()).print();
```
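The striding scheme in `MyParallelSource` (start at the subtask index, step by the parallelism) gives each subtask a disjoint arithmetic progression, so no value is emitted twice across subtasks. A plain-Java check of that property (illustrative only; the class below is invented for this sketch):

```java
import java.util.HashSet;
import java.util.Set;

public class StrideCheck {
    /** Reproduces the "i += numberOfParallelSubtasks" stride for every subtask. */
    static Set<Long> generate(int parallelism, int perTask) {
        Set<Long> seen = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            long i = subtask; // each subtask starts at its own index
            for (int n = 0; n < perTask; n++) {
                if (!seen.add(i)) {
                    throw new IllegalStateException("subtasks overlapped at " + i);
                }
                i += parallelism; // step by the total parallelism
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // 4 subtasks x 5 values each: all 20 values are distinct.
        System.out.println(generate(4, 5).size()); // prints 20
    }
}
```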
`SourceFunction`-based sources do not support per-split watermarks or the FLIP-27 split-level watermark alignment feature. Migrate to the FLIP-27 `Source` interface for new connectors.