...

We propose adding timeout configuration to AsyncSink, with retryOnTimeout and failOnTimeout mechanisms, so that the writer does not block indefinitely on unacknowledged requests.
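To illustrate the intended behaviour, the following is a minimal sketch of a timeout guard, not the proposed implementation. It assumes a hypothetical three-method handler (SketchResultHandler), a hypothetical wrapper (TimeoutGuard), and a plain ScheduledExecutorService as the timer; none of these names come from the proposal. The guard delivers exactly one outcome per request: either the client's acknowledgement, or, once the timeout elapses, a retry or a failure depending on failOnTimeout.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the proposed handler; method names are illustrative only. */
interface SketchResultHandler<T> {
    void complete();
    void completeExceptionally(Exception e);
    void retryFor(List<T> entries);
}

/** Hypothetical wrapper: forwards the first outcome (client or timeout) and drops later ones. */
final class TimeoutGuard<T> implements SketchResultHandler<T> {
    private final SketchResultHandler<T> delegate;
    private final AtomicBoolean done = new AtomicBoolean(false);

    TimeoutGuard(SketchResultHandler<T> delegate, List<T> batch, long timeoutMs,
                 boolean failOnTimeout, ScheduledExecutorService timer) {
        this.delegate = delegate;
        // Assumption: a scheduled task stands in for whatever timer the sink runtime provides.
        timer.schedule(() -> {
            if (done.compareAndSet(false, true)) {
                if (failOnTimeout) {
                    delegate.completeExceptionally(new TimeoutException("request timed out"));
                } else {
                    delegate.retryFor(batch); // retry the whole unacknowledged batch
                }
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override public void complete() {
        if (done.compareAndSet(false, true)) delegate.complete();
    }

    @Override public void completeExceptionally(Exception e) {
        if (done.compareAndSet(false, true)) delegate.completeExceptionally(e);
    }

    @Override public void retryFor(List<T> entries) {
        if (done.compareAndSet(false, true)) delegate.retryFor(entries);
    }
}

final class TimeoutSketch {
    /** Runs one guarded request and reports which outcome reached the underlying handler. */
    static String run(boolean failOnTimeout, boolean acknowledge) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        SketchResultHandler<String> recorder = new SketchResultHandler<String>() {
            @Override public void complete() { outcome.complete("completed"); }
            @Override public void completeExceptionally(Exception e) { outcome.complete("failed"); }
            @Override public void retryFor(List<String> entries) { outcome.complete("retried"); }
        };
        try {
            TimeoutGuard<String> guard = new TimeoutGuard<>(
                    recorder, Arrays.asList("a", "b"), 200, failOnTimeout, timer);
            if (acknowledge) {
                guard.complete(); // the client answered before the timeout
            }
            return outcome.join(); // always completes: by acknowledgement or by the timer
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false, false)); // never acknowledged, retryOnTimeout behaviour
        System.out.println(run(true, false));  // never acknowledged, failOnTimeout behaviour
        System.out.println(run(false, true));  // acknowledged in time
    }
}
```

The atomic flag is the key design choice in this sketch: a late acknowledgement that arrives after the timeout has fired is ignored, so an entry is never both retried and marked complete.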


Public Interfaces

We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler, inspired by org.apache.flink.streaming.api.functions.async.ResultFuture, which provides the following API:

...

AsyncSinkWriterImpl example (Java)

public class AsyncSinkWriterImpl<InputT>
        extends AsyncSinkWriter<InputT, ClientRequestImpl> {

    @Override
    protected void submitRequestEntries(
            List<ClientRequestImpl> requestEntries,
            ResultHandler<ClientRequestImpl> resultHandler) {
        // client.asyncSend is assumed to return a CompletableFuture; a plain Future has no whenComplete
        CompletableFuture<Result> resultFuture = client.asyncSend(requestEntries);
        resultFuture.whenComplete(
                (res, err) -> {
                    if (err == null) {
                        // Request completed successfully: notify the async sink writer base
                        resultHandler.complete();
                    } else if (isFatal(err)) {
                        // Fatal exception, as classified by the implementer: fail the job
                        resultHandler.completeExceptionally(err);
                    } else {
                        // Non-fatal exception: res is null on this path, so retry the submitted entries
                        resultHandler.retryFor(requestEntries);
                    }
                });
    }
}

...