Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-15309 
Other related tickets: KAFKA-9279 - Getting issue details... STATUS , KAFKA-15259 - Getting issue details... STATUS , KAFKA-10340 - Getting issue details... STATUS , KAFKA-12990 - Getting issue details... STATUS , KAFKA-13634 - Getting issue details... STATUS

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We believe that the user should be able to develop custom exception handlers for managing producer exceptions. On the other hand, this will be an expert-level API, and using that may result in strange behaviour in the system, making it hard to find the root cause. Therefore, the custom handler is currently limited to handling RecordTooLargeException and UnknownTopicOrPartitionException. The motivation for this KIP is derived from the following use cases:

  • RecordTooLargeException:
    • In transactions, the producer collects multiple records in batches. Then a RecordTooLargeException related to a single record leads to failing the entire batch. A custom exception handler in this case may decide on dropping the record and continuing the processing. See Example 1, please.
    • More over, for Kafka Streams, a record that is too large is a poison pill record, and there is no easy way to skip over it. Currently, Kafka Streams treats this error as fatal and seeks to go back to the last commit of the input topic offset and retry to hit the same error again. It would require a major change inside Kafka Streams to add a bookkeeping code to track this error case correctly and to skip over this record when retrying. A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly.
  • UnknownTopicOrPartitionException: For this case, the producer handles this exception internally and only issues a WARN log about missing metadata and retries internally. Later, when the producer hits the "deliver.timeout.ms", it throws a TimeoutException and the user can only blindly retry, resulting in an infinite retry loop. The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop. Kafka Streams also blindly retries for this case, and the application gets stuck.

This KIP introduces an interface that can be implemented by the user to handle the exceptions UnknownTopicOrPartitionException and RecordTooLargeException.
Question: Why do we need an interface for handling the exceptions? Could we have a couple of simple producer configuration options for those two exceptions? 
Answer: We aim at giving the user the flexibility of an interface. For example, facing UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry it for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please. 

Public Interfaces

We introduce the ProducerExceptionHandler interface, which can be implemented by the user to manage UnknownTopicOrPartitionException and RecordTooLargeException in the desired manner. It can either stop or continue the processing. Stopping processing is named as FAIL since the transaction or record sending (in non-transactional mode) will fail. In the case of continuing processing, either the record is dropped and the error is ignored (SWALLOW) or sending is retried (RETRY). 

To configure their own handler, the user must implement the above-introduced interface and add the class name in producer configuration with the key: custom.exception.handler.class.
More than that, we introduced two more configuration parameters as follows to handle the two exceptions without needing to implement the interface.

  • drop.invalid.large.records with a default value of `false` for swallowing too large records.
  • retry.unknown.topic.partition.ms with a default value of `Integer.MAX_VALUE` that performs RETRY for min(`max.block.ms`, `retry.unknown.topic.partition.ms`) encountering the UnknownTopicOrPartitionException.

Always, the most "conservative" thing will hit first. E.g., if there is a retriable error, the producer has a retry timeout, and the handler might have one; whichever timeout hits first will stop the retry loop. If the handler says "SWALLOW",  we also break the retry loop (more conservative), and if the handler says FAIL, we fail right away, not even waiting for a timeout to hit. Obviously, the order of conservativity is FAIL > SWALLOW > RETRY, where FAIL is the most conservative action and RETRY is the least.

  


ProducerExceptionHandler
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.io.Closeable;

/**
 * Interface that specifies how an the RecordTooLargeException and/or UnknownTopicOrPartitionException should be handled.
 * The accepted responses for RecordTooLargeException are FAIL and SWALLOW. Therefore, RETRY will be interpreted and executed as FAIL.
 */
public interface ProducerExceptionHandler extends Configurable, Closeable {

    /**
     * Determine whether to stop processing, or swallow the error by dropping the record.
     *
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     */
    default NonRetriableResponse handle(final ProducerRecord record, final RecordTooLargeException exception) {
		// return the value corresponding to the default behaviour
    }

