...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/ft7wcw7kyftvww25n5fm4l925tlgdfg0
Vote thread: https://lists.apache.org/thread/9mkzdv0gzssxh8p0y1cl3dvj6lvwyrc0
Status: Accepted
JIRA: FLINK-35435
Release: 1.20


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We propose adding a timeout configuration to AsyncSink, with retryOnTimeout and failOnTimeout mechanisms, so that the writer does not block on unacknowledged requests.
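
As a rough illustration of the two mechanisms, the sketch below models a single in-flight request that is either acknowledged or timed out, whichever happens first. The class `TimeoutGuardSketch` and its fields are hypothetical names chosen for this example, and the timer that would call `onTimeout` is left out. This is a minimal sketch under those assumptions, not the AsyncSinkWriter implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: the first of {acknowledgement, timeout} wins, the other is ignored. */
final class TimeoutGuardSketch<T> {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final List<T> entries;
    private final boolean failOnTimeout;

    // Recorded outcomes, exposed only so the example can be inspected.
    final List<T> requeued = new ArrayList<>();
    Exception failure;
    boolean acknowledged;

    TimeoutGuardSketch(List<T> entries, boolean failOnTimeout) {
        this.entries = entries;
        this.failOnTimeout = failOnTimeout;
    }

    /** Called when the destination acknowledges the request. */
    void onAcknowledged() {
        if (done.compareAndSet(false, true)) {
            acknowledged = true;
        }
    }

    /** Called by a timer (not shown) once the request timeout has elapsed. */
    void onTimeout(long requestTimeoutMs) {
        if (!done.compareAndSet(false, true)) {
            return; // the request was already acknowledged
        }
        if (failOnTimeout) {
            // failOnTimeout = true: surface the timeout as a failure
            failure = new TimeoutException("Request timed out after " + requestTimeoutMs + " ms");
        } else {
            // failOnTimeout = false: hand the entries back for a retry
            requeued.addAll(entries);
        }
    }
}
```

Because only the first caller flips the flag, a late acknowledgement arriving after the timeout does not complete the same request twice. The retried entries may still have reached the destination, which is why the retry path gives at-least-once rather than exactly-once delivery.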


Public Interfaces

We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:

...

Code Block: AsyncSinkWriterImpl example (Java)
public class AsyncSinkWriterImpl<InputT>
        extends AsyncSinkWriter<InputT, ClientRequestImpl> {

    @Override
    protected void submitRequestEntries(List<ClientRequestImpl> requestEntries,
                                        ResultHandler<ClientRequestImpl> resultHandler) {
        // CompletableFuture is needed here because plain Future has no whenComplete()
        CompletableFuture<Result> resultFuture = client.asyncSend(requestEntries);
        resultFuture.whenComplete(
            (res, err) -> {
                if (err == null) {
                    // request completed successfully, notify the async sink writer base
                    resultHandler.complete();
                } else if (isFatal(err)) {
                    // fatal exception, as classified by the implementer, should fail the job
                    resultHandler.completeExceptionally(err);
                } else {
                    // non-fatal exception should be retried
                    resultHandler.retryFor(res.failedEntries());
                }
            }
        );
    }
}
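
To make the three outcomes in the example above concrete without a Flink dependency, here is a minimal self-contained sketch. `SketchResultHandler` mirrors only the three calls that appear in the example (complete, completeExceptionally, retryFor). It and the other names (`RecordingHandlerSketch`, `OutcomeRouterSketch`) are assumptions made for illustration and are not the proposed Flink interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

/** Hypothetical stand-in exposing only the three calls used in the example above. */
interface SketchResultHandler<T> {
    void complete();
    void completeExceptionally(Throwable error);
    void retryFor(List<T> entriesToRetry);
}

/** Records which callback fired so the routing can be inspected. */
final class RecordingHandlerSketch<T> implements SketchResultHandler<T> {
    String outcome = "PENDING";
    final List<T> retried = new ArrayList<>();

    @Override
    public void complete() { outcome = "COMPLETED"; }

    @Override
    public void completeExceptionally(Throwable error) { outcome = "FAILED"; }

    @Override
    public void retryFor(List<T> entriesToRetry) {
        outcome = "RETRY";
        retried.addAll(entriesToRetry);
    }
}

final class OutcomeRouterSketch {
    /** Routes a finished client call to exactly one handler callback. */
    static <R, T> void route(CompletableFuture<R> result,
                             List<T> entries,
                             Predicate<Throwable> isFatal,
                             SketchResultHandler<T> handler) {
        result.whenComplete((res, err) -> {
            if (err == null) {
                handler.complete();                 // success
            } else if (isFatal.test(err)) {
                handler.completeExceptionally(err); // fatal: fail the job
            } else {
                handler.retryFor(entries);          // non-fatal: retry the entries
            }
        });
    }
}
```

For simplicity this sketch retries the whole batch on a non-fatal error, whereas the example above retries only the entries the client reports as failed.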

...

Code Block: AsyncSinkWriterConfiguration code changes (Java)
@PublicEvolving
public class AsyncSinkWriterConfiguration {

    public static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
    public static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    private AsyncSinkWriterConfiguration(
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout,
            RateLimitingStrategy rateLimitingStrategy) { ... }

    @PublicEvolving
    public static class AsyncSinkWriterConfigurationBuilder {
        ...
        private long requestTimeoutMS = AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS;
        private boolean failOnTimeout = AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT;

        /**
         * Set the request timeout in milliseconds for AsyncSink. Requests not completed within the
         * timeout are retried if {@code failOnTimeout} is false. This maintains the at-least-once
         * semantic guarantee and ensures {@code AsyncSinkWriter#flush()} does not block
         * indefinitely on unacknowledged requests.
         */
        public AsyncSinkWriterConfigurationBuilder setRequestTimeoutMs(long requestTimeoutMS) {
            this.requestTimeoutMS = requestTimeoutMS;
            return this;
        }

        /**
         * Set the {@code failOnTimeout} flag, which indicates whether requests not acknowledged
         * within the timeout should fail the job. If failOnTimeout is set to false, the
         * {@link AsyncSinkWriter} retries timed-out requests.
         */
        public AsyncSinkWriterConfigurationBuilder setFailOnTimeout(boolean failOnTimeout) {
            this.failOnTimeout = failOnTimeout;
            return this;
        }

        public AsyncSinkWriterConfiguration build() {
            return new AsyncSinkWriterConfiguration(
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    maxBatchSizeInBytes,
                    maxTimeInBufferMS,
                    maxRecordSizeInBytes,
                    requestTimeoutMS,
                    failOnTimeout,
                    rateLimitingStrategy);
        }
    }
}


We will also pass the new configurations through the AsyncSink API and expose them via the builder.

Code Block
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
        implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {

    @Deprecated
    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes) {
        // existing constructor delegates to the new one with the default timeout settings
        this(elementConverter,
             maxBatchSize,
             maxInFlightRequests,
             maxBufferedRequests,
             maxBatchSizeInBytes,
             maxTimeInBufferMS,
             maxRecordSizeInBytes,
             AsyncSinkWriterConfiguration.DEFAULT_REQUEST_TIMEOUT_MS,
             AsyncSinkWriterConfiguration.DEFAULT_FAIL_ON_TIMEOUT);
    }

    protected AsyncSinkBase(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            long requestTimeoutMS,
            boolean failOnTimeout)
        ...
}

Compatibility, Deprecation, and Migration Plan

...

We will default failOnTimeout to false to maintain backward-compatible behaviour. We will also set the default requestTimeoutMS to 10 minutes, aligning with the current default checkpoint timeout.
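
The compatibility argument can be sketched in isolation. The class `CompatSinkSketch` below is a hypothetical, cut-down stand-in with a single legacy parameter. It shows only how a deprecated constructor can delegate to the new one, so that existing callers pick up the 10-minute timeout with failOnTimeout set to false without any code changes.

```java
import java.time.Duration;

/** Hypothetical cut-down sink base, for illustration of the delegation pattern only. */
class CompatSinkSketch {
    static final long DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMinutes(10).toMillis(); // 600,000 ms
    static final boolean DEFAULT_FAIL_ON_TIMEOUT = false;

    final int maxBatchSize;
    final long requestTimeoutMS;
    final boolean failOnTimeout;

    /** Legacy-style constructor: existing callers keep compiling and get the defaults. */
    @Deprecated
    CompatSinkSketch(int maxBatchSize) {
        this(maxBatchSize, DEFAULT_REQUEST_TIMEOUT_MS, DEFAULT_FAIL_ON_TIMEOUT);
    }

    /** New-style constructor: callers opt in to explicit timeout settings. */
    CompatSinkSketch(int maxBatchSize, long requestTimeoutMS, boolean failOnTimeout) {
        this.maxBatchSize = maxBatchSize;
        this.requestTimeoutMS = requestTimeoutMS;
        this.failOnTimeout = failOnTimeout;
    }
}
```

With these definitions, `new CompatSinkSketch(100)` yields a 600,000 ms timeout with retries on timeout, while `new CompatSinkSketch(100, 30_000L, true)` opts in to failing the job after 30 seconds.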

Test Plan

We will add unit tests covering the new timeout behaviour to the existing AsyncSink tests.

...