Discussion thread:
Vote thread:
JIRA:

Release: 1.12


Motivation

Enhanced Fan Out (EFO) allows AWS Kinesis Data Stream (KDS) consumers to utilise dedicated read throughput, rather than a quota shared with other consumers, while HTTP/2 reduces latency and typically gives a 65% performance boost [1]. EFO is not currently supported by the Flink Kinesis Consumer. Adding EFO support would allow Flink applications to reap these benefits, widening Flink adoption. Existing applications would be able to opt in via a backwards compatible library upgrade and a configuration tweak to inherit the performance benefits.

Public Interfaces

There will be no changes to public interfaces (only @Internal classes will be touched).

EFO will be configured using the existing configuration mechanism. The Flink job will supply additional properties to the FlinkKinesisConsumer that will result in it using a different consumption mechanism internally.

Proposed Changes

For a description of the current state see Appendix A - Current State

For an example EFO consumer flow see Appendix B - Example EFO Consumer Application Flow

The FlinkKinesisConsumer will be updated to add support for EFO. There are four distinct changes that are required:

  1. Configuration
  2. Stream Consumer Registration
  3. Ingestion
  4. Tear Down

Configuration

Support will be retained for the existing polling mechanism. EFO will be introduced as an opt-in feature for the existing connector. The default behaviour will use the existing polling mechanism; a customer must explicitly configure the connector to use EFO.

Reasons a customer might not want to use EFO include the additional AWS cost incurred for dedicated throughput, and the 20 consumer per stream limit (see Rejected Alternatives).

The FlinkKinesisConsumer is configured via Java Properties, allowing arbitrary properties to be supported without breaking existing applications. The properties object is passed to the constructor of the consumer. The following properties will be added to support the two consumption options. Property keys are in line with current Flink naming conventions and will be exposed via constants:

Key | Description | Type | Default
flink.stream.recordpublisher | Selects the RecordPublisher mechanism (efo|polling) | string | polling
flink.stream.efo.consumername | The name of the consumer to register with KDS | string | * see below
flink.stream.efo.registration | Determines how and when consumer de-/registration is performed (lazy|eager|none) | string | lazy
flink.stream.efo.consumerarn.<stream-name> | The consumer ARN for a given stream name | string | (none)
flink.stream.registerstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10
flink.stream.registerstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200
flink.stream.registerstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000
flink.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.deregisterstreamconsumer.maxretries | Maximum number of attempts after a recoverable exception | int | 10
flink.stream.deregisterstreamconsumer.backoff.base | The base backoff time between attempts (millis) | long | 200
flink.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time between attempts (millis) | long | 1000
flink.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.stream.liststreamconsumers.maxretries | Maximum number of attempts after a recoverable exception | int | 10
flink.stream.liststreamconsumers.backoff.base | The base backoff time between attempts (millis) | long | 200
flink.stream.liststreamconsumers.backoff.max | The maximum backoff time between attempts (millis) | long | 1000
flink.stream.liststreamconsumers.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5
flink.shard.subscribetoshard.maxretries | Maximum number of attempts after a recoverable exception | int | 5
flink.shard.subscribetoshard.backoff.base | The base backoff time between attempts (millis) | long | 1000
flink.shard.subscribetoshard.backoff.max | The maximum backoff time between attempts (millis) | long | 2000
flink.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between attempts | double | 1.5

 * No default; the consumer name must be provided by the customer application.
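
For illustration, the snippet below shows how an application might opt in to EFO via these properties. It is a minimal sketch: the raw property keys from the table are used directly (constants will be exposed for them), and the region, stream name and consumer name are illustrative values.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public final class EfoConfigurationExample {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1"); // illustrative region

        // Opt in to EFO; without this the default "polling" mechanism is used.
        config.setProperty("flink.stream.recordpublisher", "efo");

        // Mandatory when EFO is enabled: there is no default consumer name.
        config.setProperty("flink.stream.efo.consumername", "example-efo-consumer");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKinesisConsumer<>(
                "example-stream", new SimpleStringSchema(), config))
           .print();
        env.execute("EFO example");
    }
}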

Default Consumer Name

A consumer name must be provided by the user application within the configuration property map. The presence of this value will be validated when EFO consumption is enabled. The consumer name should be unique for each independent consumer of a given stream; reusing an active consumer when calling SubscribeToShard will result in active subscriptions being terminated. Therefore the customer would need to set this configuration carefully should they require multiple Flink EFO consumers (applications) reading from the same stream.

Registration/De-registration Configuration

Flink does not yet provide a single entry and exit point in which to perform consumer registration and de-registration. This leaves the strategies shown below (lazy, eager and none), all of which will be offered via different configuration options:

Properties will be validated, and de-/registration will be performed based on the supplied configuration:

flink.stream.efo.registration | flink.stream.efo.consumerarn.<stream-name> | Registration | Deregistration
lazy | Not required | Task startup | Task tear-down
eager | Not required | FlinkKinesisConsumer constructor | Task tear-down
none | Required | None | None
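
For example, a deployment that manages consumer registration outside Flink might extend the configuration example above as follows (a fragment; the consumer ARN value is illustrative):

// Registration handled outside Flink: point the connector at an existing
// consumer ARN and disable de-/registration entirely.
config.setProperty("flink.stream.efo.registration", "none");
config.setProperty(
        "flink.stream.efo.consumerarn.example-stream",
        "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream/consumer/example-efo-consumer:1234567890");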

Stream Consumer Registration

The user will select their registration strategy based on the application they are deploying. Generally speaking, applications with high levels of parallelism would benefit from an eager/none registration to reduce quota contention and application startup/tear-down time.

Stream consumer registration can be invoked lazily during task startup, or eagerly from the FlinkKinesisConsumer constructor, as determined by the registration strategy above.
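
A sketch of what registration might look like is shown below. The AWS SDK v2.x calls (RegisterStreamConsumer, DescribeStreamConsumer) are real, but the retry/backoff handling is simplified; the real implementation would honour the flink.stream.registerstreamconsumer.* properties and handle an already-registered consumer (ResourceInUseException).

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;

final class RegistrationSketch {

    /** Registers the stream consumer and blocks until it becomes ACTIVE. */
    static String registerAndWaitUntilActive(
            KinesisAsyncClient kinesis, String streamArn, String consumerName)
            throws InterruptedException {
        // RegisterStreamConsumer returns while the consumer is still CREATING;
        // SubscribeToShard only succeeds once the consumer is ACTIVE.
        String consumerArn = kinesis.registerStreamConsumer(
                RegisterStreamConsumerRequest.builder()
                        .streamARN(streamArn)
                        .consumerName(consumerName)
                        .build())
                .join().consumer().consumerARN();

        // Fixed backoff for brevity; the real implementation would use the
        // configured backoff base/max/expconst values.
        while (kinesis.describeStreamConsumer(
                        DescribeStreamConsumerRequest.builder()
                                .consumerARN(consumerArn)
                                .build())
                .join().consumerDescription().consumerStatus() != ConsumerStatus.ACTIVE) {
            Thread.sleep(200);
        }
        return consumerArn;
    }
}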

Ingestion

Most of the existing functionality within the KinesisDataFetcher and ShardConsumer can be reused. The code that consumes records from KDS will be abstracted out to support EFO in addition to the existing Polling mechanism.

In the current implementation a ShardConsumer is responsible for pulling data from KDS and passing it on to the Flink application via the KinesisDataFetcher. A shard consumer comprises the components described in the following sections.

Record Publisher

The KDS consumption within the ShardConsumer will be abstracted out into a RecordPublisher exposing the following interface. A record publisher starts publishing records from the provided sequence number. The Kinesis Client Library (KCL) took a similar approach [3] when it upgraded to AWS SDK v2.x and added EFO support:

public interface RecordPublisher {

    /** Registers the subscriber that will receive published records. */
    void subscribe(Subscriber<Record> consumer);

    /** Starts publishing records from the given sequence number. */
    void run(SequenceNumber startingSequenceNum);
}


The ShardConsumer interactions with the RecordPublisher are shown below. The last read sequence number is captured during message collection, following the behaviour of the existing implementation. The RecordPublisher run method is called in a loop until the stream is exhausted or the fetcher is signalled to stop.
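
A minimal, self-contained sketch of this interaction is shown below; Subscriber, Record and SequenceNumber here are simplified stand-ins for the real connector types.

final class ShardConsumerSketch {

    // Simplified stand-ins for the real connector types.
    interface Subscriber<T> { void onNext(T value); }
    record SequenceNumber(String value) {}
    record Record(SequenceNumber sequenceNumber) {}

    interface RecordPublisher {
        void subscribe(Subscriber<Record> consumer);
        void run(SequenceNumber startingSequenceNum);
    }

    private volatile boolean running = true;
    private volatile SequenceNumber lastSequenceNum;

    void consume(RecordPublisher publisher, SequenceNumber startFrom) {
        lastSequenceNum = startFrom;
        publisher.subscribe(r -> {
            // The last read sequence number is captured during collection,
            // so the next run (or a restart from state) resumes from here.
            lastSequenceNum = r.sequenceNumber();
            // ... deserialise and pass to the KinesisDataFetcher ...
        });
        // run is re-invoked until the shard is exhausted or the fetcher
        // signals this consumer to stop (running flips to false).
        while (running) {
            publisher.run(lastSequenceNum);
        }
    }
}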



Two RecordPublisher implementations will be created: a Polling publisher that retains the existing GetRecords based consumption mechanism, and a Fan Out publisher that consumes via SubscribeToShard.

The RecordPublisher subscriber will handle all common logic in the ShardConsumer, as shown in the flow chart below:


Record Publisher Factory

The RecordPublisher will be instantiated by the KinesisDataFetcher and passed into the ShardConsumer, based on the configuration defined by the customer. Object creation will be deferred to a factory, following the Abstract Factory pattern, for the domain specific details. The factory will comply with the following interface:

public interface RecordPublisherFactory {

    /** Builds a RecordPublisher configured from the supplied properties. */
    RecordPublisher buildRecordPublisher(Properties properties, ShardMetricReporter shardMetricReporter);
}


Two RecordPublisherFactory implementations will be created, one for each RecordPublisher type.
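
A possible shape for the selection logic is sketched below. The factory class names are illustrative rather than final, and the ShardMetricReporter parameter is elided for brevity.

import java.util.Properties;

final class RecordPublisherSelection {

    interface RecordPublisher { /* as defined above */ }

    interface RecordPublisherFactory {
        RecordPublisher buildRecordPublisher(Properties properties);
    }

    static RecordPublisherFactory forConfiguration(Properties properties) {
        String type = properties.getProperty("flink.stream.recordpublisher", "polling");
        // "efo" selects the Fan Out publisher (AWS SDK v2.x, SubscribeToShard);
        // anything else falls back to the existing Polling publisher.
        return "efo".equalsIgnoreCase(type)
                ? new FanOutRecordPublisherFactory()
                : new PollingRecordPublisherFactory();
    }

    static final class FanOutRecordPublisherFactory implements RecordPublisherFactory {
        @Override
        public RecordPublisher buildRecordPublisher(Properties properties) {
            return new RecordPublisher() {}; // placeholder for the Fan Out publisher
        }
    }

    static final class PollingRecordPublisherFactory implements RecordPublisherFactory {
        @Override
        public RecordPublisher buildRecordPublisher(Properties properties) {
            return new RecordPublisher() {}; // placeholder for the Polling publisher
        }
    }
}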

Kinesis Proxy

KinesisProxy is a wrapper class used to invoke AWS Kinesis services with error handling, retry and jitter backoff. The current proxy is based on AWS SDK v1.x and will be renamed to PollingKinesisProxy. A new FanOutKinesisProxy will be created based on AWS SDK v2.x to handle calls to AWS Kinesis when running in Fan Out mode. The appropriate KinesisProxy will be instantiated by the RecordPublisherFactory based on the configuration supplied by the customer application.

Ingestion Semantics

Watermarking, read sequence management and shard discovery are all managed by the KinesisDataFetcher. The proposed solution modifies the ShardConsumer layer, which does not impact the consumption semantics; there is a good separation of concerns. The current implementation passes messages one by one using KinesisDataFetcher::emitRecordAndUpdateState, which creates a Flink watermark and subsequently enqueues the deserialised messages. This logic will remain unchanged and will be reused by the Fan Out publisher.

Back Pressure

The KinesisDataFetcher applies back pressure when passing messages on to the Flink application. There is a variation in how this is achieved based on which record emitter is used (determined by the watermark configuration); both options are transparent to the RecordPublisher.

The SubscribeToShard connection uses a reactive stream subscription [4] within the AWS SDK v2.x library. This supports flow control back pressure: the publisher will not send any new messages until demand is signalled via the client. Therefore, blocking in the receive callback will cause back pressure.

However, the SDK client has a default read timeout of 10 seconds; any back pressure pause longer than this will result in a ReadTimeout. This will be treated as a recoverable error, and the record publisher will resubscribe to the shard at the point at which the ReadTimeout occurred. KCL operates in the same fashion.
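
The sketch below illustrates the flow control mechanism with a Reactive Streams subscriber for the SDK's SubscribeToShardEventStream events (such a subscriber can be supplied via the SubscribeToShardResponseHandler builder). Processing synchronously in onNext before requesting more demand is what produces the back pressure described above.

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;

final class BackPressureSubscriberSketch implements Subscriber<SubscribeToShardEventStream> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        subscription.request(1); // signal demand for the first event only
    }

    @Override
    public void onNext(SubscribeToShardEventStream event) {
        process(event);          // blocking here back-pressures the publisher;
        subscription.request(1); // no further event arrives until demand is signalled
    }

    @Override
    public void onError(Throwable t) {
        // A ReadTimeout lands here and is treated as recoverable:
        // resubscribe to the shard from the last consumed position.
    }

    @Override
    public void onComplete() {
        // The subscription has expired; resubscribe to continue reading.
    }

    private void process(SubscribeToShardEventStream event) {
        // deserialise and emit records to the Flink application
    }
}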

Sequence Diagrams



Tear Down

De-registration is performed when registration is set to lazy (the default) or eager within the connector properties. DeregisterStreamConsumer will be invoked from tasks running on Task Manager slots during tear down. Due to the parallel nature of Flink jobs, there will be n distributed threads competing to deregister the consumer. The first task to invoke the service wins; therefore each task will poll ListStreamConsumers and attempt to deregister once the consumer is active. Invocations will be staggered by the backoff time to reduce the number of failures due to competing requests. The backoff time will be calculated using the configuration values specified when creating the consumer.
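
A sketch of this competing tear-down is shown below, using real AWS SDK v2.x calls with the de-registration backoff defaults from the configuration table; error handling is simplified.

import java.util.Optional;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Consumer;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.ListStreamConsumersRequest;

final class TearDownSketch {

    static void deregisterWhenActive(
            KinesisAsyncClient kinesis, String streamArn, String consumerName)
            throws InterruptedException {
        long backoff = 200;           // deregisterstreamconsumer.backoff.base
        final long maxBackoff = 1000; // deregisterstreamconsumer.backoff.max
        final double expConst = 1.5;  // deregisterstreamconsumer.backoff.expconst
        while (true) {
            Optional<Consumer> consumer = kinesis.listStreamConsumers(
                            ListStreamConsumersRequest.builder().streamARN(streamArn).build())
                    .join().consumers().stream()
                    .filter(c -> c.consumerName().equals(consumerName))
                    .findFirst();
            if (consumer.isEmpty()) {
                return; // another subtask won the race and has deregistered
            }
            if (consumer.get().consumerStatus() == ConsumerStatus.ACTIVE) {
                try {
                    kinesis.deregisterStreamConsumer(
                            DeregisterStreamConsumerRequest.builder()
                                    .consumerARN(consumer.get().consumerARN())
                                    .build()).join();
                    return;
                } catch (Exception e) {
                    // recoverable: a competing subtask may have deregistered first
                }
            }
            Thread.sleep(backoff); // stagger competing requests
            backoff = Math.min((long) (backoff * expConst), maxBackoff);
        }
    }
}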

Error Handling

EFO error handling will mimic the behaviour of the current polling implementation. All external AWS API calls will have a configurable retry policy and jitter backoff, with sensible defaults based on API quotas. The user can override these defaults in the properties when creating the FlinkKinesisConsumer. Non-recoverable errors will result in the application terminating with a sensible error message, as per current behaviour. KCL will be used as a reference to determine which errors are recoverable/fatal.

Compatibility, Deprecation, and Migration Plan

Test Plan

Flink has an existing set of end-to-end tests that run against mocked Kinesis Proxy instances (FakeKinesisBehavioursFactory). These tests will continue to pass, verifying that the existing functionality has not been modified. Equivalent Kinesis EFO mocks will be implemented to exercise the existing behaviours against the EFO consumption mechanism (where applicable). Additional tests will be implemented in the same fashion to address differences with EFO not covered by the existing test suite.

Contribution Plan

The following phased approach will be taken to deliver EFO support:

Rejected Alternatives

Migrate from AWS SDK v1.x to v2.x

EFO requires AWS SDK v2.x in order to utilise HTTP/2 and invoke SubscribeToShard. The existing FlinkKinesisConsumer implementation uses AWS SDK v1.x. This design proposes using AWS SDK v1.x and v2.x side by side [5]. It is not currently possible to remove AWS SDK v1.x from the Flink Kinesis Connectors project because the Kinesis Producer Library (KPL) and the DynamoDBStreamConsumer do not yet support AWS SDK v2.x. Therefore, to minimise change and reduce risk, AWS SDK v2.x will only be used by the Fan Out Kinesis Proxy.

Waiting for the refactored Source Interfaces

The Flink community is currently refactoring the source connector interface [6] to allow separation of data consumption and shard management (among other things). AWS is targeting the current source interfaces, meaning additional work may be required later to support the new FlinkKinesisConsumer. Waiting for the updated interfaces was considered but rejected: it is anticipated that the upgrade path for existing customers will be non-trivial, and extra work will be required to migrate snapshots and state. Since the EFO support work is encapsulated within the consumption responsibility of the consumer, supporting both interfaces is expected to be straightforward.

Random Default Consumer Names

EFO configuration proposes that the user specifies the consumer name. This could result in multiple Flink applications competing for a consumer subscription when the name is duplicated across applications consuming from the same stream. Generating a random consumer name for each Flink job was considered, but rejected: terminating a Flink application without properly executing the tear-down method could leave orphaned consumers behind. A customer could then incur additional cost, or approach the 20 consumer limit, as a result.

Using Kinesis Client Library (KCL)

KCL [7] was considered to replace the existing consumer but rejected. KCL does not support the use case for the following reasons:

Appendix A - Current State

This section describes how the current FlinkKinesisConsumer is implemented to give context around how and where the changes need to be applied. The FlinkKinesisConsumer is created and configured by the end user application. Everything within the FlinkKinesisConsumer is managed by Flink transparently to the user.

The FlinkKinesisConsumer currently reads data from KDS using a polling mechanism built on the following API endpoints:

  1. GetShardIterator [8]
  2. GetRecords [9]
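
For reference, this polling consumption is sketched below using AWS SDK v1.x (the stream and shard identifiers are illustrative).

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

public final class PollingSketch {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // GetShardIterator [8]: obtain a starting position for the shard
        String shardIterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                .withStreamName("example-stream")
                .withShardId("shardId-000000000000")
                .withShardIteratorType(ShardIteratorType.TRIM_HORIZON))
                .getShardIterator();

        // GetRecords [9]: poll in a loop using the returned next iterator
        while (shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(new GetRecordsRequest()
                    .withShardIterator(shardIterator));
            result.getRecords().forEach(r -> { /* deserialise and emit */ });
            shardIterator = result.getNextShardIterator(); // null once the shard is closed and drained
        }
    }
}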

The block diagram below shows the main components in the FlinkKinesisConsumer:


Each ShardConsumer populates a queue which passes messages back to the end application via the KinesisDataFetcher.


Appendix B - Example EFO Consumer Application Flow

An EFO consumer would require the use of the following API endpoints:

  1. RegisterStreamConsumer [10]
  2. ListStreamConsumers [11]
  3. SubscribeToShard [12]
  4. DeregisterStreamConsumer [13]

An example EFO consumer flow is illustrated below (it is assumed the application already knows the stream and shard details):
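
In code form, the flow might look like the following AWS SDK v2.x sketch (the ARN, stream and shard identifiers are illustrative; the wait for the consumer to become ACTIVE is elided, see the registration section).

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

public final class EfoFlowSketch {
    public static void main(String[] args) {
        KinesisAsyncClient kinesis = KinesisAsyncClient.create();

        // 1. RegisterStreamConsumer [10]: returns the consumer ARN
        String consumerArn = kinesis.registerStreamConsumer(
                RegisterStreamConsumerRequest.builder()
                        .streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/example-stream")
                        .consumerName("example-efo-consumer")
                        .build())
                .join().consumer().consumerARN();

        // (wait until the consumer is ACTIVE before subscribing)

        // 2. SubscribeToShard [12]: opens an HTTP/2 event stream; records
        // are pushed to the handler as they arrive
        SubscribeToShardRequest request = SubscribeToShardRequest.builder()
                .consumerARN(consumerArn)
                .shardId("shardId-000000000000")
                .startingPosition(StartingPosition.builder()
                        .type(ShardIteratorType.TRIM_HORIZON)
                        .build())
                .build();

        SubscribeToShardResponseHandler handler = SubscribeToShardResponseHandler.builder()
                .onError(t -> System.err.println("Subscription failed: " + t.getMessage()))
                .subscriber(event -> System.out.println("Received: " + event))
                .build();

        kinesis.subscribeToShard(request, handler).join();

        // 3. DeregisterStreamConsumer [13]: tear down on shutdown
        kinesis.deregisterStreamConsumer(DeregisterStreamConsumerRequest.builder()
                .consumerARN(consumerArn)
                .build()).join();
    }
}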

References

[1] https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
[3] https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RecordsPublisher.java
[4] https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Subscription.html
[5] https://docs.aws.amazon.com/sdk-for-java/v2/migration-guide/side-by-side.html
[6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
[7] https://github.com/awslabs/amazon-kinesis-client
[8] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
[9] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
[10] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html
[11] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreamConsumers.html
[12] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html
[13] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html