Status

Current state: Accepted

Discussion thread: here 

Vote thread: here 

JIRA: KAFKA-10790

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The callback passed to `Producer#send` is executed by the producer's `ioThread`. Calling `Producer#flush` from within that callback blocks the very thread that would complete the pending sends, and this has caused numerous deadlock bugs in applications. Since `Producer#close` already includes logic to avoid blocking when invoked from a callback, similar logic can be added to `Producer#flush` to help users avoid this pitfall.

Most importantly, in the current version of Apache Kafka, invoking `Producer#flush` within the callback of `Producer#send` eventually leads to a timeout, which can be very confusing. With this enhancement, users will receive a clear error message explaining what went wrong.
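
To make the failure mode concrete, here is a minimal sketch that does not use the Kafka client at all. It assumes a single-threaded executor as a stand-in for the `ioThread`; the class `SelfWaitDemo` and the method `callbackWaitTimesOut` are hypothetical names chosen for illustration. A callback running on that thread waits for work that only the same thread can perform, so the wait can only end by timing out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, not Kafka code: models a callback that waits on its own thread.
public class SelfWaitDemo {

    /**
     * Runs a "callback" on a single worker thread. The callback waits for a task
     * that is queued behind it on the same thread.
     * Returns true if the wait timed out.
     */
    public static boolean callbackWaitTimesOut(long timeoutMs) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch batchDone = new CountDownLatch(1);
            Future<Boolean> callback = ioThread.submit(() -> {
                // The pending work is queued behind this callback on the same thread...
                ioThread.execute(batchDone::countDown);
                // ...so this flush-like wait cannot succeed before the callback returns.
                return !batchDone.await(timeoutMs, TimeUnit.MILLISECONDS);
            });
            return callback.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "wait timed out: true"
        System.out.println("wait timed out: " + callbackWaitTimesOut(200));
    }
}
```

The real producer is more involved, but the shape of the problem is the same: the thread that is blocked is the one expected to make progress.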

Public Interfaces

This KIP proposes a way to implement this enhancement:

If the `Producer` detects that the `flush` method is invoked within the callback of the `send` method, throw a `KafkaException`.

This approach requires modifying `org.apache.kafka.clients.producer.Producer#flush` to document that it can throw `KafkaException`.

Proposed Changes

The following code demonstrates the check that will be added to the `flush` method:

Producer#flush
    /**
     * Omitting the unchanged section ......
     * <p>
     * <b>Important:</b> This method must not be called from within the callback provided to
     * {@link #send(ProducerRecord, Callback)}. Invoking <code>flush()</code> in this context will result in a
     * {@link KafkaException} being thrown, as it would otherwise cause a deadlock.
     * </p>
     *
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws KafkaException If the method is invoked inside a {@link #send(ProducerRecord, Callback)} callback
     */
    @Override
    public void flush() {
        if (Thread.currentThread() == this.ioThread) {
            log.error("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
            throw new KafkaException("KafkaProducer.flush() invocation inside a callback is not permitted because it may lead to deadlock.");
        }
        // Omitting the unchanged section ......
    }


Compatibility, Deprecation, and Migration Plan

N/A

Test Plan

Add a unit test to ensure that the deadlock protection throws a `KafkaException` with the corresponding error message to inform the user.
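
As a rough illustration of what such a test could check, the sketch below models the guard with plain JDK classes instead of the real `KafkaProducer`. Everything in it is an assumption made for illustration: `FlushGuardSketch`, `GuardedClient`, and `runCallback` are hypothetical names, and `IllegalStateException` stands in for `KafkaException` so that the example has no dependency on the Kafka client. It captures the exception thrown on the I/O thread and hands it back to the test thread for assertions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in, not Kafka code: a guarded flush() plus helpers a test can call.
public class FlushGuardSketch {
    static final String MESSAGE =
        "flush() invocation inside a callback is not permitted because it may lead to deadlock.";

    /** One I/O thread plus a flush() that refuses to run on that thread. */
    static final class GuardedClient implements AutoCloseable {
        private final AtomicReference<Thread> ioThread = new AtomicReference<>();
        private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io-thread");
            ioThread.set(t); // remember the I/O thread for the identity check
            return t;
        });

        void flush() {
            if (Thread.currentThread() == ioThread.get()) {
                throw new IllegalStateException(MESSAGE); // stand-in for KafkaException
            }
            // A real flush would block here until pending records complete.
        }

        Future<?> runCallback(Runnable callback) {
            return io.submit(callback);
        }

        @Override
        public void close() {
            io.shutdownNow();
        }
    }

    /** Calls flush() from a callback; returns what it threw, or null if nothing was thrown. */
    public static RuntimeException flushInsideCallback() {
        AtomicReference<RuntimeException> thrown = new AtomicReference<>();
        try (GuardedClient client = new GuardedClient()) {
            client.runCallback(() -> {
                try {
                    client.flush();
                } catch (RuntimeException e) {
                    thrown.set(e); // hand the failure back to the calling thread
                }
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return thrown.get();
    }

    /** Calls flush() from the calling thread; returns true if it completed normally. */
    public static boolean flushOutsideCallback() {
        try (GuardedClient client = new GuardedClient()) {
            client.flush();
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("inside callback threw: " + (flushInsideCallback() != null));
        System.out.println("outside callback ok: " + flushOutsideCallback());
    }
}
```

A test against the real producer would additionally need to trigger a genuine `send` callback; the sketch covers only the thread-identity check and the error message.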

Rejected Alternatives

Adding an error log:

If the `Producer` detects that the `flush` method is invoked within the callback of the `send` method, log an error message.

Reason for Rejection

Although adding an error log message could help users understand what happened, it does not provide sufficient protection.

This KIP aims to offer fail-fast protection, which saves users time by making the `flush` call fail immediately instead of eventually timing out.


