Status
Current state: Under Discussion
Discussion thread:
JIRA: KAFKA-1660
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There are several motivations for adding a close method with a timeout to the producer.
- Sometimes a user wants to close a producer within a bounded time, to avoid blocking on producer.close() for too long.
- A specific case of the first point: in some scenarios a user wants to close the producer immediately and fail all unsent messages in the RecordAccumulator (e.g. to preserve ordering after a send has failed). The current KafkaProducer.close() method will still try to send those messages, so we need an interface that lets the user choose how the producer is closed.
Public Interfaces
Add another interface:
public void KafkaProducer.close(long timeout, TimeUnit timeUnit).
This call waits up to the specified time for the producer to close. If the producer has not finished closing before the timeout expires, it closes the producer forcefully by failing all unsent messages.
close(timeout) should work when called from either a user thread or the internal sender thread.
When the timeout is 0, it fails all unsent messages and closes the producer without waiting.
Because close(timeout) may be called multiple times, it has to be idempotent.
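The intended semantics can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the KafkaProducer implementation: SketchProducer, its sendDelayMs parameter, and the use of CompletableFuture for per-record results are all hypothetical names introduced for illustration. It shows a bounded wait, a forced close that fails unsent records, the timeout-0 case, a guard for calls from the sender thread, and idempotence.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer; it only models the close semantics.
class SketchProducer {
    private final BlockingQueue<CompletableFuture<Boolean>> unsent = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private final long sendDelayMs;              // simulated time to send one record
    private final Thread sender;
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    SketchProducer(long sendDelayMs) {
        this.sendDelayMs = sendDelayMs;
        this.sender = new Thread(this::runSender, "sketch-sender");
        this.sender.setDaemon(true);
        this.sender.start();
    }

    CompletableFuture<Boolean> send(String record) {
        synchronized (lock) {
            if (!running) throw new IllegalStateException("producer is closed");
            CompletableFuture<Boolean> result = new CompletableFuture<>();
            unsent.add(result);                  // the payload is ignored in this sketch
            return result;
        }
    }

    void close(long timeout, TimeUnit unit) {
        synchronized (lock) { running = false; } // reject new sends from now on
        if (Thread.currentThread() != sender) {  // the sender thread must not wait for itself
            try {
                unit.timedJoin(sender, timeout); // returns at once when timeout <= 0
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        forceClose = true;                       // harmless if the sender already drained
        CompletableFuture<Boolean> left;
        while ((left = unsent.poll()) != null) { // fail whatever was not sent in time
            left.completeExceptionally(new IllegalStateException("producer closed before send"));
        }
    }

    private void runSender() {
        try {
            // Keep sending until closed and drained, or until a forced close.
            while (!forceClose && (running || !unsent.isEmpty())) {
                CompletableFuture<Boolean> next = unsent.poll(10, TimeUnit.MILLISECONDS);
                if (next == null) continue;
                Thread.sleep(sendDelayMs);       // simulated network round trip
                next.complete(true);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class CloseWithTimeoutDemo {
    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer(500);
        producer.send("a");
        CompletableFuture<Boolean> last = producer.send("b");
        producer.close(0, TimeUnit.MILLISECONDS); // fail unsent records without waiting
        producer.close(0, TimeUnit.MILLISECONDS); // a second call has nothing left to do
        System.out.println("last record failed: " + last.isCompletedExceptionally());
    }
}
```

In this model a record that the sender has already picked up is allowed to finish, and only records still queued are failed. Whether in-flight requests should also be failed is a design decision the sketch does not settle.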
Proposed Changes
- Add a forceClose flag to the sender.
- When the forceClose flag is set, the sender stops sending messages, fails all messages remaining in the RecordAccumulator, and wakes up the threads waiting on a callback. These threads may be blocked in:
  - a synchronous send
  - flush()
- Clean up metrics and release other resources.
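The steps above can be sketched as follows. This is a minimal model and not Kafka's code: SketchAccumulator, SketchSender, runOnce(), and abortUnsent() are hypothetical names, and each record's result is represented by a CompletableFuture as an assumption. The point it illustrates is that failing a pending result is what wakes up a thread blocked on it, such as a caller doing a synchronous send; a flush() that waits on the same results would presumably be released the same way.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for RecordAccumulator: a queue of pending results.
class SketchAccumulator {
    private final Deque<CompletableFuture<Void>> pending = new ArrayDeque<>();

    synchronized CompletableFuture<Void> append(String record) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        pending.add(result);                     // the payload is ignored in this sketch
        return result;
    }

    synchronized CompletableFuture<Void> pollOne() {
        return pending.poll();
    }

    // Fails every unsent record; completing a future wakes any thread blocked on it.
    synchronized int abortUnsent(RuntimeException cause) {
        int failed = 0;
        CompletableFuture<Void> next;
        while ((next = pending.poll()) != null) {
            next.completeExceptionally(cause);
            failed++;
        }
        return failed;
    }
}

// Hypothetical sender with the forceClose flag described above.
class SketchSender {
    private final SketchAccumulator accumulator;
    private volatile boolean forceClose = false;

    SketchSender(SketchAccumulator accumulator) {
        this.accumulator = accumulator;
    }

    void forceClose() {
        forceClose = true;
    }

    // One iteration of the sender loop; returns false once the loop should stop.
    boolean runOnce() {
        if (forceClose) {
            accumulator.abortUnsent(new IllegalStateException("producer closed forcefully"));
            // Metrics cleanup and resource release would go here.
            return false;
        }
        CompletableFuture<Void> next = accumulator.pollOne();
        if (next != null) next.complete(null);   // pretend the send succeeded
        return true;
    }
}

class ForceCloseDemo {
    // Returns what a thread blocked in a synchronous send observes after a forced close.
    static String blockedSendOutcome() {
        SketchAccumulator accumulator = new SketchAccumulator();
        SketchSender sender = new SketchSender(accumulator);
        CompletableFuture<Void> pending = accumulator.append("record");
        String[] outcome = new String[1];
        Thread caller = new Thread(() -> {
            try {
                pending.get();                   // synchronous send: block on the result
                outcome[0] = "sent";
            } catch (ExecutionException e) {
                outcome[0] = "failed: " + e.getCause().getMessage();
            } catch (InterruptedException e) {
                outcome[0] = "interrupted";
            }
        });
        caller.start();
        sender.forceClose();
        sender.runOnce();                        // fails the record and releases the caller
        try {
            caller.join();                       // join makes outcome[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(blockedSendOutcome()); // prints "failed: producer closed forcefully"
    }
}
```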
Compatibility, Deprecation, and Migration Plan
This is a backward compatible change.
Rejected Alternatives
None