Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

While implementing several connectors on top of FLIP-171: Async Sink, we have encountered a couple of issues such as:

  • FLINK-34071 and FLINK-30304: the lack of a timeout option for Async Sink requests might cause a deadlock.

The root cause of these issues is that AsyncSinkWriter depends on the implementing sinks (and possibly their SDK clients) to acknowledge requests or requeue failed entries. Without a configured timeout, a failure to acknowledge a request can cause an indefinite deadlock that blocks checkpoint completion and job state transitions.

Proposed Changes

We propose adding a request timeout configuration to AsyncSink, with retryOnTimeout and failOnTimeout mechanisms, to ensure the writer does not block on unacknowledged requests.


Public Interfaces

We propose introducing a new interface, org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:

AsyncSinkResultHandler
package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;

import java.util.List;

@PublicEvolving
public interface ResultHandler<RequestEntryT> {

    /** Marks the in-flight request as completed successfully. */
    void complete();

    /** Fails the job with a fatal exception. */
    void completeExceptionally(Exception e);

    /** Requeues the failed entries for retry. */
    void retryForEntries(List<RequestEntryT> requestEntriesToRetry);
}

and using it as the new base API in AsyncSinkWriter through the following changes:

AsyncSinkWriter code changes
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    ....

    @Deprecated
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestToRetry);

    // Not abstract: a default implementation delegating to the deprecated method is provided,
    // see "Compatibility, Deprecation, and Migration Plan".
    protected void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
        ....
    }
}

We will also introduce two additional configuration options to AsyncSinkWriterConfiguration, long requestTimeoutMS and boolean failOnTimeout. AsyncSinkWriter uses them to implement the retryOnTimeout and failOnTimeout mechanisms, as in the following pseudocode:

AsyncSinkWriter code changes
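@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
    ....
    private void flush() throws InterruptedException {
        ... // create the next batch and record its requestTimestamp
        ResultHandler<RequestEntryT> handler = new ResultHandler<RequestEntryT>() {
            // Schedules timeout() to fire requestTimeoutMS after requestTimestamp
            private final ScheduledFuture<?> timeoutTimer = registerRequestTimeout(requestTimestamp);
            // Guarantees the request is completed exactly once (ack, failure, retry or timeout)
            private final AtomicBoolean closed = new AtomicBoolean(false);

            @Override
            public void complete() {
                if (closed.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    mailboxExecutor.execute(
                            () -> completeRequest(Collections.emptyList(), batch.size(), requestTimestamp),
                            "Mark in-flight request as completed");
                }
            }

            @Override
            public void completeExceptionally(Exception e) {
                if (closed.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    // An exception thrown from a mailbox action fails the task
                    mailboxExecutor.execute(() -> { throw e; }, "Fail job on fatal exception");
                }
            }

            @Override
            public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) {
                if (closed.compareAndSet(false, true)) {
                    timeoutTimer.cancel(false);
                    mailboxExecutor.execute(
                            () -> completeRequest(requestEntriesToRetry, batch.size(), requestTimestamp),
                            "Requeue failed entries for retry");
                }
            }

            private void timeout() {
                if (closed.compareAndSet(false, true)) {
                    if (failOnTimeout) {
                        mailboxExecutor.execute(
                                () -> {
                                    throw new TimeoutException(
                                            "Request timed out after " + requestTimeoutMS + " ms");
                                },
                                "Fail job on request timeout");
                    } else {
                        // Retry the whole batch, preserving at-least-once semantics
                        mailboxExecutor.execute(
                                () -> completeRequest(batch, batch.size(), requestTimestamp),
                                "Retry timed-out request");
                    }
                }
            }
        };

        submitRequestEntries(batch, handler);
    }
    ....
    protected void submitRequestEntries(
            List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
        ....
    }
}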
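The essence of the pseudocode above is a completion guard: whichever of the acknowledgement or the timer fires first wins, and any later call is ignored. The following standalone sketch reproduces that guard with plain java.util.concurrent types so it can run without Flink. A single-threaded ExecutorService stands in for Flink's mailbox executor, and the completeRequest and fail callbacks are hypothetical stand-ins for AsyncSinkWriter internals. Only complete() is shown; completeExceptionally and retryForEntries would follow the same compareAndSet pattern.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Completes an in-flight request exactly once: by acknowledgement or by timeout, whichever comes first. */
public class TimeoutGuardedRequest<T> {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<T> batch;
    private final boolean failOnTimeout;
    private final Executor mailbox;                  // hypothetical stand-in for Flink's mailbox executor
    private final Consumer<List<T>> completeRequest; // receives the entries to requeue (empty on success)
    private final Consumer<Exception> fail;          // hypothetical stand-in for failing the job
    private final ScheduledFuture<?> timeoutTimer;

    public TimeoutGuardedRequest(
            List<T> batch,
            long requestTimeoutMs,
            boolean failOnTimeout,
            ScheduledExecutorService timers,
            Executor mailbox,
            Consumer<List<T>> completeRequest,
            Consumer<Exception> fail) {
        this.batch = batch;
        this.failOnTimeout = failOnTimeout;
        this.mailbox = mailbox;
        this.completeRequest = completeRequest;
        this.fail = fail;
        // Scheduled last, so timeout() only ever sees fully assigned fields
        this.timeoutTimer = timers.schedule(this::timeout, requestTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Acknowledgement from the sink; returns false if the request had already timed out. */
    public boolean complete() {
        if (!closed.compareAndSet(false, true)) {
            return false; // late acknowledgement: the timeout path already handled this batch
        }
        timeoutTimer.cancel(false);
        mailbox.execute(() -> completeRequest.accept(List.of()));
        return true;
    }

    private void timeout() {
        if (closed.compareAndSet(false, true)) {
            if (failOnTimeout) {
                mailbox.execute(() -> fail.accept(new TimeoutException("Request timed out")));
            } else {
                mailbox.execute(() -> completeRequest.accept(batch)); // requeue the whole batch
            }
        }
    }

    /** Runs one request that is never acknowledged within 50 ms and reports how it ended. */
    static String demo(boolean failOnTimeout) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<String>> outcome = new CompletableFuture<>();
            TimeoutGuardedRequest<String> request = new TimeoutGuardedRequest<>(
                    List.of("a", "b"), 50, failOnTimeout, timers, mailbox,
                    outcome::complete, outcome::completeExceptionally);
            String result = outcome
                    .handle((r, e) -> e == null ? "requeued=" + r : "failed=" + e.getClass().getSimpleName())
                    .join(); // blocks until the timeout has fired and the mailbox has run the action
            return result + " lateAckAccepted=" + request.complete();
        } finally {
            timers.shutdown();
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // requeued=[a, b] lateAckAccepted=false
        System.out.println(demo(true));  // failed=TimeoutException lateAckAccepted=false
    }
}
```

The AtomicBoolean is what makes a late acknowledgement harmless: once the timeout has requeued or failed the batch, a subsequent complete() returns early instead of completing the same request a second time.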


An example implementation could look like this:

AsyncSinkWriterImpl example
public class AsyncSinkWriterImpl<InputT>
        extends AsyncSinkWriter<InputT, ClientRequestImpl> {

    @Override
    protected void submitRequestEntries(
            List<ClientRequestImpl> requestEntries,
            ResultHandler<ClientRequestImpl> resultHandler) {
        CompletableFuture<Result> resultFuture = client.asyncSend(requestEntries);
        resultFuture.whenComplete(
                (res, err) -> {
                    if (err == null) {
                        // Request completed successfully, notify the AsyncSinkWriter base
                        resultHandler.complete();
                    } else if (isFatal(err)) {
                        // Fatal exception, as classified by the implementer, fails the job
                        resultHandler.completeExceptionally(new RuntimeException(err));
                    } else {
                        // Non-fatal exception: retry the whole batch (res is null when err != null)
                        resultHandler.retryForEntries(requestEntries);
                    }
                });
    }
}
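A self-contained, runnable variant of the example implementation is sketched below. It assumes a hypothetical asynchronous client modelled as a Function returning a CompletableFuture, and a hypothetical isFatal classification that treats only a ThrottledException as retryable. ResultHandler is redeclared locally so the sketch compiles without Flink, and RecordingHandler is an illustrative test double, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class ResultHandlerExample {

    /** Local copy of the proposed ResultHandler, so the sketch compiles without Flink. */
    interface ResultHandler<T> {
        void complete();

        void completeExceptionally(Exception e);

        void retryForEntries(List<T> requestEntriesToRetry);
    }

    /** Hypothetical retryable error, e.g. the destination throttling the client. */
    static class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    /** Hypothetical classification: only throttling is retryable, everything else is fatal. */
    static boolean isFatal(Throwable err) {
        return !(err instanceof ThrottledException);
    }

    /** Same shape as submitRequestEntries: calls exactly one ResultHandler method per request. */
    static <T> void submitRequestEntries(
            List<T> requestEntries,
            Function<List<T>, CompletableFuture<Void>> client, // hypothetical async client
            ResultHandler<T> resultHandler) {
        client.apply(requestEntries).whenComplete((res, err) -> {
            // Dependent stages wrap failures in CompletionException; unwrap to classify the cause
            Throwable cause = err instanceof CompletionException && err.getCause() != null ? err.getCause() : err;
            if (cause == null) {
                resultHandler.complete();
            } else if (isFatal(cause)) {
                resultHandler.completeExceptionally(
                        cause instanceof Exception ? (Exception) cause : new RuntimeException(cause));
            } else {
                resultHandler.retryForEntries(requestEntries); // res is null on failure: retry the batch
            }
        });
    }

    /** Test double that records which ResultHandler method was called. */
    static class RecordingHandler<T> implements ResultHandler<T> {
        String outcome = "pending";
        final List<T> retried = new ArrayList<>();

        @Override
        public void complete() {
            outcome = "completed";
        }

        @Override
        public void completeExceptionally(Exception e) {
            outcome = "failed: " + e.getMessage();
        }

        @Override
        public void retryForEntries(List<T> requestEntriesToRetry) {
            outcome = "retry";
            retried.addAll(requestEntriesToRetry);
        }
    }

    public static void main(String[] args) {
        RecordingHandler<String> ok = new RecordingHandler<>();
        submitRequestEntries(List.of("a"), e -> CompletableFuture.completedFuture(null), ok);
        System.out.println(ok.outcome); // completed

        RecordingHandler<String> throttled = new RecordingHandler<>();
        submitRequestEntries(List.of("a", "b"),
                e -> CompletableFuture.failedFuture(new ThrottledException("slow down")), throttled);
        System.out.println(throttled.outcome + " " + throttled.retried); // retry [a, b]

        RecordingHandler<String> fatal = new RecordingHandler<>();
        submitRequestEntries(List.of("a"),
                e -> CompletableFuture.failedFuture(new IllegalStateException("bad credentials")), fatal);
        System.out.println(fatal.outcome); // failed: bad credentials
    }
}
```

Because the futures in main are already complete, whenComplete runs synchronously on the caller thread, which keeps the demo deterministic. A real client would complete the future on its own I/O thread, which is why the writer base hands the outcome back through the mailbox.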


AsyncSinkWriterConfiguration code changes
@PublicEvolving
public class AsyncSinkWriterConfiguration {
    public static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
    public static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    private AsyncSinkWriterConfiguration(
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout,
            RateLimitingStrategy rateLimitingStrategy) { ... }

    @PublicEvolving
    public static class AsyncSinkWriterConfigurationBuilder {
        ...
        private long requestTimeoutMS = AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS;
        private boolean failOnTimeout = AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT;

        /**
         * Sets the request timeout in milliseconds for the AsyncSink. Requests that are not
         * completed within the timeout are retried if {@code failOnTimeout} is false. This
         * maintains the at-least-once semantic guarantee and ensures that
         * {@code AsyncSinkWriter#flush()} does not block indefinitely on unacknowledged requests.
         */
        public AsyncSinkWriterConfigurationBuilder setRequestTimeoutMs(long requestTimeoutMS) {
            this.requestTimeoutMS = requestTimeoutMS;
            return this;
        }

        /**
         * Sets the {@code failOnTimeout} flag, which indicates whether requests that are not
         * acknowledged before the timeout should fail the job. If {@code failOnTimeout} is false,
         * the {@link AsyncSinkWriter} retries timed-out requests.
         */
        public AsyncSinkWriterConfigurationBuilder setFailOnTimeout(boolean failOnTimeout) {
            this.failOnTimeout = failOnTimeout;
            return this;
        }

        public AsyncSinkWriterConfiguration build() {
            return new AsyncSinkWriterConfiguration(
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    requestTimeoutMS,
                    failOnTimeout,
                    rateLimitingStrategy);
        }
    }
}


We will also pass the new configuration options through the AsyncSinkBase API and expose them via the builder:

@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {

    @Deprecated
    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        this(
                elementConverter,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS,
                AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT);
    }

    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout) {
        ....
    }
}

Compatibility, Deprecation, and Migration Plan

To maintain backward compatibility, we will provide a default implementation of the new method that delegates to the deprecated one:

protected void submitRequestEntries(
        List<RequestEntryT> requestEntries, ResultHandler<RequestEntryT> resultHandler) {
    submitRequestEntries(requestEntries, (entries) -> {
        if (entries.isEmpty()) {
            resultHandler.complete();
        } else {
            resultHandler.retryForEntries(entries);
        }
    });
}
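To check that this default implementation keeps existing connectors working, the following standalone sketch wires a hypothetical legacy connector, which only overrides the deprecated Consumer-based method, to the new ResultHandler path. WriterBase, LegacyWriter and submit are illustrative names rather than Flink API, and ResultHandler is redeclared locally so the sketch compiles without Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CompatibilityAdapterSketch {

    /** Local copy of the proposed ResultHandler, so the sketch compiles without Flink. */
    interface ResultHandler<T> {
        void complete();

        void completeExceptionally(Exception e);

        void retryForEntries(List<T> requestEntriesToRetry);
    }

    /** Simplified stand-in for AsyncSinkWriter with only the two submit methods. */
    abstract static class WriterBase<T> {
        @Deprecated
        protected abstract void submitRequestEntries(
                List<T> requestEntries, Consumer<List<T>> requestToRetry);

        /** Default implementation from the migration plan: an empty retry list means success. */
        protected void submitRequestEntries(List<T> requestEntries, ResultHandler<T> resultHandler) {
            submitRequestEntries(requestEntries, entries -> {
                if (entries.isEmpty()) {
                    resultHandler.complete();
                } else {
                    resultHandler.retryForEntries(entries);
                }
            });
        }
    }

    /** Hypothetical legacy connector: rejects every entry that starts with "x". */
    static class LegacyWriter extends WriterBase<String> {
        @Override
        protected void submitRequestEntries(
                List<String> requestEntries, Consumer<List<String>> requestToRetry) {
            List<String> rejected = new ArrayList<>();
            for (String entry : requestEntries) {
                if (entry.startsWith("x")) {
                    rejected.add(entry);
                }
            }
            requestToRetry.accept(rejected);
        }
    }

    /** Submits a batch through the new ResultHandler API and reports which callback fired. */
    static String submit(List<String> batch) {
        StringBuilder outcome = new StringBuilder();
        new LegacyWriter().submitRequestEntries(batch, new ResultHandler<String>() {
            @Override
            public void complete() {
                outcome.append("complete");
            }

            @Override
            public void completeExceptionally(Exception e) {
                outcome.append("failed");
            }

            @Override
            public void retryForEntries(List<String> requestEntriesToRetry) {
                outcome.append("retry ").append(requestEntriesToRetry);
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(submit(List.of("a", "b")));  // complete
        System.out.println(submit(List.of("a", "x1"))); // retry [x1]
    }
}
```

Note that this adapter never calls completeExceptionally: a legacy implementation can only report entries to retry, so its fatal errors still surface through whatever mechanism it used before the migration.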


Connectors Migration Status

Connector | Jira | Status
Kinesis | TBD | TODO
Firehose | TBD | TODO
DynamoDb | TBD | TODO
ElasticSearch6 | TBD | TODO
ElasticSearch7 | TBD | TODO
ElasticSearch8 | TBD | TODO

Default Options

We will set the default value of failOnTimeout to false. We also choose a default requestTimeoutMS of 10 minutes, aligning with the current default checkpoint timeout.

Test Plan

We will add unit tests for the timeout behavior to the existing AsyncSink tests.

Rejected Alternatives

NA