...
We propose adding timeout configuration to AsyncSink with retryOnTimeout and FailOnTimeout mechanisms to ensure the writer doesn't block on un-acknowledged requests.
Public Interfaces
We propose introducing a new org.apache.flink.connector.base.sink.writer.ResultHandler
inspired by the org.apache.flink.streaming.api.functions.async.ResultFuture
which provides the following api
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class AsyncSinkWriterImpl<T> implementsextends AsyncSinkWriter<InputT, ClientRequestImpl> { @Override protected void submitRequestEntries(List<RequestEntryT> requestEntries, AsyncSinkResultHandler resultHandler){ Future<Result> resultFuture = client.asyncSend(requestEntries); resultFuture.whenComplete( (res, err) -> { // request completed successfully, notify async sync writer base if(err == null) { resultHandler.complete(); } else if(isFatal(err)) { // Fatal exception classified by implementer should fail job resultHandler.completeExceptionally(err); } else { // no-fatal exception should retry resultHandler.retryFor(res.failedEntries()); } } ); } } |
...