Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
While implementing several connectors on top of FLIP-171: Async Sink, we have encountered a couple of issues, such as:
- FLINK-34071 and FLINK-30304: the lack of a timeout option for Async Sink requests might cause a deadlock.
The root cause of such issues is that the sink depends on the implementing connectors (and possibly their SDK clients) to acknowledge requests or requeue failed entries. Without a configured timeout, a failure to acknowledge a request can cause an indefinite deadlock that blocks checkpoint completion and job transitions.
Proposed Changes
We propose adding a request timeout configuration to AsyncSink, with retryOnTimeout and failOnTimeout mechanisms, to ensure the writer does not block on unacknowledged requests.
Public Interfaces
We propose introducing a new interface, org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:
```java
@PublicEvolving
public interface ResultHandler<RequestEntryT> {

    /** Mark the in-flight request as completed successfully. */
    void complete();

    /** Fail the job with a fatal exception. */
    void completeExceptionally(Exception e);

    /** Requeue failed entries to be retried. */
    void retryForEntries(List<RequestEntryT> requestEntriesToRetry);
}
```
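To illustrate the contract (this is not part of the proposed API), the standalone sketch below re-declares the interface without Flink annotations and implements a hypothetical `RecordingResultHandler` that keeps only the first signal it receives. This is the same first-signal-wins rule the writer pseudocode in this proposal applies with `compareAndSet`. `Outcome` and `RecordingResultHandler` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Standalone copy of the proposed interface (Flink annotations omitted).
interface ResultHandler<RequestEntryT> {
    void complete();

    void completeExceptionally(Exception e);

    void retryForEntries(List<RequestEntryT> requestEntriesToRetry);
}

// Hypothetical outcome type used only for this illustration.
enum Outcome { PENDING, COMPLETED, FAILED, RETRY }

// Records the first signal it receives and ignores any later ones.
class RecordingResultHandler<RequestEntryT> implements ResultHandler<RequestEntryT> {
    private final AtomicReference<Outcome> outcome = new AtomicReference<>(Outcome.PENDING);
    private final List<RequestEntryT> entriesToRetry = new ArrayList<>();

    @Override
    public void complete() {
        outcome.compareAndSet(Outcome.PENDING, Outcome.COMPLETED);
    }

    @Override
    public void completeExceptionally(Exception e) {
        outcome.compareAndSet(Outcome.PENDING, Outcome.FAILED);
    }

    @Override
    public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) {
        // Entries are recorded only if this call is the first signal.
        if (outcome.compareAndSet(Outcome.PENDING, Outcome.RETRY)) {
            entriesToRetry.addAll(requestEntriesToRetry);
        }
    }

    Outcome outcome() {
        return outcome.get();
    }

    List<RequestEntryT> entriesToRetry() {
        return entriesToRetry;
    }
}
```

For example, calling `retryForEntries` with `["a", "b"]` and then `complete()` leaves the outcome at `RETRY`, because the late `complete()` loses the compare-and-set.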
and using it as the new base API of AsyncSinkWriter with the following changes:
```java
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    ....

    @Deprecated
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);

    // New entry point; it has a default implementation delegating to the deprecated
    // method (see "Compatibility, Deprecation, and Migration Plan").
    protected void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
        ...
    }
}
```
We will also introduce two additional configuration options to AsyncSinkWriterConfiguration, a long requestTimeoutMS and a boolean failOnTimeout, which AsyncSinkWriter uses to implement the retryOnTimeout and failOnTimeout mechanisms, as in the following pseudocode:
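```java
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    ....
    private void flush() throws InterruptedException {
        ...
        // create batch
        List<RequestEntryT> batch = ...;
        long requestTimestamp = System.currentTimeMillis();

        ResultHandler<RequestEntryT> handler = new ResultHandler<RequestEntryT>() {
            // Ensures only one of ack, retry, failure, or timeout takes effect.
            private final AtomicBoolean isCompleted = new AtomicBoolean(false);
            // Schedules timeout() once requestTimeoutMS has elapsed.
            private final ScheduledFuture<?> timeoutTimer = registerRequestTimeout(this::timeout);

            @Override
            public void complete() {
                retryForEntries(Collections.emptyList());
            }

            @Override
            public void retryForEntries(List<RequestEntryT> entriesToRetry) {
                if (isCompleted.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    mailboxExecutor.execute(
                            () -> completeRequest(entriesToRetry, batch.size(), requestTimestamp),
                            "Mark in-flight request as completed");
                }
            }

            @Override
            public void completeExceptionally(Exception e) {
                if (isCompleted.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    ... // fail the job with e
                }
            }

            private void timeout() {
                if (isCompleted.compareAndSet(false, true)) {
                    if (failOnTimeout) {
                        ... // fail the job
                    } else {
                        // requeue the whole batch for retry
                        mailboxExecutor.execute(
                                () -> completeRequest(batch, batch.size(), requestTimestamp),
                                "Retry timed-out request");
                    }
                }
            }
        };

        submitRequestEntries(batch, handler);
    }
    ....
    protected void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) { ... }
}
```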
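Because the pseudocode above depends on writer internals, here is a self-contained sketch of the same first-signal-wins timeout logic that runs without Flink. It assumes the Flink mailbox can be replaced by plain callbacks (`onComplete`, `onFail`, `onRequeue`) and a `ScheduledExecutorService` timer; `TimeoutGuardedHandler` is a hypothetical name, and this is not the AsyncSinkWriter implementation.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

interface ResultHandler<T> {
    void complete();
    void completeExceptionally(Exception e);
    void retryForEntries(List<T> requestEntriesToRetry);
}

// Hypothetical analogue of the handler built in flush(); callbacks replace the mailbox.
class TimeoutGuardedHandler<T> implements ResultHandler<T> {
    private final AtomicBoolean isCompleted = new AtomicBoolean(false);
    private final List<T> batch;
    private final boolean failOnTimeout;
    private final Runnable onComplete;
    private final Consumer<Exception> onFail;
    private final Consumer<List<T>> onRequeue;
    private final ScheduledFuture<?> timeoutTimer;

    TimeoutGuardedHandler(List<T> batch, long requestTimeoutMs, boolean failOnTimeout,
            ScheduledExecutorService timer, Runnable onComplete,
            Consumer<Exception> onFail, Consumer<List<T>> onRequeue) {
        this.batch = batch;
        this.failOnTimeout = failOnTimeout;
        this.onComplete = onComplete;
        this.onFail = onFail;
        this.onRequeue = onRequeue;
        // Arm the timer last so every field used by timeout() is already set.
        this.timeoutTimer = timer.schedule(this::timeout, requestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void complete() {
        if (isCompleted.compareAndSet(false, true)) {
            timeoutTimer.cancel(false);
            onComplete.run();
        }
    }

    @Override
    public void completeExceptionally(Exception e) {
        if (isCompleted.compareAndSet(false, true)) {
            timeoutTimer.cancel(false);
            onFail.accept(e);
        }
    }

    @Override
    public void retryForEntries(List<T> requestEntriesToRetry) {
        if (isCompleted.compareAndSet(false, true)) {
            timeoutTimer.cancel(false);
            onRequeue.accept(requestEntriesToRetry);
        }
    }

    private void timeout() {
        // Only the first signal wins: a late complete() after this is ignored.
        if (isCompleted.compareAndSet(false, true)) {
            if (failOnTimeout) {
                onFail.accept(new TimeoutException("Request timed out"));
            } else {
                onRequeue.accept(batch); // retry the whole batch
            }
        }
    }
}
```

Design note: whichever of the acknowledgement or the timer wins the compare-and-set decides the outcome, so a response arriving after a retry-on-timeout is ignored rather than counted twice. The retried batch may still reach the destination twice, which is consistent with at-least-once semantics.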
An example implementation could look like this:
```java
public class AsyncSinkWriterImpl<InputT> extends AsyncSinkWriter<InputT, ClientRequestImpl> {
    ....

    @Override
    protected void submitRequestEntries(
            List<ClientRequestImpl> requestEntries, ResultHandler<ClientRequestImpl> resultHandler) {
        CompletableFuture<Result> resultFuture = client.asyncSend(requestEntries);
        resultFuture.whenComplete(
                (res, err) -> {
                    if (err == null && res.failedEntries().isEmpty()) {
                        // request completed successfully, notify the AsyncSinkWriter base
                        resultHandler.complete();
                    } else if (err == null) {
                        // partially failed request: retry only the failed entries
                        resultHandler.retryForEntries(res.failedEntries());
                    } else if (isFatal(err)) {
                        // fatal exception, as classified by the implementer, fails the job
                        resultHandler.completeExceptionally(new RuntimeException(err));
                    } else {
                        // non-fatal exception: retry the whole batch
                        resultHandler.retryForEntries(requestEntries);
                    }
                });
    }
}
```
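The same classification can be exercised without Flink. In this sketch, `Result`, the `asyncSend` function, and the `isFatal` predicate stand in for the hypothetical client and classification from the example above, and `ExampleSubmitter` is a hypothetical name. The split between partial failures (reported in the response) and transport errors (reported as an exceptional future) is an assumption about how a typical SDK client reports errors.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Predicate;

interface ResultHandler<T> {
    void complete();
    void completeExceptionally(Exception e);
    void retryForEntries(List<T> requestEntriesToRetry);
}

// Hypothetical client response carrying the entries the destination rejected.
final class Result<T> {
    private final List<T> failedEntries;

    Result(List<T> failedEntries) {
        this.failedEntries = failedEntries;
    }

    List<T> failedEntries() {
        return failedEntries;
    }
}

// Mirrors the body of submitRequestEntries; client and classifier are injected.
class ExampleSubmitter<T> {
    private final Function<List<T>, CompletableFuture<Result<T>>> asyncSend;
    private final Predicate<Throwable> isFatal;

    ExampleSubmitter(Function<List<T>, CompletableFuture<Result<T>>> asyncSend,
            Predicate<Throwable> isFatal) {
        this.asyncSend = asyncSend;
        this.isFatal = isFatal;
    }

    void submitRequestEntries(List<T> requestEntries, ResultHandler<T> resultHandler) {
        asyncSend.apply(requestEntries).whenComplete((res, err) -> {
            // Dependent stages wrap failures in CompletionException; unwrap for classification.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                    ? err.getCause() : err;
            if (cause == null && res.failedEntries().isEmpty()) {
                resultHandler.complete();
            } else if (cause == null) {
                resultHandler.retryForEntries(res.failedEntries());
            } else if (isFatal.test(cause)) {
                resultHandler.completeExceptionally(
                        cause instanceof Exception ? (Exception) cause : new RuntimeException(cause));
            } else {
                resultHandler.retryForEntries(requestEntries);
            }
        });
    }
}
```

Because the futures in a quick test can be completed up front, `whenComplete` runs the callback immediately on the calling thread, which keeps the outcomes easy to check in order.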
```java
@PublicEvolving
public class AsyncSinkWriterConfiguration {
    public static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
    public static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    private AsyncSinkWriterConfiguration(
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout,
            RateLimitingStrategy rateLimitingStrategy) {
        ...
    }

    @PublicEvolving
    public static class AsyncSinkWriterConfigurationBuilder {
        ...
        private long requestTimeoutMS = AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS;
        private boolean failOnTimeout = AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT;

        /**
         * Set the request timeout in milliseconds for AsyncSink. Requests not completed within
         * the timeout are retried if {@code failOnTimeout} is false; this maintains the
         * at-least-once semantic guarantee and ensures {@code AsyncSinkWriter#flush()} does not
         * block indefinitely on unacknowledged requests.
         */
        public AsyncSinkWriterConfigurationBuilder setRequestTimeoutMs(long requestTimeoutMS) {
            this.requestTimeoutMS = requestTimeoutMS;
            return this;
        }

        /**
         * Set the {@code failOnTimeout} flag, which indicates whether requests not acknowledged
         * within the timeout should fail the job. If {@code failOnTimeout} is false, the
         * {@link AsyncSinkWriter} retries timed-out requests.
         */
        public AsyncSinkWriterConfigurationBuilder setFailOnTimeout(boolean failOnTimeout) {
            this.failOnTimeout = failOnTimeout;
            return this;
        }

        public AsyncSinkWriterConfiguration build() {
            return new AsyncSinkWriterConfiguration(
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    requestTimeoutMS,
                    failOnTimeout,
                    rateLimitingStrategy);
        }
    }
}
```
We will pass the new configuration options as part of the AsyncSink API and expose them via the builder:
```java
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {

    @Deprecated
    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        // The existing constructor delegates with the default timeout settings.
        this(
                elementConverter,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS,
                AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT);
    }

    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            Long requestTimeoutMS,
            Boolean failOnTimeout) {
        ....
    }
}
```
Compatibility, Deprecation, and Migration Plan
To maintain backward compatibility, we will provide a default implementation of the new method that delegates to the deprecated one:
```java
protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
    // Delegate to the deprecated method: an empty retry list means the request succeeded.
    submitRequestEntries(
            requestEntries,
            entries -> {
                if (entries.isEmpty()) {
                    resultHandler.complete();
                } else {
                    resultHandler.retryForEntries(entries);
                }
            });
}
```
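This default keeps existing connectors working unchanged: a connector that overrides only the deprecated, Consumer-based method is driven through the new entry point, with an empty retry list mapped to `complete()` and a non-empty one to `retryForEntries()`. The standalone sketch below checks that mapping without Flink; `LegacyStyleWriter` is a hypothetical stand-in for such a connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

interface ResultHandler<T> {
    void complete();
    void completeExceptionally(Exception e);
    void retryForEntries(List<T> requestEntriesToRetry);
}

// Hypothetical stand-in for a writer whose connector overrides only the deprecated method.
abstract class LegacyStyleWriter<T> {
    // Deprecated style: the connector reports entries to retry (empty list means success).
    protected abstract void submitRequestEntries(List<T> requestEntries, Consumer<List<T>> requestToRetry);

    // New entry point with the proposed default: adapt the legacy callback to ResultHandler.
    protected void submitRequestEntries(List<T> requestEntries, ResultHandler<T> resultHandler) {
        submitRequestEntries(requestEntries, entries -> {
            if (entries.isEmpty()) {
                resultHandler.complete();
            } else {
                resultHandler.retryForEntries(entries);
            }
        });
    }
}
```

Because `ResultHandler` declares three abstract methods, it is not a functional interface, so the lambda inside the default can only match the Consumer overload and the call is not ambiguous.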
Connectors Migration Status
| Connector | Jira | Status |
|---|---|---|
| Kinesis | TBD | TODO |
| Firehose | TBD | TODO |
| DynamoDB | TBD | TODO |
| Elasticsearch6 | TBD | TODO |
| Elasticsearch7 | TBD | TODO |
| Elasticsearch8 | TBD | TODO |
Default Options
We will set the default value of failOnTimeout to false. We also choose a default requestTimeoutMS of 10 minutes, aligned with the current default checkpoint timeout.
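For reference, the 10-minute default corresponds to 600,000 ms (`Duration.ofMinutes(10).toMillis()`). The standalone mirror below, using the hypothetical name `TimeoutOptions` (not the Flink class), shows unset options falling back to these defaults while explicit setters override them, following the builder proposed above.

```java
import java.time.Duration;

// Hypothetical standalone mirror of the two proposed options; not the Flink class.
final class TimeoutOptions {
    static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
    static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    final long requestTimeoutMS;
    final boolean failOnTimeout;

    private TimeoutOptions(long requestTimeoutMS, boolean failOnTimeout) {
        this.requestTimeoutMS = requestTimeoutMS;
        this.failOnTimeout = failOnTimeout;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Unset options fall back to the proposed defaults.
        private long requestTimeoutMS = DEFAULT_REQUEST_TIMEOUT_MS;
        private boolean failOnTimeout = DEFAULT_FAIL_ON_TIMEOUT;

        Builder setRequestTimeoutMs(long requestTimeoutMS) {
            this.requestTimeoutMS = requestTimeoutMS;
            return this;
        }

        Builder setFailOnTimeout(boolean failOnTimeout) {
            this.failOnTimeout = failOnTimeout;
            return this;
        }

        TimeoutOptions build() {
            return new TimeoutOptions(requestTimeoutMS, failOnTimeout);
        }
    }
}
```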
Test Plan
We will add unit tests for the new behavior to the existing AsyncSink test suite.
Rejected Alternatives
NA