Current state: Accepted
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-td42720.html
Voting thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-128-Enhanced-Fan-Out-for-AWS-Kinesis-Consumers-td42846.html
JIRA: FLINK-17688
Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to utilise dedicated read throughput, rather than a shared quota. HTTP/2 reduces latency and typically gives a 65% performance boost. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications will be able to optionally perform a backwards compatible library upgrade and configuration tweak to inherit the performance benefits.
There will be no changes to public interfaces (only @Internal classes will be touched).
EFO will be configured using the existing configuration mechanism. The Flink job will supply additional properties to the FlinkKinesisConsumer that will result in it using a different consumption mechanism internally.
For a description of the current state see Appendix A - Current State
For an example EFO consumer flow see Appendix B - Example EFO Consumer Application Flow
FlinkKinesisConsumer will be updated to add support for EFO. There are four distinct changes that are required:
- Stream Consumer Registration
- Tear Down
Support will be retained for the existing polling mechanism. EFO will be introduced as an opt-in feature of the existing connector. The default behaviour will use the existing polling mechanism; a customer must explicitly configure the connector to use EFO.
Reasons a customer might not want to use EFO:
- EFO results in additional cost per hour
- EFO has certain limitations, for example a maximum of 20 registered consumers per stream
- Streams with a single consumer would not benefit from the dedicated throughput (they already have the full quota)
FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:
| Property Key | Description | Type | Default |
| --- | --- | --- | --- |
| flink.stream.recordpublisher | Select the RecordPublisher mechanism (efo/polling) | string | polling |
| flink.stream.efo.consumername | The name of the consumer to register with KDS | string | *see below |
| flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy/eager/none) | string | lazy |
| flink.stream.efo.consumerarn.<stream-name> | The Consumer ARN for a given stream name | string | |
| flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.shard.subscribetoshard.maxretries | Maximum number of attempts after a recoverable exception | int | 5 |
| flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts (millis) | long | 1000 |
| flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts (millis) | long | 2000 |
| flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |

* No default. Provided by the customer application.
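As a rough illustration of how these properties might be supplied, the sketch below builds a configuration using the proposed keys. The connector wiring itself is omitted so the example stays self-contained, and the validation shown is an assumption about the intended behaviour, not the final implementation:

```java
import java.util.Properties;

// Sketch of the proposed EFO configuration using the property keys from the
// table above. Values and the validation logic are illustrative assumptions.
public class EfoConfigSketch {
    public static void main(String[] args) {
        Properties config = new Properties();
        // Opt in to EFO; the default record publisher remains "polling".
        config.setProperty("flink.stream.recordpublisher", "efo");
        // Consumer name is mandatory when EFO is enabled (no default).
        config.setProperty("flink.stream.efo.consumername", "my-flink-application");
        // Registration strategy: lazy (default), eager, or none.
        config.setProperty("flink.stream.efo.registration", "lazy");

        // Minimal validation mirroring the proposal: a consumer name must be
        // present whenever the EFO record publisher is selected.
        boolean efoEnabled = "efo".equals(config.getProperty("flink.stream.recordpublisher"));
        boolean valid = !efoEnabled || config.getProperty("flink.stream.efo.consumername") != null;
        System.out.println("efoEnabled=" + efoEnabled + " valid=" + valid);
    }
}
```

In a real job these Properties would be passed to the FlinkKinesisConsumer constructor, as described above.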
Default Consumer Name
A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing an active consumer name when calling SubscribeToShard will result in active subscriptions being terminated. Therefore the customer must set this configuration carefully should they require multiple Flink EFO consumers (applications) reading from the same stream.
Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the following strategies, all of which will be offered via different configuration options:
- Lazy: Performed during task startup (default option)
  - Pro: Registration is performed from within the tasks running on Flink Task Managers during startup
  - Con: Parallel tasks competing to register may result in an increased start-up time
- Eager: Performed when the FlinkKinesisConsumer is constructed in the application's main method
  - Pro: Registration is performed once per Flink job
  - Con: Client execution environment may not have access to external resources
- None: Performed externally by the user from the AWS CLI (user supplies consumer ARN to connector)
  - Pro: Application start-up time will be reduced
  - Con: Additional configuration to supply and up-front setup to be performed by the customer
- Lazy/Eager: Performed during task tear-down (default option)
  - Pro: De-registration is performed from within the Flink cluster
  - Con: Parallel tasks competing to de-register may result in an increased tear-down time
- None: Performed externally by the user (user supplies consumer ARN to connector)
  - Pro: Application tear-down time will be reduced
  - Con: Consumer is still registered once the job ends, and the user would need to manually de-register it to avoid additional cost
Properties will be validated, and de-/registration will be performed based on the supplied configuration:
| Registration setting | Consumer ARN property | Registration performed | De-registration performed |
| --- | --- | --- | --- |
| lazy | Not required | task startup | task tear-down |
| eager | Not required | FlinkKinesisConsumer constructor | task tear-down |
Stream Consumer Registration
The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.
Stream consumer registration can be invoked:
- Lazy: During task start-up, in a multi-threaded/multi-process environment. RegisterStreamConsumer will be invoked from tasks running on Task Manager slots during startup. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to register the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to register when the consumer is not found. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
- Eager: Within the FlinkKinesisConsumer constructor, from a single-threaded environment. The Consumer ARN(s) will be passed into the tasks via the connector configuration.
- None: The user application will supply the Consumer ARN(s) with the connector properties when registration is disabled.
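The staggered, jittered backoff described above could look roughly like the following sketch. The constants mirror the proposed registerstreamconsumer defaults from the configuration table, and the full-jitter formula (base * expconst^attempt, capped at max, scaled by a random factor) is an assumption about the intended behaviour, not the final implementation:

```java
import java.util.Random;

// Illustrative sketch of jittered exponential backoff for competing
// RegisterStreamConsumer calls. Constants mirror the proposed
// flink.stream.registerstreamconsumer.backoff.* defaults.
public class RegistrationBackoffSketch {
    static final long BASE_MILLIS = 200;   // backoff.base
    static final long MAX_MILLIS = 1000;   // backoff.max
    static final double EXP_CONST = 1.5;   // backoff.expconst

    /** Capped exponential backoff ceiling for the given retry attempt. */
    static long capMillis(int attempt) {
        return Math.min((long) (BASE_MILLIS * Math.pow(EXP_CONST, attempt)), MAX_MILLIS);
    }

    /** Full jitter spreads competing tasks across [0, cap) to stagger calls. */
    static long backoffMillis(int attempt, Random random) {
        return (long) (random.nextDouble() * capMillis(attempt));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int attempt = 0; attempt < 5; attempt++) {
            long cap = capMillis(attempt);
            boolean jitterOk = backoffMillis(attempt, random) <= cap;
            System.out.println("attempt=" + attempt + " cap=" + cap + " jitterOk=" + jitterOk);
        }
    }
}
```

Note how the ceiling grows from 200 ms and saturates at the 1000 ms maximum, so competing tasks never wait longer than the configured cap.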
Most of the existing functionality within the ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing polling mechanism.
In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it on to the Flink application, via the KinesisDataFetcher. A shard consumer is comprised of the following components:
- Kinesis Proxy
  - A wrapper to proxy calls to AWS services
  - Handles errors and retry with backoff
- Shard Metrics Reporter
  - A class used to report arbitrary metrics (non-exhaustive)
  - Metrics are made available to external systems
- Kinesis Data Fetcher
  - A reference to the KinesisDataFetcher is used to:
    - Hand received messages up to the parent layer
    - Notify the end of a shard/stop consumption
The KDS consumption within the ShardConsumer will be abstracted out to a RecordPublisher exposing the following interface. A record publisher will start publishing records from the provided sequence number. The Kinesis Client Library (KCL) took a similar approach when upgrading to AWS SDK v2.x and added support for EFO:
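As a rough illustration of the abstraction (the names and signatures below are hypothetical, not the final Flink API), a RecordPublisher could take the following shape; a trivial in-memory implementation stands in for the real polling/fan-out publishers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the RecordPublisher abstraction described above.
// Names and signatures are illustrative, not the final Flink interface.
public class RecordPublisherSketch {

    /** Result of a single run of the publisher. */
    enum RunResult { INCOMPLETE, COMPLETE }

    /** Publishes batches of records to a consumer callback. */
    interface RecordPublisher {
        RunResult run(Consumer<List<String>> recordConsumer) throws InterruptedException;
    }

    /** Trivial in-memory stand-in for a polling or fan-out publisher. */
    static class InMemoryRecordPublisher implements RecordPublisher {
        private final List<String> records;
        private int position = 0;

        InMemoryRecordPublisher(List<String> records) {
            this.records = records;
        }

        @Override
        public RunResult run(Consumer<List<String>> recordConsumer) {
            // Publish one batch per run; the caller loops until COMPLETE,
            // mirroring how the ShardConsumer drives the publisher.
            recordConsumer.accept(List.of(records.get(position++)));
            return position >= records.size() ? RunResult.COMPLETE : RunResult.INCOMPLETE;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RecordPublisher publisher = new InMemoryRecordPublisher(List.of("a", "b", "c"));
        List<String> received = new ArrayList<>();
        RunResult result;
        do {
            result = publisher.run(received::addAll);
        } while (result != RunResult.COMPLETE);
        System.out.println(received);
    }
}
```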
ShardConsumer interactions with the RecordPublisher are shown below. The last read sequence number is captured during message collection. This follows the behaviour of the existing implementation. The RecordPublisher run method is called in a loop until the stream is exhausted, or the fetcher is signalled to stop.
Two RecordPublisher implementations will be created:
- PollingRecordPublisher: Move the existing logic from the ShardConsumer into a new class conforming to the new interface
- FanOutRecordPublisher: Create an EFO implementation to use SubscribeToShard and consume SubscribeToShardEvents
The RecordPublisher subscriber will handle all common logic in the ShardConsumer, as shown in the flow chart below:
Record Publisher Factory
The RecordPublisher will be instantiated from the KinesisDataFetcher and passed into the ShardConsumer, based on the configuration defined by the customer. Object creation will be deferred to a factory following the Abstract Factory pattern for the domain specific details. The factory will comply with the following interface:
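A hypothetical sketch of such a factory (the signature, property handling, and publisher stubs are illustrative assumptions, not the final interface):

```java
import java.util.Properties;

// Hypothetical sketch of the Abstract Factory described above; names and the
// exact signature are illustrative, not the final Flink interface.
public class RecordPublisherFactorySketch {

    /** Simplified stand-in for the RecordPublisher abstraction. */
    interface RecordPublisher {
        void run();
    }

    /** Creates a RecordPublisher from the customer supplied configuration. */
    interface RecordPublisherFactory {
        RecordPublisher create(Properties configProps);
    }

    /** Selects a factory based on the proposed recordpublisher property. */
    static RecordPublisherFactory factoryFor(Properties configProps) {
        String publisherType = configProps.getProperty("flink.stream.recordpublisher", "polling");
        if ("efo".equals(publisherType)) {
            // Stub standing in for a FanOutRecordPublisher factory.
            return props -> () -> System.out.println("fan-out publisher running");
        }
        // Stub standing in for a PollingRecordPublisher factory (the default).
        return props -> () -> System.out.println("polling publisher running");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.stream.recordpublisher", "efo");
        factoryFor(props).create(props).run();
    }
}
```

Deferring creation to a factory keeps the KinesisDataFetcher free of publisher-specific construction details, as the Abstract Factory pattern intends.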
RecordPublisherFactory implementations will be created for each of the publisher types above.
KinesisProxy is a wrapper class used to invoke AWS Kinesis services with error handling, retry and jitter backoff. The current proxy is based on AWS SDK v1.x and will be renamed to PollingKinesisProxy. A new FanOutKinesisProxy will be created based on AWS SDK v2.x to handle calls to AWS Kinesis when running in Fan Out mode. The appropriate KinesisProxy will be instantiated by the RecordPublisherFactory based on the configuration supplied by the customer application.
Watermarking, read sequence management and shard discovery are all managed by the KinesisDataFetcher. The proposed solution modifies the ShardConsumer layer, which does not impact the consumption semantics; there is a good separation of concerns. The current implementation passes messages one by one using KinesisDataFetcher::emitRecordAndUpdateState, which creates a Flink watermark and subsequently enqueues the deserialised messages. This logic will remain unchanged and will be reused by the Fan Out Publisher.
KinesisDataFetcher applies back pressure when passing messages on to the Flink application. There is a variation in how this is achieved based on which record emitter is used (determined by watermark configuration). Both options are transparent to the RecordPublisher:
- With the record emitter:
  - emitQueue is blocking when at capacity, providing back pressure
  - collect() applies back pressure
- Without the record emitter:
  - emitQueue is bypassed
  - collect() applies back pressure
The SubscribeToShard connection uses a reactive stream subscription within the AWS SDK v2.x library. This supports flow control back pressure; the publisher will not send any new messages until demand is signalled via the client. Therefore blocking in the receive callback will cause back pressure.
However, the SDK client has a default read timeout of 10 seconds. Any back pressure delay larger than 10 seconds will result in a ReadTimeout. This will be treated as a recoverable error, and the consumer will resubscribe to the shard at the point at which the ReadTimeout occurred. KCL operates in the same fashion.
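The demand-driven delivery described above can be illustrated with the JDK's java.util.concurrent.Flow types. This is a self-contained sketch of reactive flow control, not the AWS SDK's actual subscription classes; all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Minimal, single-threaded sketch of reactive-streams flow control, mirroring
// how a SubscribeToShard subscription only delivers events when demand is
// signalled via request(n). Names here are illustrative only.
public class FlowControlSketch {

    /** Synchronous publisher that emits items only when demand is requested. */
    static class DemandDrivenPublisher implements Flow.Publisher<String> {
        private final List<String> events;
        DemandDrivenPublisher(List<String> events) { this.events = events; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int position = 0;
                private boolean completed = false;
                @Override
                public void request(long n) {
                    // Deliver at most n items; with no demand, nothing flows.
                    for (long i = 0; i < n && position < events.size(); i++) {
                        subscriber.onNext(events.get(position++));
                    }
                    if (position == events.size() && !completed) {
                        completed = true;
                        subscriber.onComplete();
                    }
                }
                @Override
                public void cancel() { position = events.size(); completed = true; }
            });
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new DemandDrivenPublisher(List.of("event-1", "event-2", "event-3"))
            .subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;
                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // signal demand for one event at a time
                }
                @Override public void onNext(String event) {
                    received.add(event); // blocking here would delay the next request
                    subscription.request(1);
                }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
        System.out.println(received);
    }
}
```

Because the publisher only emits on request(n), a subscriber that blocks before requesting more effectively back-pressures the source, which is why a stall longer than the SDK's read timeout surfaces as a ReadTimeout.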
De-registration is enabled when registration is set to lazy (the default) or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear-down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to deregister while the consumer is active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
EFO error handling will mimic the behaviour of the current polling implementation. All external AWS API calls will have a configurable retry policy and jitter backoff with sensible defaults based on API quotas. The user can override these default values in the properties when creating the FlinkKinesisConsumer. Non-recoverable errors will result in the application terminating with a sensible error message, as per current behaviour. KCL will be used as a reference to determine which errors are recoverable/fatal.
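A minimal sketch of the recoverable/fatal split described above (the exception name and retry shape are illustrative assumptions; backoff between attempts is elided for brevity):

```java
// Illustrative retry wrapper for AWS calls with a recoverable/fatal split,
// mirroring the behaviour described above; names are placeholders.
public class RetrySketch {

    /** Stands in for a transient AWS error (e.g. throttling, read timeout). */
    static class RecoverableException extends RuntimeException { }

    /** A call to an external service that may fail. */
    interface Call<T> {
        T invoke() throws Exception;
    }

    /** Retries recoverable errors up to maxRetries; everything else is fatal. */
    static <T> T withRetries(Call<T> call, int maxRetries) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.invoke();
            } catch (RecoverableException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface as a fatal error
                }
                // (jittered backoff would be applied here before retrying)
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2}; // fail twice with a recoverable error, then succeed
        String result = withRetries(() -> {
            if (failures[0]-- > 0) {
                throw new RecoverableException();
            }
            return "ok";
        }, 10);
        System.out.println(result);
    }
}
```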
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- None. EFO will be an opt-in feature. Existing behaviour will remain unchanged. State will be stored using the existing mechanisms and therefore allow users to switch EFO on and off without requiring any migration.
- If we are changing behaviour how will we phase out the older behaviour?
- New behaviour will be optional. User application will determine which behaviour to use based on configuration supplied to the consumer.
- If we need special migration tools, describe them here.
- When will we remove the existing behaviour?
Flink has an existing set of end to end tests that run against mocked Kinesis Proxy instances (FakeKinesisBehavioursFactory). These tests will continue to pass, verifying the existing functionality has not been modified. Equivalent Kinesis EFO mocks will be implemented to exercise the existing behaviours against the EFO consumption mechanism (where applicable). Additional tests will be implemented in the same fashion to address differences with EFO not covered by the existing test suite.
The following phased approach will be taken to deliver EFO support:
- [Documentation] FLIP created, reviewed and approved
- [Jira sub task/Pull Request] Improved test coverage for existing Polling consumption implementation
- [Jira sub task/Pull Request] Refactor existing code to support RecordPublisher. A single PollingRecordPublisher will be included. Behaviour of the consumer will be unchanged. Include relevant documentation.
- [Jira sub task/Pull Request] Add AWS SDK v2.x dependency along with FanOutKinesisProxy
- [Jira sub task/Pull Request] Configuration validation and deserialisation for Fan Out consumers
- [Jira sub task/Pull Request] Stream consumer registration and de-registration strategies
- [Jira sub task/Pull Request] Add FanOutRecordPublisher to add EFO support. Include relevant documentation and end to end tests
- [Sample Applications] Publish sample applications and additional documentation.
Migrate from AWS SDK v1.x to v2.x
EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation uses AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project due to the Kinesis Producer Library (KPL) and DynamoDBStreamConsumer not yet supporting AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.
Waiting for the refactored Source Interfaces
The Flink community is currently refactoring the source connector interface to allow separation of data consumption and shard management (along with other things). AWS are targeting the current source interfaces, meaning additional work may be required later to support the new interfaces in the FlinkKinesisConsumer. Waiting for the updated interfaces was considered but rejected. It is anticipated that the upgrade path for existing customers will be non-trivial; extra work will be required to migrate snapshots and state. Considering the EFO support work will be encapsulated within the consumption responsibility of the consumer, supporting both interfaces is expected to be straightforward.
Random Default Consumer Names
The EFO configuration proposes that the user specifies the consumer name. This could result in multiple Flink applications competing for a consumer subscription when the name is not set appropriately (duplicated) and the applications consume from the same stream. Generating a random consumer name for each Flink job was considered, but rejected: terminating a Flink application without properly executing the teardown method could result in orphaned consumers. A customer could then incur additional cost, or approach the 20 consumer limit, as a result.
Using Kinesis Client Library (KCL)
KCL was considered to replace the existing consumer but rejected. KCL does not support the use case for the following reasons:
- FlinkKinesisConsumer supports multiple streams in a single instance
- FlinkKinesisConsumer runs in a distributed fashion with low level control of shard state to achieve "exactly once" semantics
Appendix A - Current State
This section describes how the current FlinkKinesisConsumer is implemented, to give context around how and where the changes need to be applied. The FlinkKinesisConsumer is created and configured by the end user application. Everything within the FlinkKinesisConsumer is managed by Flink, transparently to the user.
The FlinkKinesisConsumer currently reads data from KDS using a polling mechanism from the following API endpoints:
The block diagram below shows the main components:
- FlinkKinesisConsumer: The top level source connector instantiated by the end user application
- KinesisDataFetcher: A single thread managing the Shard Consumers. This thread polls ListShards to determine when shards are splitting and merging, and spawns new Shard Consumers accordingly
- ShardConsumer: A single thread continuously reading from a shard using the polling mechanism. The ShardConsumer populates a queue which passes messages back to the end application via the KinesisDataFetcher
Appendix B - Example EFO Consumer Application Flow
An EFO consumer would require the use of the following API endpoints:
An example EFO consumer flow is illustrated below (it is assumed the application already knows the stream and shard details):
- Register Stream Consumer
  - A consumer must register with a KDS to enable EFO
  - The consumer must wait for the registration to complete before attempting to SubscribeToShard. This can be achieved by polling ListStreamConsumers
  - If the consumer is already registered, an error is thrown
- Subscribe to Shard
  - SubscribeToShard will establish an HTTP/2 connection to the shard
  - A subscription lasts up to 5 minutes; once the subscription expires the consumer must resubscribe to continue receiving data
- Subscribe to Shard Event
  - Once subscribed, the consumer will receive events containing data from KDS
  - The event contains a list of records and metadata describing the sequence number and milliseconds behind latest
  - Data should be deserialised and forwarded on to be processed by the application
- Deregister Stream Consumer
  - When the application is finished and tearing down, the consumer should invoke DeregisterStreamConsumer to delete the EFO registration
  - Leaving a consumer registered will result in:
    - Unnecessary additional cost incurred to the customer
    - One of the 20 consumer slots being held, potentially preventing other consumers registering