
JIRA: FLINK-17688



Enhanced Fan Out (EFO) allows AWS Kinesis Data Streams (KDS) consumers to utilise dedicated read throughput, rather than a shared quota. EFO delivers records over HTTP/2, which reduces latency and typically gives a 65% performance boost [1]. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications will be able to optionally perform a backwards compatible library upgrade and configuration tweak to inherit the performance benefits.

Public Interfaces

There will be no changes to public interfaces (only @Internal classes will be touched)

EFO will be configured using the existing configuration mechanism. The Flink job will supply additional properties to the FlinkKinesisConsumer that will result in it using a different consumption mechanism internally.

Proposed Changes

For a description of the current state see Appendix A - Current State

For an example EFO consumer flow see Appendix B - Example EFO Consumer Application Flow

The FlinkKinesisConsumer will be updated to add support for EFO. There are four distinct changes that are required:

  1. Configuration
  2. Stream Consumer Registration
  3. Ingestion
  4. Tear Down


Support will be retained for the existing polling mechanism. EFO will be introduced as an opt-in feature for the existing connector. The default behaviour will use the existing Polling mechanism; a customer must explicitly configure the connector to use EFO.

Reasons a customer might not want to use EFO:

  • EFO results in additional cost per hour
  • EFO has certain limitations, for example a maximum of 20 registered consumers per stream
  • Streams with a single consumer would not benefit from the dedicated throughput (they already have the full quota)

The FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:

Key | Description | Type | Default
--- | --- | --- | ---
flink.stream.recordpublisher | Select RecordPublisher mechanism (efo or polling) | string | polling
flink.stream.efo.consumername | The name of the consumer to register with KDS | string | * (see below)
flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy, eager or none) | string | lazy
flink.stream.efo.consumerarn.<stream-name> | The Consumer ARN for a given stream name | string |
flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts | long | 200
flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts | long | 200
flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after recoverable exception | int | 10
flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts | long | 200
flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts | long | 1000
flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.shard.subscribetoshard.maxretries | Maximum number of attempts after recoverable exception | int | 5
flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts | long | 1000
flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts | long | 2000
flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5

 * No default. Provided by customer application
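As a rough illustration, an application opting in to EFO might build its properties as sketched here. The key strings are taken from the property list above; the class name, method name and chosen values are assumptions made for this example, and the constants that will expose these keys are not shown.

```java
import java.util.Properties;

class EfoConfigExample {
    // Builds consumer properties that opt in to EFO. Key strings come from the
    // property list above; the values chosen here are illustrative only.
    static Properties efoProperties(String consumerName) {
        Properties config = new Properties();
        config.setProperty("flink.stream.recordpublisher", "efo");
        config.setProperty("flink.stream.efo.consumername", consumerName);
        config.setProperty("flink.stream.efo.registration", "lazy");
        // Optional tuning of the SubscribeToShard retry count.
        config.setProperty("flink.shard.subscribetoshard.maxretries", "5");
        return config;
    }
}
```

The resulting object would be passed to the FlinkKinesisConsumer constructor in the same way as the existing polling configuration. Omitting flink.stream.recordpublisher leaves the connector on the default polling mechanism.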

Default Consumer Name

A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing the name of an active consumer when calling SubscribeToShard will result in active subscriptions being terminated. The customer therefore needs to set this property carefully when multiple Flink EFO consumers (applications) read from the same stream.

Registration/De-registration Configuration

Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the following strategies, all of which will be offered via different configuration options:

  • Registration
    • Lazy: Performed during task startup (default option)
      • Pro: Registration is performed from within the tasks running on Flink Task Managers during startup
      • Con: Parallel tasks competing to register may result in an increased start-up time
    • Eager: Performed when the FlinkKinesisConsumer is constructed during application main method
      • Pro: Registration is performed once per Flink job
      • Con: Client execution environment may not have access to external resources
    • None: Performed externally by the user from the AWS CLI (user supplies consumer ARN to connector)
      • Pro: Application start-up time will be reduced
      • Con: Additional configuration to supply and up-front setup to be performed by customer
  • De-registration
    • Lazy/Eager: Performed during task tear-down (default option)
      • Pro: De-registration is performed from within the Flink cluster
      • Con: Parallel tasks competing to de-register may result in an increased tear-down time
    • None: Performed externally by the user (user supplies consumer ARN to connector)
      • Pro: Application tear-down time will be reduced
      • Con: Consumer is still registered once the job ends and user would need to manually de-register to avoid additional cost

Properties will be validated, and de-/registration will be performed based on the supplied configuration:

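Registration strategy | Consumer ARN | Registration | De-registration
--- | --- | --- | ---
lazy | Not required | task startup | task tear-down
eager | Not required | FlinkKinesisConsumer constructor | task tear-down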
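The validation described in this section could look roughly like the sketch below. It assumes that the consumer name is checked for every EFO configuration (as stated under Default Consumer Name) and that a per-stream consumer ARN is only required when registration is set to none. The class and method names are hypothetical and this is not the connector's actual validation code.

```java
import java.util.Properties;

class EfoConfigValidationSketch {
    static final String RECORD_PUBLISHER = "flink.stream.recordpublisher";
    static final String CONSUMER_NAME = "flink.stream.efo.consumername";
    static final String REGISTRATION = "flink.stream.efo.registration";
    static final String CONSUMER_ARN_PREFIX = "flink.stream.efo.consumerarn.";

    // Throws IllegalArgumentException when the EFO settings are incomplete or unknown.
    static void validate(Properties config, String streamName) {
        String publisher = config.getProperty(RECORD_PUBLISHER, "polling");
        if (publisher.equals("polling")) {
            return; // default mechanism, nothing EFO-specific to check
        }
        if (!publisher.equals("efo")) {
            throw new IllegalArgumentException("Unknown record publisher: " + publisher);
        }
        require(config, CONSUMER_NAME); // assumed mandatory whenever EFO is enabled

        String registration = config.getProperty(REGISTRATION, "lazy");
        switch (registration) {
            case "lazy":
            case "eager":
                break; // the connector registers the consumer itself
            case "none":
                require(config, CONSUMER_ARN_PREFIX + streamName); // ARN supplied by the user
                break;
            default:
                throw new IllegalArgumentException("Unknown registration strategy: " + registration);
        }
    }

    private static void require(Properties config, String key) {
        String value = config.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
    }
}
```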

Stream Consumer Registration

The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.

Stream consumer registration can be invoked:

  • Lazy: During task start-up, in a multi-threaded/multi-process environment. RegisterStreamConsumer will be invoked from tasks running on Task Manager slots during startup. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to register the consumer. The first task to invoke the service will win, therefore tasks will poll ListStreamConsumers and attempt to register when the consumer is not found. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
  • Eager: Within the FlinkKinesisConsumer constructor from a single threaded environment. The Consumer ARN(s) will be passed into the tasks via the connector configuration.
  • None: The user application will supply the Consumer ARN(s) with the connector properties when registration is disabled.
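The lazy strategy can be sketched as a list-then-register loop with backoff. Everything in this example is an assumption made for illustration: the ConsumerRegistry interface stands in for ListStreamConsumers and RegisterStreamConsumer, the in-memory fake replaces AWS, the ARN string is a placeholder, and the backoff formula (capped exponential with random jitter, in milliseconds) is one plausible reading of the base/max/expconst properties rather than the connector's confirmed calculation. Waiting for the consumer to become active is omitted.

```java
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

class LazyRegistrationSketch {
    // Hypothetical stand-in for the ListStreamConsumers / RegisterStreamConsumer calls.
    interface ConsumerRegistry {
        Optional<String> find(String consumerName);
        String register(String consumerName); // throws IllegalStateException if it exists
    }

    // In-memory fake so the sketch runs without AWS access.
    static class InMemoryRegistry implements ConsumerRegistry {
        private final ConcurrentHashMap<String, String> arns = new ConcurrentHashMap<>();

        public Optional<String> find(String consumerName) {
            return Optional.ofNullable(arns.get(consumerName));
        }

        public String register(String consumerName) {
            String arn = "arn:example:" + consumerName; // placeholder, not a real ARN format
            if (arns.putIfAbsent(consumerName, arn) != null) {
                throw new IllegalStateException("consumer already registered");
            }
            return arn;
        }
    }

    // Exponential backoff capped at maxMillis, with random jitter below the cap (assumed formula).
    static long backoffMillis(long baseMillis, long maxMillis, double expConst, int attempt, Random random) {
        double ceiling = Math.min(maxMillis, baseMillis * Math.pow(expConst, attempt));
        return (long) (random.nextDouble() * ceiling);
    }

    // Returns the consumer ARN, registering the consumer only if no other task has done so.
    static String registerLazily(ConsumerRegistry registry, String consumerName, int maxRetries,
                                 long baseMillis, long maxMillis, double expConst) {
        Random random = new Random();
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Optional<String> existing = registry.find(consumerName);
            if (existing.isPresent()) {
                return existing.get(); // another task won the race
            }
            try {
                return registry.register(consumerName);
            } catch (IllegalStateException lostRace) {
                try {
                    Thread.sleep(backoffMillis(baseMillis, maxMillis, expConst, attempt, random));
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", interrupted);
                }
            }
        }
        throw new IllegalStateException("could not register consumer after " + maxRetries + " attempts");
    }
}
```

Calling registerLazily from several parallel tasks with the same consumer name would yield the same ARN for all of them, with at most one successful registration.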


Most of the existing functionality within the KinesisDataFetcher and ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing Polling mechanism.

In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it onto the Flink application, via the KinesisDataFetcher. A shard consumer is comprised of the following components:

  • Kinesis Proxy
    • A wrapper to proxy calls to AWS services
    • Handles errors and retry with backoff
  • Shard Metrics Reporter
    • A class used to report arbitrary metrics including (non-exhaustive):
      • millisBehindLatest
      • bytesPerRead
      • numberOfAggregatedRecords
    • Metrics are made available to external systems [2]
  • Kinesis Data Fetcher
    • A reference to the KinesisDataFetcher is used to:
      • Hand received messages up to the parent layer
      • Notify the end of a shard/stop consumption

Record Publisher

The KDS consumption within the ShardConsumer will be abstracted out to RecordPublisher exposing the following interface. A record publisher will start publishing records from the provided sequence number. Kinesis Client Library (KCL) took a similar approach [3] when upgrading to AWS SDK v2.x and added support for EFO:

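public interface RecordPublisher {
    // Registers the subscriber that will receive records from this publisher.
    void subscribe(Subscriber<Record> consumer);
    // Starts publishing records from the provided sequence number.
    void run(SequenceNumber startingSequenceNum);
}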

The ShardConsumer interactions with the RecordPublisher are shown below. The last read sequence number is captured during message collection. This follows the behaviour of the existing implementation. The RecordPublisher run method is called in a loop until the stream is exhausted, or the fetcher is signalled to stop.
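The interaction can be approximated with the simplified sketch below. The types are hypothetical stand-ins: Rec replaces Record, a long replaces SequenceNumber, and run returns a boolean to signal whether more data may follow, since the proposed interface does not state how exhaustion is reported. The sketch also assumes that publishing resumes after the last read sequence number, and it leaves out the fetcher stop signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ShardConsumerLoopSketch {
    // Simplified, hypothetical record: a sequence number plus a payload.
    static final class Rec {
        final long sequenceNumber;
        final String data;

        Rec(long sequenceNumber, String data) {
            this.sequenceNumber = sequenceNumber;
            this.data = data;
        }
    }

    // Simplified variant of the proposed interface; run reports whether more data may follow.
    interface RecordPublisher {
        void subscribe(Consumer<Rec> consumer);
        boolean run(long afterSequenceNumber);
    }

    // Fake publisher that delivers at most batchSize records per run call.
    static class BatchingPublisher implements RecordPublisher {
        private final List<Rec> shard;
        private final int batchSize;
        private Consumer<Rec> consumer = rec -> { };

        BatchingPublisher(List<Rec> shard, int batchSize) {
            this.shard = shard;
            this.batchSize = batchSize;
        }

        public void subscribe(Consumer<Rec> consumer) {
            this.consumer = consumer;
        }

        public boolean run(long afterSequenceNumber) {
            int delivered = 0;
            for (Rec rec : shard) {
                if (rec.sequenceNumber > afterSequenceNumber && delivered < batchSize) {
                    consumer.accept(rec);
                    delivered++;
                }
            }
            return delivered == batchSize; // a full batch suggests more records remain
        }
    }

    // Calls run in a loop, resuming from the last read sequence number each time.
    static List<String> consume(RecordPublisher publisher, long initialSequenceNumber) {
        List<String> collected = new ArrayList<>();
        long[] lastRead = {initialSequenceNumber};
        publisher.subscribe(rec -> {
            collected.add(rec.data);
            lastRead[0] = rec.sequenceNumber; // captured during message collection
        });
        boolean more = true;
        while (more) {
            more = publisher.run(lastRead[0]);
        }
        return collected;
    }
}
```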

Two RecordPublisher implementations will be created:

  • PollingRecordPublisher: Move the existing logic from the ShardConsumer into a new class conforming to the new interface

  • FanOutRecordPublisher: Create an EFO implementation to use SubscribeToShard and consume SubscribeToShardEvents

The RecordPublisher subscriber will handle all common logic in the ShardConsumer, as shown in the flow chart below:

Record Publisher Factory

The RecordPublisher will be instantiated from the KinesisDataFetcher and passed into the ShardConsumer, based on the configuration defined by the customer. Object creation will be deferred to a factory following the Abstract Factory pattern for the domain specific details. The factory will comply with the following interface:

public interface RecordPublisherFactory {
    // Builds the RecordPublisher matching the supplied consumer configuration.
    RecordPublisher buildRecordPublisher(Properties properties, ShardMetricReporter shardMetricReporter);
}

Two RecordPublisherFactory implementations will be created:

  • PollingRecordPublisherFactory
  • FanOutRecordPublisherFactory
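Selecting between the two factories from the configuration might be sketched as follows. The interfaces here are trimmed-down, hypothetical versions of the ones proposed above: the ShardMetricReporter argument is omitted and RecordPublisher only exposes a describe method so that the choice is observable. The factoryFor helper is an assumption of this example.

```java
import java.util.Properties;

class RecordPublisherFactorySketch {
    // Trimmed-down stand-ins for the proposed interfaces.
    interface RecordPublisher {
        String describe();
    }

    interface RecordPublisherFactory {
        RecordPublisher buildRecordPublisher(Properties properties);
    }

    static class PollingRecordPublisherFactory implements RecordPublisherFactory {
        public RecordPublisher buildRecordPublisher(Properties properties) {
            return () -> "polling";
        }
    }

    static class FanOutRecordPublisherFactory implements RecordPublisherFactory {
        public RecordPublisher buildRecordPublisher(Properties properties) {
            return () -> "efo";
        }
    }

    // Picks the factory from flink.stream.recordpublisher, defaulting to polling.
    static RecordPublisherFactory factoryFor(Properties properties) {
        String type = properties.getProperty("flink.stream.recordpublisher", "polling");
        switch (type) {
            case "efo":
                return new FanOutRecordPublisherFactory();
            case "polling":
                return new PollingRecordPublisherFactory();
            default:
                throw new IllegalArgumentException("Unknown record publisher: " + type);
        }
    }
}
```

Keeping the selection in one place means the ShardConsumer only ever sees the RecordPublisher interface, regardless of which mechanism the customer configured.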

Kinesis Proxy

KinesisProxy is a wrapper class used to invoke AWS Kinesis services with error handling, retry and jitter backoff. The current proxy is based on AWS SDK v1.x and will be renamed to PollingKinesisProxy. A new FanOutKinesisProxy will be created based on AWS SDK v2.x to handle calls to AWS Kinesis when running in Fan Out mode. The appropriate KinesisProxy will be instantiated by the RecordPublisherFactory based on the configuration supplied by the customer application.

Ingestion Semantics

Watermarking, read sequence management and shard discovery are all managed by the KinesisDataFetcher. The proposed solution modifies the ShardConsumer layer, which does not impact the consumption semantics because these concerns are well separated. The current implementation passes messages one by one using KinesisDataFetcher::emitRecordAndUpdateState, which creates a Flink watermark and subsequently enqueues the deserialised messages. This logic will remain unchanged and will be reused by the Fan Out Publisher.

Back Pressure

The KinesisDataFetcher applies back pressure when passing messages on to the Flink application. There is a variation in how this is achieved based on which record emitter is used (determined by watermark configuration). Both options are transparent to the RecordPublisher:

  • AsyncKinesisRecordEmitter
    • emitQueue is blocking when at capacity, providing back pressure
    • collect() applies back pressure
  • SyncKinesisRecordEmitter
    • emitQueue is bypassed
    • collect() applies back pressure

The SubscribeToShard connection uses a reactive stream subscription [4] within the AWS SDK v2.x library. This supports flow control back pressure; the publisher will not send any new messages until the demand is signalled via the client. Therefore blocking in the receive callback will cause back pressure.

However, the SDK client has a default read timeout of 10 seconds. Any back pressure lasting longer than 10 seconds will result in a ReadTimeout. This will be treated as a recoverable error, and the record publisher will resubscribe to the shard at the point at which the ReadTimeout occurred. KCL operates in the same fashion.
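Demand-driven flow control can be illustrated with the JDK's java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams contract. This is not AWS SDK code: ListPublisher and PullingSubscriber are hypothetical classes written for this sketch, and the publisher is synchronous to keep the behaviour easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandBackPressureSketch {
    // Minimal synchronous publisher: emits only as many items as have been requested.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = items;
        }

        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                public void request(long n) {
                    for (long i = 0; i < n && next < items.size(); i++) {
                        subscriber.onNext(items.get(next++));
                    }
                    if (next == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                public void cancel() {
                    next = items.size();
                    done = true;
                }
            });
        }
    }

    // Subscriber that receives nothing until its owner signals demand.
    static class PullingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
        }

        public void onNext(String item) {
            received.add(item);
        }

        public void onError(Throwable error) {
            throw new IllegalStateException(error);
        }

        public void onComplete() {
            completed = true;
        }

        // Signals demand; until this is called the publisher holds records back.
        void requestMore(long n) {
            subscription.request(n);
        }
    }
}
```

A subscriber that delays its next request while downstream operators are busy therefore slows the publisher down, which is the mechanism the ReadTimeout caveat above applies to.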

Sequence Diagrams

Tear Down

De-registration is enabled by default or when setting registration to lazy or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service will win, therefore tasks will poll ListStreamConsumers and attempt to deregister when the consumer is active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.

Error Handling

EFO error handling will mimic the behaviour of the current polling implementation. All external AWS API calls will have a configurable retry policy and jitter backoff, with sensible defaults based on API quotas. The user can override these default values in the properties when creating the FlinkKinesisConsumer. Non-recoverable errors will result in the application terminating with a sensible error message, as per current behaviour. KCL will be used as a reference to determine which errors are recoverable/fatal.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    • None. EFO will be an opt-in feature. Existing behaviour will remain unchanged. State will be stored using the existing mechanisms and therefore allow users to switch EFO on and off without requiring any migration.
  • If we are changing behaviour how will we phase out the older behaviour?
    • New behaviour will be optional. User application will determine which behaviour to use based on configuration supplied to the consumer.
  • If we need special migration tools, describe them here.
    • N/A
  • When will we remove the existing behaviour?
    • N/A

Test Plan

Flink has an existing set of end to end tests that run against mocked Kinesis Proxy instances (FakeKinesisBehavioursFactory). These tests will continue to pass, verifying the existing functionality has not been modified. Equivalent Kinesis EFO mocks will be implemented to exercise the existing behaviours against the EFO consumption mechanism (where applicable). Additional tests will be implemented in the same fashion to address differences with EFO, not covered by the existing test suite.

Contribution Plan

The following phased approach will be taken to deliver EFO support:

  • [Documentation] FLIP created, reviewed and approved
  • [Jira sub task/Pull Request] Improved test coverage for existing Polling consumption implementation
  • [Jira sub task/Pull Request] Refactor existing code to support RecordPublisher. A single PollingRecordPublisher will be included. Behaviour of consumer will be unchanged. Include relevant documentation.
  • [Jira sub task/Pull Request] Add AWS SDK v2.x dependency along with FanOutKinesisProxy
  • [Jira sub task/Pull Request] Configuration validation and deserialisation for Fan Out consumers
  • [Jira sub task/Pull Request] Stream consumer registration and de-registration strategies
  • [Jira sub task/Pull Request] Add FanOutRecordPublisher to add EFO support. Include relevant documentation and end to end tests
  • [Sample Applications] Publish sample applications and additional documentation.

Rejected Alternatives

Migrate from AWS SDK v1.x to v2.x

EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation is using AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side [5]. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project due to Kinesis Producer Library (KPL) and DynamoDBStreamConsumer not yet supporting AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.

Waiting for the refactored Source Interfaces

The Flink community are currently refactoring the source connector interface [6] to allow separation of data consumption and shard management (along with other things). AWS are targeting the current source interfaces, meaning additional work may be required later to support the new FlinkKinesisConsumer. Waiting for the updated interfaces was considered but rejected. It is anticipated that the upgrade path for existing customers will be non-trivial, as extra work will be required to migrate snapshots and state. Because the EFO support work will be encapsulated within the consumption responsibility of the consumer, supporting both interfaces is expected to be straightforward.

Random Default Consumer Names

EFO configuration proposes the user specifies the consumer name. This could result in multiple Flink applications competing for a consumer subscription, when not set appropriately (duplicated) and consuming from the same stream. It was considered to generate a random consumer name for use with a Flink job, however this was rejected. Terminating Flink applications without properly executing the teardown method could result in orphaned consumers. A customer could then encounter additional cost, or approach the 20 consumer limit as a result.

Using Kinesis Client Library (KCL)

KCL [7] was considered to replace the existing consumer but rejected. KCL does not support the use case for the following reasons:

  • FlinkKinesisConsumer supports multiple streams in a single instance
  • FlinkKinesisConsumer runs in a distributed fashion with low level control of shard state to achieve “exactly once” semantics

Appendix A - Current State

This section describes how the current FlinkKinesisConsumer is implemented to give context around how and where the changes need to be applied. The FlinkKinesisConsumer is created and configured by the end user application. Everything within the FlinkKinesisConsumer is managed by Flink transparently to the user.

The FlinkKinesisConsumer currently reads data from KDS using a polling mechanism from the following API endpoints:

  • GetShardIterator [8]
  • GetRecords [9]

The block diagram below shows the main components in the FlinkKinesisConsumer:

  • FlinkKinesisConsumer: The top level source connector instantiated by the end user application
  • KinesisDataFetcher: A single thread managing the Shard Consumers. This thread polls ListShards to determine when shards are splitting and merging, and spawns new ShardConsumers as required
  • ShardConsumer: A single thread continuously reading from a shard using GetShardIterator and GetRecords

Each ShardConsumer populates a queue which passes messages back to the end application via the KinesisDataFetcher.

Appendix B - Example EFO Consumer Application Flow

An EFO consumer would require the use of the following API endpoints:

  • RegisterStreamConsumer [10]
  • ListStreamConsumers [11]
  • SubscribeToShard [12]
  • DeregisterStreamConsumer [13]

An example EFO consumer flow is illustrated below (it is assumed the application already knows the stream and shard details):

  • Register Stream Consumer
    • A consumer must register with a KDS to enable EFO
    • The consumer must wait for the registration to complete before attempting to SubscribeToShard. This can be achieved by polling the ListStreamConsumers endpoint
    • If the consumer is already registered an error is thrown
  • Subscribe to Shard
    • Invoking SubscribeToShard will establish an HTTP/2 connection to the shard
    • Once subscribed, the consumer will receive events containing data from KDS
    • A subscription lasts up to 5 minutes; once the subscription expires the consumer must resubscribe to continue receiving data
  • Subscribe to Shard Event
    • Once subscribed, the consumer will receive events containing data from KDS
    • Data should be deserialised and forwarded on to be processed by the application
    • The event contains a list of records and metadata describing the sequence number and milliseconds behind latest
  • Deregister Stream Consumer
    • When the application is finished and tearing down, the consumer should invoke DeregisterStreamConsumer to delete the EFO registration
    • Leaving a consumer registered will result in:
      • Unnecessary additional cost incurred to the customer
      • One of the 20 consumer slots held, potentially preventing other consumers registering
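The register, wait, consume and deregister steps above can be sketched end to end against an in-memory fake. FakeKinesis is a hypothetical stand-in for the KDS consumer APIs, the two-poll activation delay is arbitrary, and a real client would sleep with backoff between status polls and call SubscribeToShard where the Runnable is invoked.

```java
import java.util.HashMap;
import java.util.Map;

class EfoLifecycleSketch {
    enum Status { CREATING, ACTIVE }

    // Hypothetical in-memory stand-in for the KDS consumer APIs listed above.
    static class FakeKinesis {
        private final Map<String, Integer> pollsUntilActive = new HashMap<>();

        void registerStreamConsumer(String name) {
            if (pollsUntilActive.containsKey(name)) {
                throw new IllegalStateException("consumer already registered: " + name);
            }
            pollsUntilActive.put(name, 2); // becomes ACTIVE after two status polls
        }

        // Mimics looking the consumer up via ListStreamConsumers.
        Status consumerStatus(String name) {
            int remaining = pollsUntilActive.get(name);
            if (remaining > 0) {
                pollsUntilActive.put(name, remaining - 1);
                return Status.CREATING;
            }
            return Status.ACTIVE;
        }

        void deregisterStreamConsumer(String name) {
            pollsUntilActive.remove(name);
        }

        boolean isRegistered(String name) {
            return pollsUntilActive.containsKey(name);
        }
    }

    // Registers, waits for ACTIVE, runs the consuming work, then always deregisters.
    // Returns the number of polls that reported CREATING.
    static int runWithConsumer(FakeKinesis kinesis, String name, Runnable consume) {
        kinesis.registerStreamConsumer(name);
        int polls = 0;
        try {
            while (kinesis.consumerStatus(name) != Status.ACTIVE) {
                polls++; // a real client would sleep with backoff between polls
            }
            consume.run();
        } finally {
            kinesis.deregisterStreamConsumer(name);
        }
        return polls;
    }
}
```

Deregistering in a finally block reflects the cost and slot concerns above: the registration is released even when consumption fails.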


[1] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
[3] https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java
[4] https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html
[5] https://docs.aws.amazon.com/sdk-for-java/v2/migration-guide/side-by-side.html
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
[7] https://github.com/awslabs/amazon-kinesis-client
[8] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
[9] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
[10] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
[11] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
[12] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
[13] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html