Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-10790
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As we know, the callback of `Producer#send` is executed by the `ioThread`. This design has caused numerous application bugs when `Producer#flush` is called within the callback, resulting in deadlocks. Since `Producer#close` already includes logic to avoid blocking when invoked from a callback, we could implement similar logic for `Producer#flush`. This enhancement would help users avoid such pitfalls.
Most importantly, in the current version of Apache Kafka (AK), invoking `Producer#flush` within the callback of `Producer#send` eventually leads to a timeout, which can be very confusing. With this enhancement, users will receive a clear error message that explains what went wrong.
Public Interfaces
This KIP proposes a way to implement this enhancement:
If the `Producer` detects that the `flush` method is invoked within the callback of the `send` method, throw a `KafkaException`.
This approach would require modifying the documentation of `org.apache.kafka.clients.producer.Producer#flush` to include `KafkaException`.
Proposed Changes
The following code demonstrates the check and the Javadoc that will be added to the `flush` method:
    /**
     * Omitting the unchanged section ......
     * <p>
     * <b>Important:</b> This method must not be called from within the callback provided to
     * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will result in a
     * {@link KafkaException} being thrown, as it would otherwise cause a deadlock.
     * </p>
     *
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws KafkaException If the method is invoked inside a {@link #send(ProducerRecord, Callback)} callback
     */
    @Override
    public void flush() {
        if (Thread.currentThread() == this.ioThread) {
            log.error("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
            throw new KafkaException("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
        }
        // Omitting the unchanged section ......
    }
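To illustrate the thread-identity check in isolation, here is a minimal sketch that runs without the Kafka client on the classpath. `GuardedFlushDemo` and its methods are hypothetical stand-ins rather than the real `KafkaProducer`, and `IllegalStateException` is used in place of `KafkaException` purely so that the example is self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the producer; it only models the thread-identity guard.
final class GuardedFlushDemo {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private final Thread ioThread;

    GuardedFlushDemo() {
        // A single daemon thread runs every callback, like the producer's I/O thread.
        ioThread = new Thread(() -> {
            try {
                while (true) {
                    callbacks.take().run();
                }
            } catch (InterruptedException e) {
                // close() interrupts the thread to stop the loop.
            }
        }, "demo-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Models send(): the callback is handed to the I/O thread. */
    void send(Runnable callback) {
        callbacks.add(callback);
    }

    /** Models the proposed flush(): fail fast when called on the I/O thread. */
    void flush() {
        if (Thread.currentThread() == ioThread) {
            // The KIP throws KafkaException; IllegalStateException is a stand-in here.
            throw new IllegalStateException(
                "flush() invocation inside a callback is not permitted because it may lead to deadlock.");
        }
        // A real flush would block here until buffered records are sent.
    }

    void close() {
        ioThread.interrupt();
    }

    /** Calls flush() from inside a callback and reports what happened. */
    static String flushFromCallback() {
        GuardedFlushDemo producer = new GuardedFlushDemo();
        AtomicReference<String> outcome = new AtomicReference<>("no exception");
        CountDownLatch done = new CountDownLatch(1);
        producer.send(() -> {
            try {
                producer.flush();
            } catch (IllegalStateException e) {
                outcome.set(e.getMessage());
            } finally {
                done.countDown();
            }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.close();
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(flushFromCallback());
    }
}
```

Calling `flush()` from any thread other than the I/O thread passes the guard, so only the callback path is rejected.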
Compatibility, Deprecation, and Migration Plan
N/A
Test Plan
Add a unit test to ensure that the deadlock protection throws a `KafkaException` with the corresponding error message to inform the user.
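One possible shape for such a test is sketched below in plain Java, without a test framework or the Kafka client. It shows how an exception raised on the callback thread can be captured and checked from the test thread. `FlushGuardTestSketch`, `guardedFlush`, and `flushOnIoThread` are hypothetical names, and `IllegalStateException` again stands in for `KafkaException`; the real unit test would exercise `KafkaProducer` directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical test sketch; guardedFlush stands in for the producer's guarded flush().
final class FlushGuardTestSketch {
    static final String MESSAGE =
        "KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.";

    /** Minimal stand-in for the guarded flush(): throws when run on the given I/O thread. */
    static void guardedFlush(Thread ioThread) {
        if (Thread.currentThread() == ioThread) {
            throw new IllegalStateException(MESSAGE);
        }
    }

    /** Runs guardedFlush on a single "I/O" thread and returns the exception it raised, or null. */
    static Throwable flushOnIoThread() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            // Learn which thread the executor uses, just as the producer knows its ioThread.
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread ioThread = io.submit(whoAmI).get();
            // Run the guarded call on that same thread; failures surface through the Future.
            Future<?> result = io.submit(() -> guardedFlush(ioThread));
            result.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            io.shutdownNow();
        }
    }
}
```

A test built this way would assert both the exception type and the exact message, so that a change to the user-facing text is caught as well.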
Rejected Alternatives
Adding an error log:
If the `Producer` detects that the `flush` method is invoked within the callback of the `send` method, log an error message.
Reason for Rejection
Although adding an error log message could help users understand what happened, it does not provide sufficient protection.
This KIP aims to offer fail-fast protection, which saves users time because the `flush` call fails immediately instead of blocking until a timeout.