    /**
     * Determine whether to stop processing, keep retrying internally, or swallow the error by dropping the record.
     *
     * @param record The record that failed to produce
     * @param exception The exception that occurred during production
     */
    default RetriableResponse handle(final ProducerRecord record, final UnknownTopicOrPartitionException exception) {
		// return the value corresponding to the default behaviour
    }


    enum NonRetriableResponse {
        /* stop processing: fail */
        FAIL(0, "FAIL"),
        /* drop the record and continue */
        SWALLOW(1, "SWALLOW");

        /**
         * an english description of the api--this is for debugging and can change
         */
        public final String name;

        /**
         * the permanent and immutable id of an API--this can't change ever
         */
        public final int id;

        RecordTooLargeExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
    enum RetriableResponse {
        /* stop processing: fail */
        FAIL(0, "FAIL"),
        /* continue: keep retrying */
        RETRY(1, "RETRY"),
        /* continue: swallow the error */
        SWALLOW(2, "SWALLOW");

        /**
         * an english description of the api--this is for debugging and can change
         */
        public final String name;

        /**
         * the permanent and immutable id of an API--this can't change ever
         */
        public final int id;

        UnknownTopicOrPartitionExceptionResponse(final int id, final String name) {
            this.id = id;
            this.name = name;
        }
    }
}  


ProducerConfig
.
.
.

public static final String CUSTOM_EXCEPTION_HANDLER_CLASS_CONFIG = "custom.exception.handler.class";
private static final String CUSTOM_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.common.errors.ProducerExceptionHandler</code> interface.";.

public static final String DROP_INVALID_LARGE_RECORDS_CONFIG = "drop.invalid.large.records";
private static final String DROP_INVALID_LARGE_RECORDS_DOC = "When set to 'true', records larger than <code>" + MAX_REQUEST_SIZE_CONFIG + "</code> are dropped."

public static final String RETRY_UNKNOWN_TOPIC_PARTITION_MS_CONFIG = "retry.unknown.topic.partition.ms";
private static final String RETRY_UNKNOWN_TOPIC_PARTITION_MS_DOC = "The configuration controls the retrying duration encountering the UnknownTopicOrPartitionException. The user should set it to lower the timeout compared to <code>" + MAX_BLOCK_MS_CONFIG + "</code>.";.
. .
.
static {
CONFIG = new ConfigDef().define(
.....
.
.
.
                         .define(CUSTOM_EXCEPTION_HANDLER_CLASS_CONFIG,
                                 Type.CLASS,
                                 null,
                                 Importance.MEDIUM,
                                 CUSTOM_EXCEPTION_HANDLER_CLASS_DOC)
                         .define(DROP_INVALID_LARGE_RECORDS_CONFIG,
                                 Type.BOOLEAN,
                                 false,
                                 Importance.LOW,
                                 DROP_INVALID_LARGE_RECORDS_DOC)
                        .define(RETRY_UNKNOWN_TOPIC_PARTITION_MS_CONFIG,
                                 Type.INT,
                                 Integer.MAX_VALUE,
                                 Importance.LOW,
                                 RETRY_UNKNOWN_TOPIC_PARTITION_MS_DOC);
 }


Proposed Changes

The RecordTooLargeException can be thrown by broker, producer and consumer. Of course, the ProducerExceptionHandler interface is introduced to affect ONLY the exceptions thrown from the producer.

With the changes made
here (because of KAFKA-9279 - Getting issue details... STATUS ) the producer send() method throws a RecordTooLargeException facing too large records in transactions. The user can bring the custom handler to bear to avoid the entire batch failing by dropping the poisoning too large record (SWALLOW the error). This way, this record does not get included in the batch.

Examples

// Example 1: RecordTooLargeException use case 
// In this example we except that the producer follows the custom handler and  does not fail. It may make a batch of normal records and commit the transaction successfully.

public class Example1 {  
	public static void main(String[] args) {
        
		Properties producerProps = new Properties();
 	    producerProps.put("custom.exception.handler.class", Example1.MyProducerExceptionHandler.class.getName());
        // .....  omitted for brevity
        KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

        Properties consumerProps = new Properties();
        // .....  omitted for brevity
        KafkaConsumer<String, String> consumer = new KafkaConsumer(consumerProps);

        StringBuilder largeMessageStringBuffer = new StringBuilder();
        for (int i = 0; i < 1000000; i++) { 
            largeMessageStringBuffer.append("0123456789");
        }
        String largeMessage = largeMessageStringBuffer.toString();
        ProducerRecord<String, String> largeRecord = new ProducerRecord("output-topic", largeMessage);
        ProducerRecord<String, String> normalRecord = new ProducerRecord("output-topic", "normalMessage");


        
		producer.initTransactions();
		consumer.subscribe(Collections.singleton("input-topic"));

        while(true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60));
                if (records.count() > 0) {
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap();
                    for (ConsumerRecord<String, String> record : records) {
						Future<RecordMetadata> sendOutput;

						if (someMethod(record)) {
							sendOutput = producer.send(largeRecord);
						} else {
						    sendOutput = producer.send(normalRecord);
						}

						if (!sendOutput instanceof FutureFailure) {
                        	offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); 
						}                  
				    }

                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            } catch (Exception e) {
                producer.abortTransaction();
                throw new RuntimeException(e);
            }
        }
    }
	public static class MyProducerExceptionHandler implements ProducerExceptionHandler {
        @Override
        public ProducerExceptionHandler.NonRetriableResponse handle(ProducerRecord producerRecord, RecordTooLargeExceptionException e) {
            return ProducerExceptionHandler.NonRetriableResponse.SWALLOW;
	    }
         
        @Override
        public void configure(Map<String, ?> map) {

        }
        @Override
        public void close() throws IOException {

        }
   }
}


