...

We introduce the ProducerExceptionHandler interface, which users can implement to handle exceptions in the desired manner.
To configure a custom handler, the user implements the interface introduced above and sets its fully-qualified class name in the producer configuration under the key custom.exception.handler.

Code Block
languagejava
titleProducerExceptionHandler
package org.apache.kafka.common.errors;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Interface that specifies how an exception should be handled.
 */
@InterfaceStability.Evolving
public interface ProducerExceptionHandler extends Configurable {

    /**
     * Determine whether to stop processing, keep retrying internally, or swallow the error by dropping the record.
     *
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     * @return the response indicating whether to FAIL, RETRY, or SWALLOW
     */
    ProducerExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                            final Exception exception);

    enum ProducerExceptionHandlerResponse {
        /* stop processing: fail */
        FAIL(0, "FAIL"),
        /* continue: keep retrying */
        RETRY(1, "RETRY"),
        /* continue: swallow the error */
        SWALLOW(2, "SWALLOW");

        /**
         * An English description of the response; this is for debugging and can change.
         */
        public final String name;

        /**
         * The permanent and immutable id of the response; this can never change.
         */
        public final int id;

        ProducerExceptionHandlerResponse(final int id,
                                         final String name) {
            this.id = id;
            this.name = name;
        }
    }
}
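To make the contract above concrete, here is a minimal, self-contained sketch of the decision logic a custom handler might apply. It uses local stand-ins for the proposed Kafka types (the enum and the two exception classes are simplified stubs, not the real org.apache.kafka classes), so it compiles on its own; the response mapping (SWALLOW oversized records, RETRY on unknown topic, FAIL otherwise) is one plausible policy, not a mandated one.

```java
public class ProducerExceptionHandlerSketch {
    // Local stand-ins for the proposed Kafka types, so this sketch is self-contained
    enum Response { FAIL, RETRY, SWALLOW }
    static class RecordTooLargeException extends Exception {}
    static class UnknownTopicOrPartitionException extends Exception {}

    // One possible policy: drop oversized records, retry on missing topic metadata,
    // and fail on anything else
    static Response handle(Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return Response.SWALLOW;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            return Response.RETRY;
        }
        return Response.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(handle(new RecordTooLargeException()));  // SWALLOW
        System.out.println(handle(new RuntimeException("other")));  // FAIL
    }
}
```

A real implementation would implement ProducerExceptionHandler, receive the failed ProducerRecord alongside the exception, and could also inspect record headers or size before deciding.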
 

...

Code Block
languagejava
titleProducerConfig
.
.
.

public static final String CUSTOM_EXCEPTION_HANDLER_CLASS_CONFIG = "custom.exception.handler";
private static final String CUSTOM_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.common.errors.ProducerExceptionHandler</code> interface.";
.
.
.
static {
CONFIG = new ConfigDef().define(
.....
.
.
.
                         .define(CUSTOM_EXCEPTION_HANDLER_CLASS_CONFIG,
                                 Type.CLASS,
                                 null,
                                 Importance.MEDIUM,
                                 CUSTOM_EXCEPTION_HANDLER_CLASS_DOC);
}
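From the user's side, wiring up the handler is just one more entry in the producer properties. The sketch below shows the shape of that configuration; the handler class name com.example.MyProducerExceptionHandler and the broker address are hypothetical placeholders.

```java
import java.util.Properties;

public class HandlerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Register the custom handler by fully-qualified class name under the new key
        props.put("custom.exception.handler", "com.example.MyProducerExceptionHandler");

        System.out.println(props.getProperty("custom.exception.handler"));
    }
}
```

Because the config is defined with a null default, omitting the key leaves the producer's existing behaviour untouched.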


Proposed Changes

The custom handler only affects exceptions thrown from the producer. Specifically, this KIP aims to give users a way to manage a limited set of exceptions (so far, RecordTooLargeException and UnknownTopicOrPartitionException) thrown from the producer send() method. The same exceptions may of course originate from other components of Apache Kafka, which are not the focus of this KIP.

Notes on RecordTooLargeException based on the codebase we have today:

  • When the producer sends a too-large record in non-transactional mode, the producer send() method throws no exception but returns record metadata that includes the RecordTooLargeException error. Obviously, the failed record does not reach the broker.
  • With the changes made for KAFKA-9279, the producer send() method throws a RecordTooLargeException when it encounters a too-large record in a transaction. The user can bring the custom handler to bear to avoid failing the entire batch, by dropping the poisoning too-large record (SWALLOW the error). This way, the record is not included in the batch.
  • There is another scenario in which the record size is acceptable to the producer (the record is NOT too large from the producer's point of view because "max.request.size" and "buffer.memory" are set to large values in the producer config) but too large for the broker (the broker's default maximum message size is 1 MB). In that case, the broker throws RecordTooLargeException during commitTransaction(). This scenario is not the focus of this KIP.
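The second bullet above is the key motivation: a SWALLOW response lets the rest of a transactional batch proceed after an oversized record is dropped. The following self-contained sketch models that effect with plain integers standing in for record sizes; MAX_SIZE and the FAIL/SWALLOW policy are illustrative assumptions, not the producer's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

public class SwallowBatchSketch {
    static final int MAX_SIZE = 1024;  // hypothetical stand-in for max.request.size
    enum Response { FAIL, SWALLOW }

    // Hypothetical handler policy: swallow oversized records instead of failing
    static Response handle(int recordSize) {
        return Response.SWALLOW;
    }

    // Build a batch, consulting the handler for each oversized record
    static List<Integer> buildBatch(List<Integer> recordSizes) {
        List<Integer> batch = new ArrayList<>();
        for (int size : recordSizes) {
            if (size > MAX_SIZE) {
                if (handle(size) == Response.SWALLOW) {
                    continue;  // drop the poisoned record; the batch survives
                }
                throw new IllegalStateException("batch aborted");  // FAIL path
            }
            batch.add(size);
        }
        return batch;
    }

    public static void main(String[] args) {
        // The 2000-byte record is dropped; the other two records survive
        System.out.println(buildBatch(List.of(100, 2000, 300)));  // [100, 300]
    }
}
```

Without a SWALLOW response, the single oversized record would abort the whole transaction, which is exactly the failure mode this KIP lets users opt out of.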


Compatibility, Deprecation, and Migration Plan

Changed behaviour: the default behaviour stays as it is, but the user can change it by implementing the handle() method of a custom handler.

...