Skip to main content
The DataStream API is Flink’s core API for building data stream processing programs. It lets you express transformations on potentially unbounded streams of data — filtering, mapping, aggregating, windowing — and connect sources to sinks to produce results. A DataStream is an immutable, distributed collection of data. You cannot inspect its elements directly or mutate them in place. Instead, you chain API operations that describe a computation graph, then trigger execution with env.execute().

Program structure

Every DataStream program follows the same five-step pattern:
1

Obtain a StreamExecutionEnvironment

The StreamExecutionEnvironment is the entry point for every Flink program. It holds configuration and creates data sources.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Use getExecutionEnvironment() in almost all cases. It detects context automatically: it creates a local, single-JVM environment when you run from an IDE, and connects to a cluster when you submit a JAR with bin/flink run.Two other factory methods exist for specific cases:
// Explicitly create a local environment (useful in tests)
StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();

// Connect to a specific remote cluster
StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
    "host", 6123, "/path/to/job.jar"
);
2

Load or create initial data

Attach a source to the environment to get a DataStream. Flink provides built-in sources for collections, files, sockets, and sequences, plus a connector ecosystem for Kafka, Kinesis, and more.
// From a bounded collection (useful for testing)
DataStream<String> words = env.fromData("hello", "world", "hello", "flink");

// From a file
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/input"))
    .build();
DataStream<String> lines = env.fromSource(
    fileSource, WatermarkStrategy.noWatermarks(), "file-source"
);

// From a socket (useful for quick experiments)
DataStream<String> socket = env.socketTextStream("localhost", 9999);
3

Apply transformations

Transformations produce new DataStream instances from existing ones. They are lazy — nothing executes until you call execute().
DataStream<String> words = env.fromData("hello world", "hello flink");

DataStream<String> split = words.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }
).returns(String.class);

DataStream<String> filtered = split.filter(word -> !word.isEmpty());
4

Write results to a sink

Sinks consume a DataStream and write records to external systems. For production use, prefer sinkTo() with the Sink API over the older addSink().
// Print to stdout — good for development
filtered.print();

// Write to files with exactly-once semantics
FileSink<String> fileSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>())
    .build();
filtered.sinkTo(fileSink);
5

Trigger execution

Flink programs are evaluated lazily. Calling execute() submits the job graph for execution and blocks until the job finishes.
env.execute("My Streaming Job");
Use executeAsync() if you want to submit without blocking:
JobClient client = env.executeAsync("My Streaming Job");
JobExecutionResult result = client.getJobExecutionResult().get();

Complete example: streaming word count

The following program reads words from a socket, counts them in 5-second tumbling windows, and prints results to stdout.
WindowWordCount.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
            .sum(1);

        counts.print();

        env.execute("Window WordCount");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
            for (String word : sentence.split("\\s")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
Start a netcat listener to feed the program input:
nc -lk 9999
Then run the program. Type words into the netcat terminal and watch counts appear.

Key concepts

Lazy evaluation: Calling .map(), .filter(), or .keyBy() builds a dataflow graph in memory. No data moves until execute() is called. This lets Flink optimize the full graph before execution begins. Parallelism: Each operator runs as one or more parallel instances. Set job-level parallelism with env.setParallelism(n) or per-operator with .map(...).setParallelism(n). Operator chaining: Flink automatically chains adjacent operators that can share a thread (for example, consecutive map() calls) to reduce serialization overhead. You can disable chaining globally or per operator. Buffer timeout: By default, Flink buffers records for up to 100 ms before flushing to downstream operators. Lower this for latency-sensitive pipelines:
env.setBufferTimeout(10); // milliseconds

Explore further

Execution Mode

Switch between STREAMING and BATCH execution modes for bounded data.

Data Sources

Built-in sources, the FLIP-27 Source API, and FileSource.

Operators

map, flatMap, filter, keyBy, window, connect, and more.

Data Sinks

FileSink, print, custom sinks, and exactly-once delivery.

Event Time & Watermarks

WatermarkStrategy, TimestampAssigner, and windowing with event time.

Fault Tolerance

Checkpointing, restart strategies, and exactly-once guarantees.

Working with State

ValueState, ListState, MapState, and state TTL.

Side Outputs

Route records to multiple output streams from a single operator.

Build docs developers (and LLMs) love