// Example 2: UnknownTopicOrPartitionException use case 
// In this example we except that if the record does NOT belong to the "important-topic", the producer follows the custom handler and fails.

public class Example2 {  
	public static void main(String[] args) {
        
		Properties producerProps = new Properties();
 	    producerProps.put("custom.exception.handler.class", Example2.MyProducerExceptionHandler.class.getName());
        // .....  omitted for brevity
        KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

		producer.send(new ProducerRecord(someMethodToComputeTopic(), "someMessage"));
		producer.flush();
		producer.close();
       
    }
	public static class MyProducerExceptionHandler implements ProducerExceptionHandler {
        @Override
        public ProducerExceptionHandler.RetriableResponse handle(ProducerRecord producerRecord, UnknownTopicOrPartitionException e) {
			if (producerRecord.topic().equals("important-topic") {
     			ProducerExceptionHandler.RetriableResponse.RETRY;
			}
            return ProducerExceptionHandler.RetriableResponse.FAIL;
        }
                
        @Override
        public void configure(Map<String, ?> map) {

        }
        @Override
        public void close() throws IOException {

        }
   }
}


Compatibility, Deprecation, and Migration Plan

Changed behaviour: The default behaviour stays as it is, but the user can change the behaviour by implementing the handle() functions as well as setting the two newly introduced config parameters.

Test Plan

Some unit and integration tests will be implemented to ensure that

  • exceptions are caught by the ProducerExceptionHandler and the provided implementations.
  • the right exceptions are caught.

No unit tests are needed to ensure the backward compatibility. Passing the current unit tests is an enough indicator.

Rejected Alternatives

Using one or more producer configs instead of having a pluggable interface: misusing produce configs has the same drawbacks of misusing the interface while the interface solution provides a handler with the advantage of full flexibility. Using the handler interface, the user is able to implement different behaviours for a specific exception. Additionally, further KIPs can be proposed to cover more exceptions or more actions for handing. 



  • No labels