Flink’s DataStream API connects to external systems through sources and sinks. A source reads data into a Flink job; a sink writes data out. Connectors wire these together with external storage, messaging, and file systems.

Source and sink model

Every connector implements either the Source interface, the Sink interface, or both. You add them to a job with env.fromSource(...) and stream.sinkTo(...) respectively.
// Source
DataStreamSource<String> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Source"
);

// Sink
stream.sinkTo(sink);

Delivery guarantees

Flink connectors provide one of three processing guarantees:
| Guarantee | Description |
|---|---|
| At-most-once | Records may be lost on failure. No re-delivery. |
| At-least-once | Records are never lost but may be delivered more than once. The source re-reads from the last checkpoint position. |
| Exactly-once | Every record is processed exactly once end-to-end. Requires the source to participate in checkpointing and the sink to use transactions or idempotent writes. |
Exactly-once semantics require checkpointing to be enabled:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds

Source guarantees

| Source | Guarantee | Notes |
|---|---|---|
| Apache Kafka | Exactly-once | Offsets committed on checkpoint |
| AWS Kinesis Streams | Exactly-once | |
| RabbitMQ | At-most-once (v0.10) / Exactly-once (v1.0) | |
| Google PubSub | At-least-once | |
| Files (FileSource) | Exactly-once | |
| Collections | Exactly-once | |
| Sockets | At-most-once | |

Sink guarantees

| Sink | Guarantee | Notes |
|---|---|---|
| Apache Kafka | At-least-once / Exactly-once | Exactly-once with transactional producers (v0.11+) |
| Elasticsearch / Opensearch | At-least-once | |
| Cassandra | At-least-once / Exactly-once | Exactly-once for idempotent updates only |
| Amazon DynamoDB | At-least-once | |
| Amazon Kinesis Data Streams | At-least-once | |
| Amazon Kinesis Data Firehose | At-least-once | |
| File sinks (FileSink) | Exactly-once | Requires checkpointing in streaming mode |
| Socket sinks | At-least-once | |
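As a sketch of how a sink's guarantee is selected in practice, the Kafka sink exposes it through its builder. This assumes the flink-connector-kafka dependency is on the classpath; the broker address, topic name, and transactional-id prefix below are placeholders.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSink {
    // Placeholder endpoint and topic; substitute your own cluster settings.
    static final String BROKERS = "localhost:9092";
    static final String TOPIC = "output-topic";

    static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers(BROKERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(TOPIC)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // EXACTLY_ONCE writes through Kafka transactions that are
                // committed when a checkpoint completes, so the job must
                // have checkpointing enabled.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Each job needs a distinct transactional-id prefix so its
                // transactions do not collide with other producers.
                .setTransactionalIdPrefix("my-flink-job")
                .build();
    }
}
```

With AT_LEAST_ONCE instead, the sink flushes on checkpoint but skips transactions, trading possible duplicates for lower latency.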

Built-in connectors

The following connectors are included in the flink-connectors module and ship with Flink source releases. They are not part of the binary distribution, so you must add the relevant dependency to your project.

Filesystem

Read and write files on POSIX, HDFS, S3, OSS, and other supported filesystems. Supports CSV, Avro, Parquet, ORC, and custom formats.
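A minimal sketch of reading text files with FileSource, assuming the flink-connector-files dependency is available; the input directory below is a placeholder and could equally be an s3:// or hdfs:// URI.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFiles {
    // Placeholder directory; any supported filesystem URI works here.
    static final String INPUT_DIR = "/tmp/input";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read every file under the directory, one String record per line.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path(INPUT_DIR))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
           .print();

        env.execute("read-text-files");
    }
}
```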

DataGen

Generate synthetic data streams for testing and development without an external system.
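A sketch of a rate-limited synthetic source, assuming the flink-connector-datagen dependency; the record format and rate below are arbitrary choices for illustration.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenExample {
    // Turns a sequence index into a synthetic record.
    static String record(long index) {
        return "event-" + index;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        GeneratorFunction<Long, String> generator = DataGenExample::record;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(
                generator,
                1_000,                              // emit 1,000 records, then finish
                RateLimiterStrategy.perSecond(10),  // throttle to 10 records per second
                Types.STRING);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
           .print();

        env.execute("datagen-demo");
    }
}
```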

Hybrid Source

Chain multiple sources in sequence. Read bounded historical data then switch automatically to an unbounded real-time source.
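A sketch of chaining a bounded file source into an unbounded Kafka source, assuming both the files and Kafka connector dependencies; the paths, broker address, and topic are placeholders, and a real job would typically start Kafka at the offset where the archive ends rather than at the earliest offset.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class HistoricalThenLiveSource {
    static HybridSource<String> build() {
        // Bounded source: historical records already landed in files.
        FileSource<String> history = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/archive"))
                .build();

        // Unbounded source: live records from Kafka.
        KafkaSource<String> live = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Flink reads the file source to completion, then switches to Kafka.
        return HybridSource.builder(history)
                .addSource(live)
                .build();
    }
}
```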

External connectors

The following connectors are maintained in separate repositories outside the main Flink repository. Add the appropriate Maven dependency for each.
| Connector | Direction | Repository |
|---|---|---|
| Apache Kafka | Source / Sink | apache/flink-connector-kafka |
| Apache Cassandra | Source / Sink | apache/flink-connector-cassandra |
| Amazon DynamoDB | Sink | apache/flink-connector-aws |
| Amazon Kinesis Data Streams | Source / Sink | apache/flink-connector-aws |
| Amazon Kinesis Data Firehose | Sink | apache/flink-connector-aws |
| Elasticsearch | Sink | apache/flink-connector-elasticsearch |
| Opensearch | Sink | apache/flink-connector-opensearch |
| RabbitMQ | Source / Sink | apache/flink-connector-rabbitmq |
| Google PubSub | Source / Sink | apache/flink-connector-gcp-pubsub |
| Apache Pulsar | Source | apache/flink-connector-pulsar |
| JDBC | Sink | apache/flink-connector-jdbc |
| MongoDB | Source / Sink | apache/flink-connector-mongodb |
| Prometheus | Sink | apache/flink-connector-prometheus |
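As one example of wiring up an external connector, a Kafka consumer built with the flink-connector-kafka dependency might look like the following sketch; the broker address, topic, and group id are placeholders.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    // Placeholder topic name.
    static final String TOPIC = "input-topic";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics(TOPIC)
                .setGroupId("my-group")
                // Start from the earliest available offset on first run;
                // on recovery, Flink restores offsets from the checkpoint.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .print();

        env.execute("kafka-consumer");
    }
}
```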

Connectors in Apache Bahir

Apache Bahir hosts additional streaming connectors that are released independently:
  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

Async I/O for data enrichment

Instead of using a connector, you can query an external database or web service inside a Map or FlatMap operator. Flink’s Asynchronous I/O API lets you do this efficiently by overlapping multiple outstanding requests:
DataStream<String> enriched = AsyncDataStream.unorderedWait(
    input,
    new AsyncDatabaseRequest(),
    1000,
    TimeUnit.MILLISECONDS,
    100 // max concurrent requests
);
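The AsyncDatabaseRequest above is user-defined. A minimal sketch of such a function, with a hypothetical lookup standing in for a real database client (a production version would use a client with a native asynchronous API rather than supplyAsync):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDatabaseRequest extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Hand the lookup off to another thread so the operator thread is
        // never blocked while requests are in flight.
        CompletableFuture
                .supplyAsync(() -> lookup(key))
                .thenAccept(value ->
                        resultFuture.complete(Collections.singleton(key + "=" + value)));
    }

    // Placeholder for a real database query.
    static String lookup(String key) {
        return "value-for-" + key;
    }
}
```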
Because flink-connector-base is bundled in flink-dist, externalized connectors no longer bundle it as a transitive dependency. If you run examples locally outside of a Flink cluster, make sure flink-connector-base is present on your classpath.
