A data source is the starting point of every DataStream program. It produces elements that flow through the rest of the pipeline. Flink provides two source APIs:
  • FLIP-27 Source API (recommended): A unified, connector-friendly API introduced in Flink 1.11. It separates split discovery (SplitEnumerator) from data reading (SourceReader) and supports both bounded and unbounded sources with first-class watermark support.
  • Legacy SourceFunction API: The older SourceFunction / RichSourceFunction / ParallelSourceFunction interfaces, still supported but not recommended for new development.

Built-in sources

These are available directly on StreamExecutionEnvironment and require no connector dependencies.

Collection sources

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From varargs
DataStream<String> stream1 = env.fromData("hello", "world", "flink");

// From a Java Collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> stream2 = env.fromData(numbers);

// From an iterator (non-parallel, parallelism = 1)
Iterator<Long> it = ...;
DataStream<Long> stream3 = env.fromCollection(it, Long.class);
Collection and iterator sources always run with parallelism 1, because an in-memory collection or iterator is read by a single subtask and is not splittable. Use fromSequence or FileSource for parallel ingestion.

Sequence source

Generates a sequence of Long values in a given range, in parallel:
// Generates numbers 1 through 1000, split across parallel tasks
DataStream<Long> seq = env.fromSequence(1, 1000);
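
To make "split across parallel tasks" concrete, here is a minimal plain-Java sketch of one way an inclusive range can be divided into contiguous sub-ranges of near-equal size. The class SequenceSplitter and its split method are hypothetical names for illustration only; this is not Flink's internal splitting logic, which may divide the range differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code): divide an inclusive range into contiguous sub-ranges. */
final class SequenceSplitter {

    /**
     * Splits [from, to] into at most {@code parts} sub-ranges whose sizes differ by at most one.
     * Each returned array holds {first, last}, both inclusive.
     */
    static List<long[]> split(long from, long to, int parts) {
        if (parts < 1 || to < from) {
            throw new IllegalArgumentException("need parts >= 1 and from <= to");
        }
        long total = to - from + 1;
        long base = total / parts;       // minimum size of every sub-range
        long remainder = total % parts;  // the first `remainder` sub-ranges get one extra element
        List<long[]> ranges = new ArrayList<>();
        long start = from;
        // Stop early when there are more parts than elements.
        for (int i = 0; i < parts && start <= to; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }
}
```

With this scheme, SequenceSplitter.split(1, 1000, 4) yields the ranges 1..250, 251..500, 501..750, and 751..1000. Every value is covered exactly once, but no ordering across subtasks is implied.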

Socket source

Reads newline-delimited text from a TCP socket. Useful for quick experiments and debugging:
DataStream<String> socket = env.socketTextStream("localhost", 9999);

FileSource (FLIP-27)

FileSource is Flink’s recommended way to read from files. It implements the FLIP-27 Source API and supports both batch-style (process once) and continuous (monitor for new files) modes.

Reading a directory of text files

FileSourceExample.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource<String> source = FileSource
    .forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("file:///data/logs/"),
        new Path("s3://my-bucket/input/")
    )
    .build();

DataStream<String> lines = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "log-files"
);

Continuous file monitoring

To monitor a directory for new files and process them as they appear, use monitorContinuously:
ContinuousFileSource.java
import java.time.Duration;

FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/incoming/"))
    .monitorContinuously(Duration.ofSeconds(10))  // check for new files every 10 seconds
    .build();

DataStream<String> lines = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "continuous-files"
);
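
The idea behind continuous monitoring can be sketched in plain Java: on every poll, compare the current directory listing against the set of paths already handed out, and report only the new ones. NewFileTracker is a hypothetical class written for this illustration. It is a conceptual model under simplifying assumptions (paths as strings, no checkpointing), not the enumerator that FileSource actually uses.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not Flink code): each poll reports only paths that were not seen before. */
final class NewFileTracker {
    private final Set<String> seen = new HashSet<>();

    /**
     * @param listing the paths currently present in the monitored directory
     * @return the paths that appear for the first time, in listing order
     */
    List<String> poll(Collection<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String path : listing) {
            if (seen.add(path)) {  // add() returns false for paths already known
                fresh.add(path);
            }
        }
        return fresh;
    }
}
```

In this model a path is reported once, so a file that changes after it was first listed is not picked up again. This is one reason to move complete files into a monitored directory rather than appending to them in place.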

Reading CSV or custom formats

FileSource accepts any BulkFormat or StreamFormat. For CSV, use CsvReaderFormat:
CsvFileSource.java
import org.apache.flink.formats.csv.CsvReaderFormat;

CsvReaderFormat<MyRecord> csvFormat = CsvReaderFormat.forPojo(MyRecord.class);

FileSource<MyRecord> source = FileSource
    .forRecordStreamFormat(csvFormat, new Path("file:///data/records.csv"))
    .build();

DataStream<MyRecord> records = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "csv-source"
);

FLIP-27 Source API

For connector authors or advanced use cases, the FLIP-27 API provides full control over split discovery and record reading.

Core interfaces

Interface | Responsibility
Source<T, SplitT, EnumChkT> | Entry point; creates the enumerator and reader
SplitEnumerator<SplitT, EnumChkT> | Runs on the JobManager; discovers splits and assigns them to readers
SourceReader<T, SplitT> | Runs on TaskManagers; reads records from assigned splits
SourceSplit | Represents a unit of parallelizable work (e.g., a file, Kafka partition)
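
The division of labor between the enumerator and the readers can be illustrated with a small plain-Java model. ToyEnumerator is a hypothetical class that does not implement the Flink interfaces. It assumes a pull-based scheme in which an idle reader asks for the next split, which is only one of several possible assignment strategies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy coordinator (not Flink code): holds unassigned splits, hands out one per request. */
final class ToyEnumerator {
    private final Deque<String> unassigned = new ArrayDeque<>();
    private final Map<Integer, List<String>> assignments = new HashMap<>();

    ToyEnumerator(List<String> discoveredSplits) {
        unassigned.addAll(discoveredSplits);
    }

    /** Called when reader {@code readerId} is idle; returns empty when no work is left. */
    Optional<String> requestSplit(int readerId) {
        String split = unassigned.poll();  // null when the queue is empty
        if (split == null) {
            return Optional.empty();
        }
        assignments.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
        return Optional.of(split);
    }

    /** Splits handed to the given reader so far (empty list if none). */
    List<String> assignedTo(int readerId) {
        return assignments.getOrDefault(readerId, List.of());
    }

    /** Number of splits that have not been assigned yet. */
    int remaining() {
        return unassigned.size();
    }
}
```

Because readers pull work, a reader that finishes early simply requests another split, which balances load without the coordinator knowing split sizes in advance.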

Registering a FLIP-27 source

Source<MyEvent, ?, ?> mySource = new MyCustomSource(...);

DataStream<MyEvent> stream = env.fromSource(
    mySource,
    WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp()),
    "my-source"
);

Watermark integration

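FLIP-27 sources generate watermarks at the split level. When one reader consumes several splits, Flink tracks a watermark for each split and emits the minimum, so a lagging split is not overtaken by a faster one. This is more accurate than applying a watermark strategy after the source, where records from all splits are already interleaved: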
// Preferred: watermarks applied at source, per split
DataStream<MyEvent> stream = env.fromSource(
    mySource,
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((e, ts) -> e.eventTime()),
    "my-source"
);

// Less accurate: watermarks are generated after the source has merged all splits
DataStream<MyEvent> lateAssigned = env.fromSource(mySource, WatermarkStrategy.noWatermarks(), "src")
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((e, ts) -> e.eventTime())
    );
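
The difference between the two variants can be worked through with a simplified plain-Java model. WatermarkModel is a hypothetical class for illustration. It assumes a bounded-out-of-orderness rule of "maximum timestamp seen minus the bound" and ignores details such as periodic emission and idleness, so treat it as a sketch of the arithmetic rather than Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model (not Flink code): per-split watermarks vs. one watermark after merging. */
final class WatermarkModel {
    private final long maxOutOfOrdernessMs;
    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();
    private long maxTimestampOverall = Long.MIN_VALUE;

    WatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records one event read from {@code splitId} with the given event time. */
    void onEvent(String splitId, long timestampMs) {
        maxTimestampPerSplit.merge(splitId, timestampMs, Math::max);
        maxTimestampOverall = Math.max(maxTimestampOverall, timestampMs);
    }

    /** Per-split variant: every split has its own watermark and the minimum is emitted. */
    long perSplitWatermark() {
        requireEvents();
        long slowestSplit = Long.MAX_VALUE;
        for (long ts : maxTimestampPerSplit.values()) {
            slowestSplit = Math.min(slowestSplit, ts);
        }
        return slowestSplit - maxOutOfOrdernessMs;
    }

    /** Post-merge variant: a single generator only sees the interleaved stream. */
    long postMergeWatermark() {
        requireEvents();
        return maxTimestampOverall - maxOutOfOrdernessMs;
    }

    private void requireEvents() {
        if (maxTimestampPerSplit.isEmpty()) {
            throw new IllegalStateException("no events observed yet");
        }
    }
}
```

Suppose the bound is 10 seconds, split-a has reached event time 100,000 ms, and split-b is still at 40,000 ms. The per-split variant holds the watermark at 30,000 ms, while the post-merge variant advances to 90,000 ms and would treat the remaining split-b records as late.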

Using connector sources

Most production sources come from Flink connectors (Kafka, Kinesis, Pulsar, etc.) and implement the FLIP-27 API. Add the connector dependency to your project and use the connector’s builder:
KafkaSourceExample.java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("kafka-broker:9092")
    .setTopics("events")
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> events = env.fromSource(
    kafkaSource,
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)),
    "kafka-events"
);

Legacy SourceFunction API

The older SourceFunction API is still available for backward compatibility but should not be used in new code.
LegacySource.java
// Non-parallel source (parallelism = 1)
public class MyLegacySource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("record-" + System.currentTimeMillis());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Parallel source (one instance runs per parallel subtask)
public class MyParallelSource extends RichParallelSourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        int partition = getRuntimeContext().getIndexOfThisSubtask();
        long i = partition;
        while (running) {
            ctx.collect(i);
            i += getRuntimeContext().getNumberOfParallelSubtasks();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

env.addSource(new MyParallelSource()).print();
SourceFunction-based sources do not support per-split watermarks or the FLIP-27 split-level watermark alignment feature. Implement new connectors against the FLIP-27 Source interface instead.
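
The partitioning scheme used by MyParallelSource (start at the subtask index, step by the parallelism) can be checked without a cluster. StridedSequence is a hypothetical helper that reproduces only the arithmetic of the run loop, not the Flink runtime behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Reproduces the value pattern of MyParallelSource in plain Java (not Flink code). */
final class StridedSequence {

    /** Returns the first {@code count} values emitted by the subtask with the given index. */
    static List<Long> valuesFor(int subtaskIndex, int parallelism, int count) {
        List<Long> out = new ArrayList<>();
        long i = subtaskIndex;   // same starting point as the run() loop
        for (int n = 0; n < count; n++) {
            out.add(i);
            i += parallelism;    // same stride as the run() loop
        }
        return out;
    }
}
```

With a parallelism of 3, subtask 0 produces 0, 3, 6, subtask 1 produces 1, 4, 7, and subtask 2 produces 2, 5, 8, so the subtasks together cover every value exactly once.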
