Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
firstline1
titleProducerExceptionHandler
linenumberstrue
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.io.Closeable;

/**
 * Interface that specifies how an the RecordTooLargeException and/or UnknownTopicOrPartitionException should be handled.
 * The accepted responses for RecordTooLargeException are FAIL and SWALLOW. Therefore, RETRY will be interpreted and executed as FAIL.
 */
public interface ProducerExceptionHandler extends Configurable, Closeable {

    /**
     * Determine whether to stop processing, or swallow the error by dropping the record.
     *
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     */
    default NonRetriableResponse handle(final ProducerRecord record, final RecordTooLargeException exception) {
		// return the value corresponding to the default behaviour
    }

    /**
     * Determine whether to stop processing, keep retrying internally, or swallow the error by dropping the record.
     *
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     */
    default RetriableResponse handle(final ProducerRecord record, final UnknownTopicOrPartitionException exception) {
		// return the value corresponding to the default behaviour
    }


    enum NonRetriableResponse {
        /* stop processing: fail */
        FAIL(0, "FAIL"),
        /* drop the record and continue */
        SWALLOW(1, "SWALLOW");

        /**
         * an english description of the api--this is for debugging and can change
         */
        public final String name;

        /**
         * the permanent and immutable id of an API--this can't change ever
         */
        public final int id;

        RecordTooLargeExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
    enum RetriableResponse {
        /* stop processing: fail */
        FAIL(0, "FAIL"),
        /* continue: keep retrying */
        RETRY(1, "RETRY"),
        /* continue: swallow the error */
        SWALLOW(2, "SWALLOW");

        /**
         * an english description of the api--this is for debugging and can change
         */
        public final String name;

        /**
         * the permanent and immutable id of an API--this can't change ever
         */
        public final int id;

        UnknownTopicOrPartitionExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}  

...