Motivation
Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to utilise dedicated read throughput, rather than a quota shared with other consumers. Records are pushed over HTTP/2, which reduces latency and typically gives a 65% performance boost [1]. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications will optionally be able to perform a backwards compatible library upgrade and configuration tweak to inherit the performance benefits.
Public Interfaces
There will be no changes to public interfaces (only @Internal classes will be touched).
EFO will be configured using the existing configuration mechanism. The Flink job will supply additional properties to the FlinkKinesisConsumer that will result in it using a different consumption mechanism internally.
Proposed Changes
For a description of the current state see Appendix A - Current State
For an example EFO consumer flow see Appendix B - Example EFO Consumer Application Flow
The FlinkKinesisConsumer will be updated to add support for EFO. There are four distinct changes that are required:
- Configuration
- Stream Consumer Registration
- Ingestion
- Tear Down
Configuration
Support will be retained for the existing polling mechanism. EFO will be introduced as an opt-in feature for the existing connector. The default behaviour will use the existing Polling mechanism; a customer must explicitly configure the connector to use EFO.
Reasons a customer might not want to use EFO:
- EFO results in additional cost per hour
- EFO has certain limitations, for example a maximum of 20 registered consumers per stream
- Streams with a single consumer would not benefit from the dedicated throughput (they already have the full quota)
The FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:
| Key | Description | Type | Default |
| --- | --- | --- | --- |
| flink.stream.recordpublisher | Select the RecordPublisher mechanism (efo \| polling) | string | polling |
| flink.stream.efo.consumername | The name of the consumer to register with KDS | string | * see below |
| flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy \| eager \| none) | string | lazy |
| flink.stream.efo.consumerarn.<stream-name> | The consumer ARN for a given stream name | string | |
| flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts (millis) | long | 200 |
| flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts (millis) | long | 1000 |
| flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.shard.subscribetoshard.maxretries | Maximum number of attempts after a recoverable exception | int | 5 |
| flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts (millis) | long | 1000 |
| flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts (millis) | long | 2000 |
| flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |

* No default; the value must be provided by the customer application.
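For illustration, a minimal sketch of opting in to EFO via the property map. The stream name, region and consumer name are placeholders, and raw property keys are shown in place of the constants that will expose them:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class EfoExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");
        // Opt in to EFO; the default record publisher is "polling".
        config.setProperty("flink.stream.recordpublisher", "efo");
        // Mandatory when EFO is enabled; should be unique per application.
        config.setProperty("flink.stream.efo.consumername", "example-flink-app");

        DataStream<String> stream = env.addSource(
                new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config));
        stream.print();
        env.execute("EFO example");
    }
}
```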
Default Consumer Name
A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing an active consumer when calling SubscribeToShard will result in active subscriptions being terminated. Therefore the customer must set this configuration carefully should they require multiple Flink EFO consumers (applications) reading from the same stream.
Registration/De-registration Configuration
Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the following strategies, all of which will be offered via different configuration options:
- Registration
  - Lazy: Performed during task startup (default option)
    - Pro: Registration is performed from within the tasks running on Flink Task Managers during startup
    - Con: Parallel tasks competing to register may result in an increased start-up time
  - Eager: Performed when the FlinkKinesisConsumer is constructed during the application main method
    - Pro: Registration is performed once per Flink job
    - Con: Client execution environment may not have access to external resources
  - None: Performed externally by the user from the AWS CLI (user supplies consumer ARN to connector)
    - Pro: Application start-up time will be reduced
    - Con: Additional configuration to supply, and up-front setup to be performed by the customer
- De-registration
  - Lazy/Eager: Performed during task tear-down (default option)
    - Pro: De-registration is performed from within the Flink cluster
    - Con: Parallel tasks competing to de-register may result in an increased tear-down time
  - None: Performed externally by the user (user supplies consumer ARN to connector)
    - Pro: Application tear-down time will be reduced
    - Con: Consumer is still registered once the job ends and the user would need to manually de-register to avoid additional cost
Properties will be validated, and de-/registration will be performed based on the supplied configuration:
| flink.stream.efo.registration | flink.stream.efo.consumerarn.<stream-name> | Registration | De-registration |
| --- | --- | --- | --- |
| lazy | Not required | Task startup | Task tear-down |
| eager | Not required | FlinkKinesisConsumer constructor | Task tear-down |
| none | Required | None | None |
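For the none strategy, the pre-registered consumer ARN is supplied per stream. A hypothetical fragment, continuing the config object from the earlier example (the stream name and ARN are placeholders):

```java
// Registration/de-registration is performed externally (e.g. via the AWS CLI);
// the connector only needs the resulting consumer ARN.
config.setProperty("flink.stream.efo.registration", "none");
config.setProperty(
        "flink.stream.efo.consumerarn.example-stream",
        "arn:aws:kinesis:us-east-1:000000000000:stream/example-stream/consumer/example-consumer:1111111111");
```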
Stream Consumer Registration
The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.
Stream consumer registration can be invoked:
- Lazy: During task start-up, in a multi-threaded/process environment. RegisterStreamConsumer will be invoked from tasks running on Task Manager slots during startup. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to register the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to register when the consumer is not found. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer (a sketch of this strategy follows this list).
- Eager: Within the FlinkKinesisConsumer constructor, from a single-threaded environment. The Consumer ARN(s) will be passed into the tasks via the connector configuration.
- None: The user application will supply the Consumer ARN(s) with the connector properties when registration is disabled.
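A rough sketch of the lazy strategy is shown below, using AWS SDK v2.x. The client setup, stream ARN and full-jitter backoff formula are assumptions for illustration; the real implementation would also wait for the consumer to become ACTIVE before subscribing:

```java
import java.util.concurrent.ThreadLocalRandom;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;

public class LazyRegistrationSketch {

    // Values taken from the flink.stream.registerstreamconsumer.* properties.
    private static final int MAX_RETRIES = 10;
    private static final long BACKOFF_BASE = 200;
    private static final long BACKOFF_MAX = 1000;
    private static final double BACKOFF_EXP_CONST = 1.5;

    public static String registerConsumer(
            KinesisAsyncClient kinesis, String streamArn, String consumerName)
            throws InterruptedException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            // Stagger competing tasks with a jittered exponential backoff (assumed full jitter).
            long backoff = (long) (ThreadLocalRandom.current().nextDouble()
                    * Math.min(BACKOFF_MAX, BACKOFF_BASE * Math.pow(BACKOFF_EXP_CONST, attempt)));
            Thread.sleep(backoff);

            // Poll ListStreamConsumers; the first task to register wins.
            ListStreamConsumersRequest list =
                    ListStreamConsumersRequest.builder().streamARN(streamArn).build();
            for (Consumer consumer : kinesis.listStreamConsumers(list).join().consumers()) {
                if (consumer.consumerName().equals(consumerName)) {
                    return consumer.consumerARN();
                }
            }

            try {
                kinesis.registerStreamConsumer(RegisterStreamConsumerRequest.builder()
                        .streamARN(streamArn).consumerName(consumerName).build()).join();
            } catch (Exception e) {
                // A competing task may have registered first; loop and poll again.
            }
        }
        throw new IllegalStateException("Failed to register consumer: " + consumerName);
    }
}
```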
Ingestion
Most of the existing functionality within the KinesisDataFetcher and ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing Polling mechanism.
In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it on to the Flink application, via the KinesisDataFetcher. A shard consumer is comprised of the following components:
- Kinesis Proxy
  - A wrapper to proxy calls to AWS services
  - Handles errors and retry with backoff
- Shard Metrics Reporter
  - A class used to report arbitrary metrics including (non-exhaustive):
    - millisBehindLatest
    - bytesPerRead
    - numberOfAggregatedRecords
  - Metrics are made available to external systems [2]
- Kinesis Data Fetcher
  - A reference to the KinesisDataFetcher is used to:
    - Hand received messages up to the parent layer
    - Notify the end of a shard/stop consumption
Record Publisher
The KDS consumption within the ShardConsumer will be abstracted out to a RecordPublisher exposing the following interface. A record publisher will start publishing records from the provided sequence number. The Kinesis Client Library (KCL) took a similar approach [3] when upgrading to AWS SDK v2.x and adding support for EFO:

```java
public interface RecordPublisher {

    void subscribe(Subscriber<Record> consumer);

    void run(SequenceNumber startingSequenceNum);
}
```
The ShardConsumer interactions with the RecordPublisher are shown below. The last read sequence number is captured during message collection; this follows the behaviour of the existing implementation. The RecordPublisher run method is called in a loop until the stream is exhausted, or the fetcher is signalled to stop.
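A minimal, self-contained sketch of this loop is shown below. The types are simplified stand-ins for the real classes, and the stopping condition is illustrative only:

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-ins for the FLIP types, for illustration only.
interface Subscriber<T> { void onNext(T value); }

class Record {
    final String data;
    final String sequenceNumber;
    Record(String data, String sequenceNumber) { this.data = data; this.sequenceNumber = sequenceNumber; }
}

class SequenceNumber {
    final String value;
    SequenceNumber(String value) { this.value = value; }
}

interface RecordPublisher {
    void subscribe(Subscriber<Record> consumer);
    void run(SequenceNumber startingSequenceNum);
}

class ShardConsumerSketch implements Runnable {
    private final RecordPublisher publisher;
    private final AtomicReference<SequenceNumber> lastSequenceNum;
    private volatile boolean running = true;

    ShardConsumerSketch(RecordPublisher publisher, SequenceNumber startingSequenceNum) {
        this.publisher = publisher;
        this.lastSequenceNum = new AtomicReference<>(startingSequenceNum);
        // Capture the last read sequence number as records are collected.
        publisher.subscribe(record -> {
            // Deserialise and emit to the fetcher here ...
            lastSequenceNum.set(new SequenceNumber(record.sequenceNumber));
        });
    }

    @Override
    public void run() {
        // Resume from the last sequence number until the shard is exhausted
        // or the fetcher signals the consumer to stop.
        while (running && !isShardComplete()) {
            publisher.run(lastSequenceNum.get());
        }
    }

    private boolean isShardComplete() { return false; /* illustrative */ }
}
```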
Two RecordPublisher implementations will be created:
- PollingRecordPublisher: Move the existing logic from the ShardConsumer into a new class conforming to the new interface
- FanOutRecordPublisher: Create an EFO implementation to use SubscribeToShard and consume SubscribeToShardEvents
The RecordPublisher subscriber will handle all common logic in the ShardConsumer, as shown in the flow chart below:
Record Publisher Factory
The RecordPublisher will be instantiated from the KinesisDataFetcher and passed into the ShardConsumer, based on the configuration defined by the customer. Object creation will be deferred to a factory following the Abstract Factory pattern for the domain specific details. The factory will conform to the following interface:
```java
public interface RecordPublisherFactory {

    RecordPublisher buildRecordPublisher(Properties properties, ShardMetricReporter shardMetricReporter);
}
```
Two RecordPublisherFactory implementations will be created:
- PollingRecordPublisherFactory
- FanOutRecordPublisherFactory
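A hypothetical sketch of selecting a factory from the flink.stream.recordpublisher property, assuming the two factory implementations proposed above; the exact wiring inside the KinesisDataFetcher is an implementation detail:

```java
import java.util.Properties;

// Hypothetical selection logic based on the flink.stream.recordpublisher property.
final class RecordPublisherFactorySelector {
    static RecordPublisherFactory select(Properties properties) {
        String type = properties.getProperty("flink.stream.recordpublisher", "polling");
        return "efo".equalsIgnoreCase(type)
                ? new FanOutRecordPublisherFactory()
                : new PollingRecordPublisherFactory();
    }
}
```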
Kinesis Proxy
KinesisProxy is a wrapper class used to invoke AWS Kinesis services with error handling, retry and jitter backoff. The current proxy is based on AWS SDK v1.x and will be renamed to PollingKinesisProxy. A new FanOutKinesisProxy will be created based on AWS SDK v2.x to handle calls to AWS Kinesis when running in Fan Out mode. The appropriate KinesisProxy will be instantiated by the RecordPublisherFactory based on the configuration supplied by the customer application.
Ingestion Semantics
Watermarking, read sequence management and shard discovery are all managed by the KinesisDataFetcher. The proposed solution modifies the ShardConsumer layer, which does not impact the consumption semantics; there is a good separation of concerns. The current implementation passes messages one by one using KinesisDataFetcher::emitRecordAndUpdateState, which creates a Flink watermark and subsequently enqueues the deserialised messages. This logic will remain unchanged and will be reused by the Fan Out Record Publisher.
Back Pressure
The KinesisDataFetcher applies back pressure when passing messages on to the Flink application. There is a variation in how this is achieved based on which record emitter is used (determined by watermark configuration). Both options are transparent to the RecordPublisher:
- AsyncKinesisRecordEmitter
  - emitQueue is blocking when at capacity, providing back pressure
  - collect() applies back pressure
- SyncKinesisRecordEmitter
  - emitQueue is bypassed
  - collect() applies back pressure
The SubscribeToShard connection uses a reactive stream subscription [4] within the AWS SDK v2.x library. This supports flow control back pressure; the publisher will not send any new messages until demand is signalled via the client. Therefore, blocking in the receive callback will cause back pressure.
However, the SDK client has a default read timeout of 10 seconds. Any back pressure lasting longer than 10 seconds will result in a ReadTimeout. This will be treated as a recoverable error, and the consumer will resubscribe to the shard at the point at which the ReadTimeout occurred. KCL operates in the same fashion.
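For illustration, a rough sketch of establishing a SubscribeToShard subscription with AWS SDK v2.x. The visitor-style handler follows the SDK's documented pattern; the consumer ARN, shard id, starting position, timeout value and processing logic are placeholders:

```java
import java.time.Duration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

public class SubscribeToShardSketch {
    public static void main(String[] args) {
        // The read timeout defaults to 10 seconds; shown explicitly for clarity.
        KinesisAsyncClient kinesis = KinesisAsyncClient.builder()
                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                        .readTimeout(Duration.ofSeconds(10)))
                .build();

        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN("arn:aws:kinesis:...:consumer/example-consumer:1111111111")
                .shardId("shardId-000000000000")
                .startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON))
                .build();

        SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler.builder()
                .onError(t -> System.err.println("Error during subscription: " + t.getMessage()))
                .subscriber(SubscribeToShardResponseHandler.Visitor.builder()
                        .onSubscribeToShardEvent(event -> {
                            // Blocking here propagates back pressure to the publisher;
                            // stalls beyond the read timeout surface as a ReadTimeout,
                            // which would be handled by resubscribing.
                            System.out.println("Received " + event.records().size() + " records");
                        })
                        .build())
                .build();

        // Completes when the subscription expires (up to 5 minutes) or errors.
        kinesis.subscribeToShard(request, responseHandler).join();
    }
}
```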
Sequence Diagrams
Tear Down
De-registration is performed when registration is set to lazy (the default) or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear-down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to deregister while the consumer is still active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
Error Handling
EFO error handling will mimic the behaviour of the current polling implementation. All external AWS API calls will have a configurable retry policy and jitter backoff with sensible defaults based on API quotas. The user can override these default values in the properties when creating the FlinkKinesisConsumer. Non-recoverable errors will result in the application terminating with a sensible error message, as per current behaviour. KCL will be used as a reference to determine which errors are recoverable/fatal.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- None. EFO will be an opt-in feature. Existing behaviour will remain unchanged. State will be stored using the existing mechanisms and therefore allow users to switch EFO on and off without requiring any migration.
- If we are changing behaviour how will we phase out the older behaviour?
- New behaviour will be optional. User application will determine which behaviour to use based on configuration supplied to the consumer.
- If we need special migration tools, describe them here.
- N/A
- When will we remove the existing behaviour?
- N/A
Test Plan
Flink has an existing set of end to end tests that run against mocked Kinesis Proxy instances (FakeKinesisBehavioursFactory). These tests will continue to pass, verifying the existing functionality has not been modified. Equivalent Kinesis EFO mocks will be implemented to exercise the existing behaviours against the EFO consumption mechanism (where applicable). Additional tests will be implemented in the same fashion to address differences with EFO not covered by the existing test suite.
Contribution Plan
The following phased approach will be taken to deliver EFO support:
- [Documentation] FLIP created, reviewed and approved
- [Jira sub task/Pull Request] Improved test coverage for existing Polling consumption implementation
- [Jira sub task/Pull Request] Refactor existing code to support RecordPublisher. A single PollingRecordPublisher will be included. Behaviour of the consumer will be unchanged. Include relevant documentation.
- [Jira sub task/Pull Request] Add AWS SDK v2.x dependency along with FanOutKinesisProxy
- [Jira sub task/Pull Request] Configuration validation and deserialisation for Fan Out consumers
- [Jira sub task/Pull Request] Stream consumer registration and de-registration strategies
- [Jira sub task/Pull Request] Add FanOutRecordPublisher to add EFO support. Include relevant documentation and end to end tests
- [Sample Applications] Publish sample applications and additional documentation.
Rejected Alternatives
Migrate from AWS SDK v1.x to v2.x
EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation is using AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side [5]. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project, due to the Kinesis Producer Library (KPL) and DynamoDBStreamConsumer not yet supporting AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.
Waiting for the refactored Source Interfaces
The Flink community are currently refactoring the source connector interface [6] to allow separation of data consumption and shard management (along with other things). AWS are targeting the current source interfaces, meaning additional work may be required later to support the new FlinkKinesisConsumer. Waiting for the updated interfaces was considered but rejected: it is anticipated that the upgrade path for existing customers will be non-trivial, and extra work will be required to migrate snapshots and state. Considering the EFO support work will be encapsulated within the consumption responsibility of the consumer, supporting both interfaces is expected to be straightforward.
Random Default Consumer Names
EFO configuration proposes that the user specifies the consumer name. When the name is not set appropriately (i.e. duplicated across applications consuming from the same stream), multiple Flink applications will compete for the consumer subscription. Generating a random consumer name for each Flink job was considered, but rejected: terminating a Flink application without properly executing the tear-down method could result in orphaned consumers. A customer could then incur additional cost, or approach the 20 consumer limit, as a result.
Using Kinesis Client Library (KCL)
KCL [7] was considered as a replacement for the existing consumer, but rejected. KCL does not support the use case for the following reasons:
- FlinkKinesisConsumer supports multiple streams in a single instance
- FlinkKinesisConsumer runs in a distributed fashion with low level control of shard state to achieve “exactly once” semantics
Appendix A - Current State
This section describes how the current FlinkKinesisConsumer is implemented, to give context around how and where the changes need to be applied. The FlinkKinesisConsumer is created and configured by the end user application. Everything within the FlinkKinesisConsumer is managed by Flink, transparently to the user.
The FlinkKinesisConsumer currently reads data from KDS using a polling mechanism built on the following API endpoints:
- GetShardIterator [8]
- GetRecords [9]
The block diagram below shows the main components in the FlinkKinesisConsumer:
- FlinkKinesisConsumer: The top level source connector instantiated by the end user application
- KinesisDataFetcher: A single thread managing the Shard Consumers. This thread polls ListShards to determine when shards are splitting and merging, and spawns new ShardConsumers as required
- ShardConsumer: A single thread continuously reading from a shard using GetShardIterator [8] and GetRecords [9]

Each ShardConsumer populates a queue which passes messages back to the end application via the KinesisDataFetcher.
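To illustrate the polling mechanism, a minimal sketch using AWS SDK v1.x; the stream name, shard id, starting position and poll interval are placeholders:

```java
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

public class PollingSketch {
    public static void main(String[] args) throws InterruptedException {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // GetShardIterator [8]: obtain a starting position within the shard.
        String iterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                .withStreamName("example-stream")
                .withShardId("shardId-000000000000")
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON))
                .getShardIterator();

        // GetRecords [9]: poll the shard until it is closed (iterator becomes null).
        while (iterator != null) {
            GetRecordsResult result = kinesis.getRecords(new GetRecordsRequest()
                    .withShardIterator(iterator));
            // Deserialise and emit result.getRecords() here ...
            iterator = result.getNextShardIterator();
            Thread.sleep(200); // Crude pause; the real consumer uses configured backoff.
        }
    }
}
```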
Appendix B - Example EFO Consumer Application Flow
An EFO consumer would require the use of the following API endpoints:
- RegisterStreamConsumer [10]
- ListStreamConsumers [11]
- SubscribeToShard [12]
- DeregisterStreamConsumer [13]
An example EFO consumer flow is illustrated below (it is assumed the application already knows the stream and shard details):
- Register Stream Consumer
  - A consumer must register with a KDS to enable EFO
  - The consumer must wait for the registration to complete before attempting to SubscribeToShard. This can be achieved by polling the ListStreamConsumers endpoint
  - If the consumer is already registered an error is thrown
- Subscribe to Shard
  - Invoking SubscribeToShard will establish a HTTP/2 connection to the shard
  - A subscription lasts up to 5 minutes; once the subscription expires the consumer must resubscribe to continue receiving data
- Subscribe to Shard Event
  - Once subscribed, the consumer will receive events containing data from KDS
  - The event contains a list of records and metadata describing the sequence number and milliseconds behind latest
  - Data should be deserialised and forwarded on to be processed by the application
- Deregister Stream Consumer
  - When the application is finished and tearing down, the consumer should invoke DeregisterStreamConsumer to delete the EFO registration (see the sketch after this list)
  - Leaving a consumer registered will result in:
    - Unnecessary additional cost incurred to the customer
    - One of the 20 consumer slots being held, potentially preventing other consumers registering
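For completeness, a minimal de-registration sketch using AWS SDK v2.x; the consumer ARN is a placeholder:

```java
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;

public class DeregisterSketch {
    public static void main(String[] args) {
        KinesisAsyncClient kinesis = KinesisAsyncClient.create();
        // DeregisterStreamConsumer [13]: remove the EFO registration on tear-down.
        kinesis.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder()
                .consumerARN("arn:aws:kinesis:us-east-1:000000000000:stream/example-stream/consumer/example-consumer:1111111111")
                .build()).join();
    }
}
```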
References
[1] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
[3] https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java
[4] https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html
[5] https://docs.aws.amazon.com/sdk-for-java/v2/migration-guide/side-by-side.html
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
[7] https://github.com/awslabs/amazon-kinesis-client
[8] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
[9] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
[10] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
[11] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
[12] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
[13] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html