Current state: Accepted
Discussion thread: here
JIRA: KAFKA-1660, KAFKA-1659
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The current KafkaProducer.close() method tries to finish sending all pending messages before it returns. There are several motivations for adding a close method with a timeout to the producer.
So we need to provide an interface that lets the user choose how the producer is closed.
Add another interface:
public void close(long timeout, TimeUnit timeUnit)
This call waits up to the specified time for the producer to close. If the producer has not finished closing before the timeout expires, it closes the producer forcefully by failing all unsent messages.
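The two-phase behavior described above (wait up to a timeout, then fail whatever is still unsent) can be illustrated with a minimal, self-contained sketch. It is only an analogy and not the producer's implementation: a single-threaded ExecutorService from the JDK stands in for the sender, queued tasks stand in for pending messages, and the names TimedCloseSketch, closeWithTimeout, startSender and demo are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Kafka.
class TimedCloseSketch {

    /**
     * Graceful close bounded by a timeout, followed by a forced close.
     * Returns the number of queued tasks that were dropped without running
     * (the analogue of failing the unsent messages).
     */
    static int closeWithTimeout(ExecutorService sender, long timeout, TimeUnit unit) {
        sender.shutdown(); // graceful phase: accept no new work, keep draining the queue
        try {
            if (sender.awaitTermination(timeout, unit)) {
                return 0; // everything was sent before the deadline
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt, then force the close
        }
        // Forceful phase: interrupt the running task and drop the queued ones.
        List<Runnable> unsent = sender.shutdownNow();
        return unsent.size();
    }

    /** Starts a toy sender with `count` queued sends that take `millisPerSend` ms each. */
    static ExecutorService startSender(int count, long millisPerSend) {
        ExecutorService sender = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            sender.execute(() -> {
                try {
                    Thread.sleep(millisPerSend); // pretend to send one message
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // forced close interrupts the send
                }
            });
        }
        return sender;
    }

    /** Runs one scenario and returns how many sends were dropped. */
    static int demo(int count, long millisPerSend, long timeoutMs) {
        return closeWithTimeout(startSender(count, millisPerSend), timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("fast sends, dropped = " + demo(3, 1, 5000));
        System.out.println("slow sends, dropped = " + demo(5, 1000, 50));
    }
}
```

With fast sends and a generous timeout, the graceful phase should complete and nothing is dropped. With slow sends and a short timeout, the forced phase should drop the sends still in the queue.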
Several additional considerations:
To close a producer forcefully, the following changes are going to be made:
When close(positive, TimeUnit.MILLISECONDS) is called, it will try to do a normal close first. If the normal close does not finish before the timeout, it then closes the producer forcefully. It will also wait for the sender thread to finish.
If the user calls close() in a callback, it will block forever because the sender thread will try to join itself. So if close() is called from the sender thread, an error message will be logged and close(0, TimeUnit.MILLISECONDS) will be called instead.
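The callback case above can be sketched as follows. This is a toy model under stated assumptions, not the KafkaProducer code: ToyProducer, its single background thread, the forcedFromCallback flag and the helper methods are all hypothetical names chosen for illustration. It shows only the guard itself: detect that close was invoked on the sender thread, log, fall back to a zero timeout, and never join the current thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Kafka.
class CallbackCloseSketch {

    /** Toy stand-in for a producer with one background sender thread. */
    static class ToyProducer {
        private final CountDownLatch stop = new CountDownLatch(1);
        private final Thread sender;
        volatile boolean forcedFromCallback = false;

        ToyProducer(Consumer<ToyProducer> callback) {
            sender = new Thread(() -> {
                callback.accept(this); // the user callback runs on the sender thread
                try {
                    stop.await();      // then idle until asked to stop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "toy-sender");
        }

        void start() {
            sender.start();
        }

        void close(long timeout, TimeUnit unit) {
            boolean onSender = Thread.currentThread() == sender;
            if (onSender && timeout > 0) {
                // Waiting here would mean the sender thread waits for itself.
                System.err.println("close(timeout) called from a callback; using a zero timeout instead");
                forcedFromCallback = true;
                timeout = 0;
            }
            try {
                long millis = unit.toMillis(timeout);
                if (millis > 0) {
                    sender.join(millis); // graceful phase; join(0) would wait forever, so skip it
                }
                stop.countDown();        // forceful phase: tell the sender to stop now
                if (!onSender) {
                    sender.join();       // wait for the sender, but never join yourself
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Waits up to `millis` (must be positive) and reports whether the sender exited. */
        boolean senderExited(long millis) {
            try {
                sender.join(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !sender.isAlive();
        }
    }

    /** Closes from inside the callback and reports whether shutdown still completed. */
    static boolean closeFromCallbackTerminates() {
        ToyProducer producer = new ToyProducer(self -> self.close(10, TimeUnit.SECONDS));
        producer.start();
        return producer.senderExited(2000) && producer.forcedFromCallback;
    }

    public static void main(String[] args) {
        System.out.println("terminated after close in callback: " + closeFromCallbackTerminates());
    }
}
```

Without the onSender check, the callback's close call would wait on the very thread that is executing it, which is the deadlock the text describes.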
This is a backward compatible change.
Add an abort() call that closes the producer forcefully, and leave close() as is (KAFKA-1659). This is a viable solution, but it does not provide an easy way to do a time-bounded shutdown. To quote Andrew Stein's comment on KAFKA-1660:
" I don't think that this (the request for producer.close(timeout)) and KAFKA-1659 (the request for producer.abort()) are the same.
If producer.close(timeout) were to return without actually closing the producer in time, the user could then continue with producer.abort(). However, this would not be the only case where producer.abort() would be useful. It could be used if a producer.send() were to fail or if a future.get() were to fail as well.
In addition if producer.close() were to return without actually closing the producer in time, the user could continue with other logic besides producer.abort(). "
In KAFKA-1659, we decided to merge abort() into producer.close(timeout) to avoid adding another interface. This was also agreed in KAFKA-1934.