...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
We propose adding timeout configuration to AsyncSink, with retryOnTimeout and failOnTimeout mechanisms, to ensure the writer doesn't block on unacknowledged requests.
Public Interfaces
We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:
...
    public class AsyncSinkWriterImpl<T> extends AsyncSinkWriter<T, ClientRequestImpl> {

        @Override
        protected void submitRequestEntries(
                List<ClientRequestImpl> requestEntries, AsyncSinkResultHandler resultHandler) {
            CompletableFuture<Result> resultFuture = client.asyncSend(requestEntries);
            resultFuture.whenComplete(
                    (res, err) -> {
                        if (err == null) {
                            // Request completed successfully; notify the async sink writer base.
                            resultHandler.complete();
                        } else if (isFatal(err)) {
                            // A fatal exception, as classified by the implementer, should fail the job.
                            resultHandler.completeExceptionally(err);
                        } else {
                            // Non-fatal exceptions should be retried.
                            resultHandler.retryFor(res.failedEntries());
                        }
                    });
        }
    }
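The interaction between the request timeout and the failOnTimeout flag can be illustrated without any Flink dependency. The following is a minimal sketch, not the proposed implementation: `SketchResultHandler`, `RecordingHandler`, and `TimeoutSketch.submitWithTimeout` are hypothetical names introduced only for this example, and the timeout is enforced with the JDK's `CompletableFuture#orTimeout` (Java 9+), which is an assumption about mechanism rather than what AsyncSinkWriter does internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the proposed ResultHandler; not the Flink interface. */
interface SketchResultHandler<T> {
    void complete();

    void completeExceptionally(Exception e);

    void retryFor(List<T> entriesToRetry);
}

/** Records which callback was invoked so the outcome can be inspected. */
final class RecordingHandler<T> implements SketchResultHandler<T> {
    String outcome = "pending";
    final List<T> retried = new ArrayList<>();

    public void complete() {
        outcome = "completed";
    }

    public void completeExceptionally(Exception e) {
        outcome = "failed";
    }

    public void retryFor(List<T> entriesToRetry) {
        outcome = "retried";
        retried.addAll(entriesToRetry);
    }
}

final class TimeoutSketch {
    /**
     * Attaches a timeout to an in-flight request and routes the result to the handler.
     * Returns a future that completes once the handler has been notified.
     */
    static <T> CompletableFuture<Void> submitWithTimeout(
            List<T> entries,
            CompletableFuture<Void> request,
            long timeoutMs,
            boolean failOnTimeout,
            SketchResultHandler<T> handler) {
        return request.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete(
                        (res, err) -> {
                            // Unwrap in case the error arrives wrapped by an upstream stage.
                            Throwable cause =
                                    (err instanceof CompletionException && err.getCause() != null)
                                            ? err.getCause()
                                            : err;
                            if (cause == null) {
                                handler.complete();
                            } else if (cause instanceof TimeoutException && !failOnTimeout) {
                                // Timed out and failOnTimeout is false: retry the whole batch.
                                handler.retryFor(entries);
                            } else {
                                // Timed out with failOnTimeout set, or any other error: fail.
                                handler.completeExceptionally(new RuntimeException(cause));
                            }
                        });
    }
}
```

In this sketch every non-timeout error is treated as fatal for brevity; a real writer would still classify errors as in the example above.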
...
    @PublicEvolving
    public class AsyncSinkWriterConfiguration {

        public static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
        public static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

        private AsyncSinkWriterConfiguration(
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                long maxBatchSizeInBytes,
                long maxTimeInBufferMS,
                long maxRecordSizeInBytes,
                long requestTimeoutMS,
                boolean failOnTimeout,
                RateLimitingStrategy rateLimitingStrategy) {
            ...
        }

        @PublicEvolving
        public static class AsyncSinkWriterConfigurationBuilder {
            ...
            private long requestTimeoutMS = AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS;
            private boolean failOnTimeout = AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT;

            /**
             * Set the request timeout in milliseconds for AsyncSink. Requests not completed within
             * the timeout are retried if {@code failOnTimeout} is false. This maintains the
             * at-least-once semantic guarantee and ensures {@code AsyncSinkWriter#flush()} does not
             * block indefinitely on unacknowledged requests.
             */
            public AsyncSinkWriterConfigurationBuilder setRequestTimeoutMs(long requestTimeoutMS) {
                this.requestTimeoutMS = requestTimeoutMS;
                return this;
            }

            /**
             * Set the {@code failOnTimeout} flag, which indicates whether requests not acknowledged
             * on timeout should fail the job. If failOnTimeout is set to false, the
             * {@link AsyncSinkWriter} retries requests that time out.
             */
            public AsyncSinkWriterConfigurationBuilder setFailOnTimeout(boolean failOnTimeout) {
                this.failOnTimeout = failOnTimeout;
                return this;
            }

            public AsyncSinkWriterConfiguration build() {
                return new AsyncSinkWriterConfiguration(
                        maxBatchSize,
                        maxInFlightRequests,
                        maxBufferedRequests,
                        maxBatchSizeInBytes,
                        maxTimeInBufferMS,
                        maxRecordSizeInBytes,
                        requestTimeoutMS,
                        failOnTimeout,
                        rateLimitingStrategy);
            }
        }
    }
We will also pass the new configurations through the AsyncSink API and expose them via the builder.

    @PublicEvolving
    public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
            implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {

        @Deprecated
        protected AsyncSinkBase(
                ElementConverter<InputT, RequestEntryT> elementConverter,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                long maxBatchSizeInBytes,
                long maxTimeInBufferMS,
                long maxRecordSizeInBytes) {
            this(
                    elementConverter,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS,
                    AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT);
        }

        protected AsyncSinkBase(
                ElementConverter<InputT, RequestEntryT> elementConverter,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                long maxBatchSizeInBytes,
                long maxTimeInBufferMS,
                long maxRecordSizeInBytes,
                long requestTimeoutMS,
                boolean failOnTimeout)
        ....
    }
Compatibility, Deprecation, and Migration Plan
...
We will default failOnTimeout to false to maintain backward-compatible behaviour. We will also set the default requestTimeoutMS to 10 minutes, aligning with the current default checkpoint timeout.
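The defaulting described above amounts to constructor delegation: existing callers keep using the old signature and pick up the new defaults without code changes. The following is a minimal sketch under that reading; `SketchSinkSettings` is a hypothetical class made up for illustration and is not part of Flink.

```java
import java.time.Duration;

/** Hypothetical settings holder illustrating the defaulting only; not a Flink class. */
final class SketchSinkSettings {
    // 10 minutes expressed in milliseconds.
    static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
    static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    final int maxBatchSize;
    final long requestTimeoutMs;
    final boolean failOnTimeout;

    /** Old-style constructor: callers unaware of timeouts receive the defaults. */
    SketchSinkSettings(int maxBatchSize) {
        this(maxBatchSize, DEFAULT_REQUEST_TIMEOUT_MS, DEFAULT_FAIL_ON_TIMEOUT);
    }

    /** New-style constructor: callers choose the timeout behaviour explicitly. */
    SketchSinkSettings(int maxBatchSize, long requestTimeoutMs, boolean failOnTimeout) {
        this.maxBatchSize = maxBatchSize;
        this.requestTimeoutMs = requestTimeoutMs;
        this.failOnTimeout = failOnTimeout;
    }
}
```

A sink built through the old constructor therefore retries requests that exceed 600,000 ms rather than failing the job, while a sink that opts in can pass its own timeout and set failOnTimeout to true.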
Test Plan
We will introduce unit tests for the existing AsyncSink.
...