Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to utilise dedicated read throughput, rather than a shared quota. HTTP/2 reduces latency and typically gives a 65% performance boost [1]. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications will optionally be able to perform a backwards-compatible library upgrade and configuration tweak to inherit the performance benefits.
There will be no changes to public interfaces (only @Internal classes will be touched).
EFO will be configured using the existing configuration mechanism. The Flink job will supply additional properties to the FlinkKinesisConsumer that will result in it using a different consumption mechanism internally.
For a description of the current state, see Appendix A - Current State. For an example EFO consumer flow, see Appendix B - Example EFO Consumer Application Flow.
The FlinkKinesisConsumer will be updated to add support for EFO. There are four distinct changes that are required:
Support will be retained for the existing polling mechanism. EFO will be introduced as an opt-in feature for the existing connector. The default behaviour will use the existing polling mechanism; a customer must explicitly configure the connector to use EFO.
Reasons a customer might not want to use EFO:
The FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:
| Key | Description | Type | Default |
|-----|-------------|------|---------|
| flink.stream.recordpublisher | Select the RecordPublisher mechanism (efo/polling) | string | polling |
| flink.stream.efo.consumername | The name of the consumer to register with KDS | string | * see below |
| flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy/eager/none) | string | lazy |
| flink.stream.efo.consumerarn.<stream-name> | The Consumer ARN for a given stream name | string | |
| flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts | long | 200 |
| flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000 |
| flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts | long | 200 |
| flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts | long | 1000 |
| flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after a recoverable exception | int | 10 |
| flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts | long | 200 |
| flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts | long | 1000 |
| flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
| flink.shard.subscribetoshard.maxretries | Maximum number of attempts after a recoverable exception | int | 5 |
| flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts | long | 1000 |
| flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts | long | 2000 |
| flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5 |
* No default; must be provided by the customer application.
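A minimal sketch of how a user application might assemble these properties. Plain java.util.Properties and string-literal keys are used here to keep the sketch dependency-free; the connector would expose these keys via constants:

```java
import java.util.Properties;

public class EfoConfigExample {

    // Builds the property map that would be passed to the FlinkKinesisConsumer
    // constructor to opt in to Enhanced Fan Out. The keys match the table above;
    // the consumer name value is illustrative.
    public static Properties efoProperties() {
        Properties config = new Properties();
        // Opt in to Enhanced Fan Out (the default is "polling")
        config.setProperty("flink.stream.recordpublisher", "efo");
        // Consumer name must be unique per independent consumer of the stream
        config.setProperty("flink.stream.efo.consumername", "my-flink-efo-consumer");
        // Register at task startup and deregister at task tear-down
        config.setProperty("flink.stream.efo.registration", "lazy");
        return config;
    }

    public static void main(String[] args) {
        Properties config = efoProperties();
        System.out.println(config.getProperty("flink.stream.recordpublisher"));
    }
}
```

The same Properties object would then be passed to the FlinkKinesisConsumer constructor exactly as it is today for the polling configuration.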
A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing an active consumer when calling SubscribeToShard will result in active subscriptions being terminated. Therefore the customer will need to set this configuration carefully should they require multiple Flink EFO consumers (applications) reading from the same stream.
Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the following strategies, all of which will be offered via different configuration options:
Properties will be validated, and de-/registration will be performed based on the supplied configuration:
| flink.stream.efo.registration | flink.stream.efo.consumerarn.<stream-name> | Registration | Deregistration |
|---|---|---|---|
| lazy | Not required | Task startup | Task tear-down |
| eager | Not required | FlinkKinesisConsumer constructor | Task tear-down |
| none | Required | None | None |
The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.
Stream consumer registration can be invoked:

- Task startup: RegisterStreamConsumer will be invoked from tasks running on Task Manager slots during startup. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to register the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to register when the consumer is not found. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
- FlinkKinesisConsumer constructor: registration is performed from a single-threaded environment. The Consumer ARN(s) will be passed into the tasks via the connector configuration.

Most of the existing functionality within the KinesisDataFetcher and ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing polling mechanism.
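The backoff calculation used to stagger competing de-/registration attempts can be sketched as follows. The formula (base multiplied by expconst raised to the attempt number, capped at max) follows the property names above; the full-jitter variant is an assumption, as the exact jitter strategy is an implementation detail:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RegistrationBackoff {

    // Defaults from the flink.stream.registerstreamconsumer.* properties above
    static final long BASE_MILLIS = 200L;
    static final long MAX_MILLIS = 1000L;
    static final double EXP_CONST = 1.5;

    // Deterministic exponential backoff: base * expconst^attempt, capped at max
    public static long backoffMillis(int attempt) {
        double delay = BASE_MILLIS * Math.pow(EXP_CONST, attempt);
        return (long) Math.min(MAX_MILLIS, delay);
    }

    // Full-jitter variant (an assumption, not confirmed by the design):
    // sleep a uniformly random duration in [0, backoffMillis(attempt)]
    public static long jitteredBackoffMillis(int attempt) {
        return ThreadLocalRandom.current().nextLong(backoffMillis(attempt) + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoffMillis(attempt) + " ms");
        }
    }
}
```

With the defaults above the deterministic delays grow as 200, 300, 450, 675 ms and are then capped at 1000 ms, keeping competing tasks spread out without unbounded waits.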
In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it on to the Flink application, via the KinesisDataFetcher. A shard consumer is comprised of the following components:
The KDS consumption within the ShardConsumer will be abstracted out to a RecordPublisher exposing the following interface. A record publisher will start publishing records from the provided sequence number. The Kinesis Client Library (KCL) took a similar approach [3] when upgrading to AWS SDK v2.x and adding support for EFO:

```java
public interface RecordPublisher {

    void subscribe(Subscriber<Record> consumer);

    void run(SequenceNumber startingSequenceNum);
}
```
The ShardConsumer interactions with the RecordPublisher are shown below. The last read sequence number is captured during message collection, following the behaviour of the existing implementation. The RecordPublisher run method is called in a loop until the stream is exhausted, or the fetcher is signalled to stop.
Two RecordPublisher implementations will be created:

- PollingRecordPublisher: moves the existing consumption logic from the ShardConsumer into a new class conforming to the new interface
- FanOutRecordPublisher: an EFO implementation that uses SubscribeToShard and consumes SubscribeToShardEvents

The RecordPublisher subscriber will handle all common logic in the ShardConsumer, as shown in the flow chart below:
The RecordPublisher will be instantiated from the KinesisDataFetcher and passed into the ShardConsumer, based on the configuration defined by the customer. Object creation will be deferred to a factory following the Abstract Factory pattern for the domain-specific details. The factory will comply with the following interface:

```java
public interface RecordPublisherFactory {

    RecordPublisher buildRecordPublisher(Properties properties, ShardMetricReporter shardMetricReporter);
}
```
Two RecordPublisherFactory implementations will be created:

- PollingRecordPublisherFactory
- FanOutRecordPublisherFactory
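The selection between the two publishers, driven by the flink.stream.recordpublisher property, could look roughly like the sketch below. The class bodies are empty placeholders and the selection logic is an assumption about how the factory lookup might be wired:

```java
import java.util.Properties;

public class RecordPublisherFactorySketch {

    // Placeholder types standing in for the real implementations
    interface RecordPublisher {}
    static class PollingRecordPublisher implements RecordPublisher {}
    static class FanOutRecordPublisher implements RecordPublisher {}

    // Chooses the implementation from flink.stream.recordpublisher, defaulting
    // to polling, mirroring how the KinesisDataFetcher would defer creation
    // to the appropriate factory.
    public static RecordPublisher create(Properties config) {
        String type = config.getProperty("flink.stream.recordpublisher", "polling");
        switch (type) {
            case "efo":
                return new FanOutRecordPublisher();
            case "polling":
                return new PollingRecordPublisher();
            default:
                throw new IllegalArgumentException("Unknown record publisher type: " + type);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("flink.stream.recordpublisher", "efo");
        System.out.println(create(config).getClass().getSimpleName());
    }
}
```

Defaulting to polling when the property is absent preserves the behaviour of existing applications, matching the opt-in design described above.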
KinesisProxy is a wrapper class used to invoke AWS Kinesis services with error handling, retry and jitter backoff. The current proxy is based on AWS SDK v1.x and will be renamed to PollingKinesisProxy. A new FanOutKinesisProxy will be created based on AWS SDK v2.x to handle calls to AWS Kinesis when running in Fan Out mode. The appropriate KinesisProxy will be instantiated by the RecordPublisherFactory based on the configuration supplied by the customer application.
Watermarking, read sequence management and shard discovery are all managed by the KinesisDataFetcher. The proposed solution modifies the ShardConsumer layer, which does not impact the consumption semantics; there is a good separation of concerns. The current implementation passes messages one by one using KinesisDataFetcher::emitRecordAndUpdateState, which creates a Flink watermark and subsequently enqueues the deserialised messages. This logic will remain unchanged and will be reused by the Fan Out publisher.
The KinesisDataFetcher applies back pressure when passing messages on to the Flink application. There is a variation in how this is achieved based on which record emitter is used (determined by the watermark configuration). Both options are transparent to the RecordPublisher:

- AsyncKinesisRecordEmitter
- SyncKinesisRecordEmitter
The SubscribeToShard connection uses a reactive stream subscription [4] within the AWS SDK v2.x library. This supports flow-control back pressure: the publisher will not send any new messages until demand is signalled by the client. Therefore blocking in the receive callback will cause back pressure. However, the SDK client has a default read timeout of 10 seconds; any back pressure larger than 10 seconds will result in a ReadTimeout. This will be treated as a recoverable error, and the consumer will resubscribe to the shard at the point at which the ReadTimeout occurred. KCL operates in the same fashion.
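The demand-driven flow control described above can be demonstrated with the JDK's java.util.concurrent.Flow API, which implements the same reactive streams model the AWS SDK v2.x uses. This is an illustration of the mechanism only, not AWS SDK code: the subscriber signals demand for one event at a time, so any delay before its next request(1) call back-pressures the publisher:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class DemandBackPressureExample {

    // A subscriber that requests exactly one event at a time; the publisher
    // may not send the next event until request(1) is called again.
    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final AtomicInteger received = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        public void onNext(String event) {
            received.incrementAndGet();
            // Blocking here (e.g. on a full downstream queue) would delay this
            // request(1) and therefore back-pressure the publisher.
            subscription.request(1);
        }

        public void onError(Throwable t) { done.countDown(); }

        public void onComplete() { done.countDown(); }
    }

    public static int publishAndCount(int events) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < events; i++) {
                publisher.submit("event-" + i); // blocks when no demand and buffer is full
            }
        } // close() delivers onComplete after all submitted items are consumed
        subscriber.done.await();
        return subscriber.received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndCount(5));
    }
}
```

In the real connector the ten-second read timeout described above bounds how long the publisher side will wait for this demand signal before the connection fails with a ReadTimeout.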
De-registration is enabled by default, or when registration is explicitly set to lazy or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear-down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service will win; therefore tasks will poll ListStreamConsumers and attempt to deregister while the consumer is active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
EFO error handling will mimic the behaviour of the current polling implementation. All external AWS API calls will have a configurable retry policy and jitter backoff, with sensible defaults based on API quotas. The user can override these default values in the properties when creating the FlinkKinesisConsumer. Non-recoverable errors will result in the application terminating with a sensible error message, as per current behaviour. KCL will be used as a reference to determine which errors are recoverable/fatal.
Flink has an existing set of end-to-end tests that run against mocked Kinesis Proxy instances (FakeKinesisBehavioursFactory). These tests will continue to pass, verifying the existing functionality has not been modified. Equivalent Kinesis EFO mocks will be implemented to exercise the existing behaviours against the EFO consumption mechanism (where applicable). Additional tests will be implemented in the same fashion to address differences with EFO not covered by the existing test suite.
The following phased approach will be taken to deliver EFO support:

1. Refactor the existing consumer to introduce the RecordPublisher abstraction. A single PollingRecordPublisher will be included; the behaviour of the consumer will be unchanged. Include relevant documentation.
2. Add the FanOutRecordPublisher to provide EFO support. Include relevant documentation and end-to-end tests.

EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation uses AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side [5]. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project, because the Kinesis Producer Library (KPL) and DynamoDBStreamConsumer do not yet support AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.
The Flink community are currently refactoring the source connector interface [6] to allow separation of data consumption and shard management (along with other things). AWS are targeting the current source interfaces, meaning additional work may be required later to support the new FlinkKinesisConsumer. Waiting for the updated interfaces was considered but rejected: it is anticipated that the upgrade path for existing customers will be non-trivial, and extra work will be required to migrate snapshots and state. Considering the EFO support work will be encapsulated within the consumption responsibility of the consumer, supporting both interfaces is expected to be straightforward.
EFO configuration proposes that the user specifies the consumer name. This could result in multiple Flink applications competing for a consumer subscription when the name is not set appropriately (duplicated) and the applications consume from the same stream. Generating a random consumer name for each Flink job was considered but rejected: terminating a Flink application without properly executing the tear-down method could result in orphaned consumers, and a customer could then incur additional cost or approach the 20-consumer limit as a result.
KCL [7] was considered as a replacement for the existing consumer but rejected. KCL does not support the use case for the following reasons:

- FlinkKinesisConsumer supports multiple streams in a single instance
- FlinkKinesisConsumer runs in a distributed fashion with low-level control of shard state to achieve "exactly once" semantics

This section describes how the current FlinkKinesisConsumer is implemented, to give context around how and where the changes need to be applied. The FlinkKinesisConsumer is created and configured by the end user application. Everything within the FlinkKinesisConsumer is managed by Flink, transparently to the user.
The FlinkKinesisConsumer currently reads data from KDS using a polling mechanism, via the following API endpoints:

- GetShardIterator [8]
- GetRecords [9]

The block diagram below shows the main components in the FlinkKinesisConsumer:
- FlinkKinesisConsumer: the top-level source connector, instantiated by the end user application
- KinesisDataFetcher: a single thread managing the shard consumers. This thread polls ListShards to determine when shards are splitting and merging, and spawns new ShardConsumers as required
- ShardConsumer: a single thread continuously reading from a shard using GetShardIterator and GetRecords

Each ShardConsumer populates a queue which passes messages back to the end application via the KinesisDataFetcher.
An EFO consumer would require the use of the following API endpoints:

- RegisterStreamConsumer [10]
- ListStreamConsumers [11]
- SubscribeToShard [12]
- DeregisterStreamConsumer [13]

An example EFO consumer flow is illustrated below (it is assumed the application already knows the stream and shard details):

1. Register the stream consumer and wait for it to become active before calling SubscribeToShard. This can be achieved by polling the ListStreamConsumers endpoint
2. SubscribeToShard will establish a HTTP/2 connection to the shard
3. DeregisterStreamConsumer to delete the EFO registration
to delete the EFO registration[1] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
[3] https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java
[4] https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html
[5] https://docs.aws.amazon.com/sdk-for-java/v2/migration-guide/side-by-side.html
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
[7] https://github.com/awslabs/amazon-kinesis-client
[8] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
[9] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
[10] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
[11] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
[12] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
[13] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html