Status

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Async Sink was introduced as part of FLIP-171, and implements a non-configurable rate limiting strategy to reduce network congestion when the destination rejects or throttles requests. We want to make this configurable, to allow the sink implementer to decide the desired congestion control behaviour given a specific destination.

Background on Async Sink

The main component of a sink (with at-least-once semantics) is the SinkWriter, which is responsible for accepting input elements from the Flink DataStream in the write() method, and writing them to a destination in the flush() method. See FLIP-143 for more details on the Sink API.
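The buffer-then-flush pattern described above can be sketched as follows. This is a simplified, self-contained illustration of the idea, not the actual Flink SinkWriter interface; the class name and the in-memory "destination" are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the at-least-once pattern: write() buffers elements,
// flush() persists the un-flushed buffer to the destination.
// Not the real Flink SinkWriter API.
public class SketchSinkWriter<InputT> {
    private final List<InputT> buffer = new ArrayList<>();
    private final List<InputT> destination = new ArrayList<>(); // stand-in for a real sink

    // Accepts an input element from the stream and buffers it.
    public void write(InputT element) {
        buffer.add(element);
    }

    // Persists all buffered (un-flushed) elements to the destination.
    public void flush() {
        destination.addAll(buffer);
        buffer.clear();
    }

    public List<InputT> destinationContents() {
        return destination;
    }
}
```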

The design principle behind the Async Sink is to abstract away common logic used when plugging into this Sink API, to write asynchronously to a destination with at-least-once semantics. This logic includes persisting un-flushed records, handling asynchronous responses, deciding when to flush and dynamically adjusting the request rate to the destination.

The Async Sink consists of an ElementConverter and an AsyncSinkWriter. The ElementConverter handles converting DataStream elements (InputT) into messages specific to the destination (RequestEntryT). The AsyncSinkWriter handles the logic of actually making an asynchronous request to the destination. It receives a batch of messages (List&lt;RequestEntryT&gt;), makes the asynchronous request(s), and invokes a callback with a batch of any failed messages. See FLIP-171 or this blog post for more details on the Async Sink.
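A converter of this shape can be sketched as below. This mirrors the role of the ElementConverter (mapping InputT to RequestEntryT) but is deliberately simplified: the real Flink interface also receives a writer context, and PutRecordEntry here is a hypothetical destination message type, not a real API.

```java
import java.nio.charset.StandardCharsets;

// Simplified sketch of an ElementConverter: maps stream elements (InputT = String)
// to destination-specific request entries (RequestEntryT = PutRecordEntry).
public class StringToBytesConverter {

    // Hypothetical destination record: a raw payload of bytes.
    public static final class PutRecordEntry {
        final byte[] payload;

        PutRecordEntry(byte[] payload) {
            this.payload = payload;
        }

        public int sizeInBytes() {
            return payload.length;
        }
    }

    // Converts one stream element into one request entry.
    public PutRecordEntry apply(String element) {
        return new PutRecordEntry(element.getBytes(StandardCharsets.UTF_8));
    }
}
```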

Rate limiting the Async Sink

In the current implementation, the Async Sink adjusts its behaviour depending on whether it can successfully persist messages (RequestEntryT) to the destination. In particular, it uses a non-configurable rate limiting strategy that reduces the number of in-flight messages (RequestEntryT) when any messages in a single batch fail, and increases the number of in-flight messages when all messages in a single batch succeed. The Async Sink adjusts the number of in-flight messages using the Additive Increase / Multiplicative Decrease (AIMD) algorithm, a feedback control method used for TCP congestion control. This provides indirect control of the request rate: the sink eagerly increases its in-flight messages until it hits limits imposed by the destination (e.g. throttling). This works well to reduce the congestion on the network caused by the Flink connector.
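The AIMD feedback loop on the in-flight message limit can be sketched as follows. The increment, decrease factor, and bounds here are illustrative assumptions, not the constants used by the actual AsyncSinkWriter.

```java
// Sketch of AIMD applied to an in-flight message limit:
// additive increase when a whole batch succeeds,
// multiplicative decrease when any message in the batch fails.
public class AimdLimit {
    private static final int INCREMENT = 10;           // additive increase step (assumed)
    private static final double DECREASE_FACTOR = 0.5; // multiplicative decrease (assumed)

    private final int minLimit;
    private final int maxLimit;
    private int currentLimit;

    public AimdLimit(int initialLimit, int minLimit, int maxLimit) {
        this.currentLimit = initialLimit;
        this.minLimit = minLimit;
        this.maxLimit = maxLimit;
    }

    // Called after each completed batch: scale up if every message
    // succeeded, scale down if any message failed.
    public void update(int failedMessages) {
        if (failedMessages == 0) {
            currentLimit = Math.min(maxLimit, currentLimit + INCREMENT);
        } else {
            currentLimit = Math.max(minLimit, (int) (currentLimit * DECREASE_FACTOR));
        }
    }

    public int getLimit() {
        return currentLimit;
    }
}
```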

We propose to make the following configurable:

  • What to scale: The current strategy only controls the number of in-flight messages. For destinations where the number of in-flight requests (batches of messages) is more important than in-flight messages, the sink implementer should be able to decide which factor to control.
  • When to scale: The sink aggressively scales up/down throughput whenever there are successful/failed messages. The sink implementer should be able to make this decision, e.g. only scale down after a threshold of failed messages, only scale up after X successful requests over Y time period.
  • How much to scale: The extent of scaling up/down is fixed. The sink implementer should be able to decide how aggressively they want to scale up/down in event of failure/success.

Public Interfaces

Proposed changes to the public interfaces are:

  • Introduce a RateLimitingStrategy interface, which will expose methods to collect information about requests being started and completed, as well as a method to decide whether to block the next request.
  • Add a builder to AsyncSinkWriter, which calls a private constructor with the option to specify a RateLimitingStrategy
  • Mark the public constructor of AsyncSinkWriter as @Deprecated
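The builder change could take the following shape. This is a hypothetical sketch: the class, method names, and the stubbed strategy interface are assumptions for illustration, not the final AsyncSinkWriter API.

```java
// Hypothetical sketch of the proposed builder pattern: the private
// constructor is only reachable through the builder, which optionally
// accepts a RateLimitingStrategy.
public class SketchAsyncSinkWriter {

    // Stub of the strategy interface, reduced to one method for the sketch.
    interface RateLimitingStrategy {
        boolean shouldBlock();
    }

    private final RateLimitingStrategy strategy;

    // Private constructor: callers must go through the builder.
    private SketchAsyncSinkWriter(RateLimitingStrategy strategy) {
        this.strategy = strategy;
    }

    public RateLimitingStrategy getStrategy() {
        return strategy;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static final class Builder {
        // Default mimics the current non-configurable behaviour
        // when no strategy is supplied.
        private RateLimitingStrategy strategy = () -> false;

        public Builder setRateLimitingStrategy(RateLimitingStrategy strategy) {
            this.strategy = strategy;
            return this;
        }

        public SketchAsyncSinkWriter build() {
            return new SketchAsyncSinkWriter(strategy);
        }
    }
}
```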

Proposed Changes

Overview

We propose to introduce a RateLimitingStrategy, which will make the decision of what to scale, when to scale, and how much to scale.

The interaction between the RateLimitingStrategy and AsyncSinkWriter when submitting a request will be as follows:

  1. AsyncSinkWriter will construct a new request with a batch of messages:
    1. This can be triggered by one of 3 conditions: a timer trigger, the batch byte-size threshold, or the batch record-count threshold.
    2. AsyncSinkWriter will call RateLimitingStrategy.getMaxBatchSize() to determine the max batch size to include in the request.
  2. AsyncSinkWriter then calls RateLimitingStrategy.shouldBlock() to decide if the newest request should be sent/blocked.
  3. If we decide to send the request, AsyncSinkWriter will call RateLimitingStrategy.registerInFlightRequest() to inform the RateLimitingStrategy that the request has been sent.
  4. Once the request completes, AsyncSinkWriter will call RateLimitingStrategy.registerCompletedRequest() to provide the RateLimitingStrategy with information about the completed request (e.g. how many messages failed).

With this design, all of these decisions are made internally within the RateLimitingStrategy. The AsyncSinkWriter only calls into the RateLimitingStrategy to check whether it should send the next request.

RateLimitingStrategy
/**
 * RateLimitingStrategy is consulted before sending a request in AsyncSinkWriter to decide if a request should be sent.
 */
public interface RateLimitingStrategy {

    /**
     * Registers the information of requests being sent (e.g. to track the current in-flight messages / requests).
     *
     * @param requestInfo Data class containing information on the request being sent
     */
    void registerInFlightRequest(RequestInfo requestInfo);

    /**
     * Registers the information of requests completing (e.g. to track the current in-flight messages / requests).
     * Any dynamic scaling on failed requests should be done here too.
     *
     * @param requestInfo Data class containing information on the completed request
     */
    void registerCompletedRequest(RequestInfo requestInfo);

    /**
     * Decides whether the next request should be blocked.
     *
     * @param requestInfo Data class containing information on the request being sent
     * @return true if the request should be blocked
     */
    boolean shouldBlock(RequestInfo requestInfo);

    /**
     * @return the max batch size to be used by the AsyncSinkWriter
     */
    int getMaxBatchSize();
}

/**
 * This data class encapsulates the information of starting / completing requests.
 */
public final class RequestInfo {
    public int failedMessages;
    public int batchSize;
    public long requestStartTime;
}


To make it easier for sink implementers, we will also provide an out-of-the-box implementation, CongestionControlRateLimitingStrategy, which uses the current AIMD strategy to control in-flight messages. This mimics the existing behaviour of the Async Sink.
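A possible shape for this strategy is sketched below against the interface proposed above. The RequestInfo and RateLimitingStrategy declarations are repeated here so the sketch is self-contained; the AIMD constants, the starting limit, and the fixed batch cap are illustrative assumptions, not the values the actual implementation will use.

```java
// Self-contained sketch of the proposed out-of-the-box strategy.
// The interface and RequestInfo mirror the FLIP's declarations; the
// AIMD constants and max batch size are assumptions for illustration.
public class CongestionControlSketch {

    public static final class RequestInfo {
        public int failedMessages;
        public int batchSize;
        public long requestStartTime;
    }

    public interface RateLimitingStrategy {
        void registerInFlightRequest(RequestInfo requestInfo);
        void registerCompletedRequest(RequestInfo requestInfo);
        boolean shouldBlock(RequestInfo requestInfo);
        int getMaxBatchSize();
    }

    public static final class CongestionControlRateLimitingStrategy implements RateLimitingStrategy {
        private int inFlightMessages = 0;
        private int maxInFlightMessages = 100; // AIMD-controlled limit (assumed starting value)

        @Override
        public void registerInFlightRequest(RequestInfo info) {
            inFlightMessages += info.batchSize;
        }

        @Override
        public void registerCompletedRequest(RequestInfo info) {
            inFlightMessages -= info.batchSize;
            // AIMD: additive increase on full success, multiplicative decrease otherwise.
            if (info.failedMessages == 0) {
                maxInFlightMessages += 10;
            } else {
                maxInFlightMessages = Math.max(1, maxInFlightMessages / 2);
            }
        }

        @Override
        public boolean shouldBlock(RequestInfo info) {
            // Block if sending this batch would exceed the in-flight message limit.
            return inFlightMessages + info.batchSize > maxInFlightMessages;
        }

        @Override
        public int getMaxBatchSize() {
            return 50; // assumed fixed batch cap for the sketch
        }
    }
}
```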

Migration of existing Async Sinks

We will also migrate the existing Firehose and Kinesis sinks to use the new interface proposed here. There will be no change in behaviour, since the CongestionControlRateLimitingStrategy configured will be the same as the current strategy.

Example usage of RateLimitingStrategy

This logic will live in the AsyncSinkWriter; the following is an example use of the interface proposed here.

Example usage of RateLimitingStrategy
// Build a request respecting the strategy's max batch size.
RequestInfo info = buildRequestInfo(strategy.getMaxBatchSize());

// Wait while the strategy tells us not to send yet, rebuilding the
// request each time in case the max batch size has changed.
while (strategy.shouldBlock(info)) {
    sleep();
    info = buildRequestInfo(strategy.getMaxBatchSize());
}

strategy.registerInFlightRequest(info);

// Send the batch asynchronously ...

// On completion, report the result (e.g. failed messages) back to the strategy.
strategy.registerCompletedRequest(info);

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users of the Async Sink, since the behaviour is unchanged. The @Deprecated constructor will be configured to have the exact same RateLimitingStrategy as it has currently.

There is no deprecation plan for existing behaviour, since we are not changing the behaviour, only making it configurable.

Test Plan

  • AsyncSinkWriter
    • We will introduce tests to verify that the RateLimitingStrategy specified is respected.
  • CongestionControlRateLimitingStrategy follows a strict mathematical function, so we will test that the output follows that function.

Rejected Alternatives

We rejected the alternative of creating a more general RateLimitingStrategy that can control the number of requests per unit time (e.g. 500 requests/minute). That would require us to expose controls to the RateLimitingStrategy to trigger a flush() on the AsyncSinkWriter, which increases the complexity of the Async Sink implementation. Since congestion control works well at the moment, we decided against this.