...

 

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Configurable;

/**
 * A plugin interface that allows you to intercept events happening to a producer record,
 * such as sending a producer record or receiving an acknowledgement when a record is published.
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value are serialized.
     * @param record the record from the client
     * @return either the original record passed to this method or a new record with a modified key and value
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This is called when the send has been acknowledged.
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```

onSend() will be called in KafkaProducer.send(), before the key and value are serialized and before a partition is assigned. If the implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object. One implication of interceptors modifying the key in onSend() is that the partition will be assigned based on the modified key, not the key from the client. If the key/value transformation is not consistent (that is, the same key and value do not always map to the same modified key/value), then log compaction will not work. We will document this in the ProducerInterceptor class. However, known use cases, such as adding an app name or host name to a message, perform consistent transformations.
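The consistency requirement can be illustrated outside Kafka with a minimal sketch. `KeyTransformDemo`, `partitionFor`, and the `app1:` prefix are hypothetical, and `partitionFor` is a simple hash-modulo stand-in rather than Kafka's default partitioner. The point is only that a deterministic transformation keeps mapping the same input key to the same partition and the same compaction key, while a transformation that depends on per-call state does not.

```java
import java.util.function.UnaryOperator;

// HYPOTHETICAL illustration: not Kafka code. partitionFor() is a hash-mod stand-in,
// not Kafka's default partitioner (which hashes the serialized key).
class KeyTransformDemo {
    // Same key always maps to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Consistent: the same input key always yields the same modified key.
    static final UnaryOperator<String> CONSISTENT = key -> "app1:" + key;

    // Inconsistent: the output depends on a call counter, so the same input key
    // yields a different modified key on every call.
    private static int counter = 0;
    static final UnaryOperator<String> INCONSISTENT = key -> key + "#" + (counter++);

    public static void main(String[] args) {
        int p1 = partitionFor(CONSISTENT.apply("user-42"), 8);
        int p2 = partitionFor(CONSISTENT.apply("user-42"), 8);
        System.out.println("consistent partitions equal: " + (p1 == p2)); // prints: consistent partitions equal: true

        String k1 = INCONSISTENT.apply("user-42");
        String k2 = INCONSISTENT.apply("user-42");
        // Two different keys for the "same" logical key: compaction cannot collapse them.
        System.out.println("inconsistent keys equal: " + k1.equals(k2)); // prints: inconsistent keys equal: false
    }
}
```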

Another implication of onSend() returning a ProducerRecord is that the interceptor can potentially modify the topic/partition. It will be up to the interceptor to ensure that the ProducerRecord returned from onSend() is correct (e.g. that the topic and partition, if given, are preserved or deliberately modified). KafkaProducer will use the ProducerRecord returned from onSend() instead of the record passed into the KafkaProducer.send() method.


Since there may be multiple interceptors, the first interceptor will get the record from the client, passed as the 'record' parameter. The next interceptor in the list will get the record returned by the previous interceptor, and so on. Since interceptors are allowed to mutate records, an interceptor may receive a record that has already been modified by other interceptors. However, we will state in the javadoc that building a pipeline of mutating interceptors that depend on the output of previous interceptors is discouraged, because of the potential side effects caused by an interceptor failing to mutate the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception is caught and logged, and the next interceptor is called with the record returned by the last successful interceptor in the list or, if no interceptor has succeeded yet, with the original record from the client.
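The sketch below mimics the chaining rule and shows why dependent pipelines are discouraged. It assumes records are plain `String`s and interceptors are `UnaryOperator`s (both hypothetical simplifications; `ChainDemo` is not a Kafka class). When the first interceptor throws, the second one, which expects the first one's output, silently does nothing, and the chain continues.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// HYPOTHETICAL simulation of the onSend() chaining rule; no Kafka classes involved.
class ChainDemo {
    // Each interceptor gets the output of the last interceptor that succeeded;
    // a throwing interceptor is logged and skipped.
    static String runChain(String record, List<UnaryOperator<String>> interceptors) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (Throwable t) {
                // As in the proposed wrapper: do not propagate, continue with the last good record.
                System.err.println("interceptor failed: " + t.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // A: supposed to add a "host=" tag, but fails.
        UnaryOperator<String> addHost = r -> { throw new IllegalStateException("host lookup failed"); };
        // B: depends on A and rewrites the "host=" tag it expects to find.
        UnaryOperator<String> rewriteHost = r -> r.replace("host=", "origin=");
        // C: independent of the others.
        UnaryOperator<String> addApp = r -> r + ";app=demo";

        System.out.println(runChain("payload", List.of(addHost, rewriteHost, addApp)));
        // prints: payload;app=demo  (B found nothing to rewrite)
    }
}
```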

onAcknowledgement() will be called when the send is acknowledged. It has the same API as Callback.onCompletion() and is called just before Callback.onCompletion() is called. In addition, onAcknowledgement() will be called just before KafkaProducer.send() throws an exception (even when it does not call the user callback).

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on the submitting thread and onAcknowledgement() will be called on the producer I/O thread. It is up to the interceptor implementation to ensure thread safety. Since onAcknowledgement() is called on the producer I/O thread, its implementation should be reasonably fast; otherwise, sending of messages from other threads could be delayed.
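One way to satisfy the thread-safety requirement is to keep interceptor state in atomic types. This minimal sketch assumes simplified, hypothetical callback parameters (`String` records, a bare `Exception`) and uses no Kafka classes; a thread pool stands in for application threads and a single-thread executor stands in for the producer I/O thread. `onAcknowledgement()` does only a counter increment, in line with keeping I/O-thread work cheap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// HYPOTHETICAL stand-in for a ProducerInterceptor with simplified signatures.
class CountingInterceptor {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Runs on whichever application thread calls send(); may run concurrently.
    String onSend(String record) {
        sent.incrementAndGet();
        return record; // record passed through unchanged
    }

    // Runs on the I/O thread: cheap, non-blocking work only.
    void onAcknowledgement(Exception exception) {
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    long sent() { return sent.get(); }
    long acked() { return acked.get(); }
    long failed() { return failed.get(); }

    // Simulates sends from 4 application threads, acknowledged on one "I/O" thread;
    // every 10th record is treated as failed.
    static CountingInterceptor simulate(int records) {
        CountingInterceptor interceptor = new CountingInterceptor();
        ExecutorService senders = Executors.newFixedThreadPool(4);
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < records; i++) {
            final int n = i;
            senders.submit(() -> {
                interceptor.onSend("record-" + n);
                ioThread.submit(() -> interceptor.onAcknowledgement(
                        n % 10 == 0 ? new RuntimeException("simulated failure") : null));
            });
        }
        try {
            senders.shutdown();
            senders.awaitTermination(10, TimeUnit.SECONDS);  // all acks are queued after this
            ioThread.shutdown();
            ioThread.awaitTermination(10, TimeUnit.SECONDS); // drain queued acks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interceptor;
    }

    public static void main(String[] args) {
        CountingInterceptor c = simulate(1000);
        System.out.println(c.sent() + " sent, " + c.acked() + " acked, " + c.failed() + " failed");
        // prints: 1000 sent, 900 acked, 100 failed
    }
}
```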

...

 

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

/**
 * A plugin interface that allows you to intercept consumer events, such as receiving a record
 * or a record being consumed by the client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if the records were dropped/ignored/discarded (not consumable)
     * @return either the original 'records' passed to this method or a modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    /**
     * This is called when offsets get committed, i.e. when the commit request sent to the server
     * has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    /**
     * This is called when the interceptor is closed.
     */
    public void close();
}
```

 

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords. The implementation of onConsume() is allowed to modify keys and values in ConsumerRecords and, if it does, it returns them in a new ConsumerRecords object. This method is designed to be symmetric to ProducerInterceptor.onSend() and provides a way to undo a transformation done in an onSend() producer interceptor. The records returned from onConsume() will be returned to the user from KafkaConsumer.poll(). Thus, one implication of this callback is that interceptors can potentially modify the topic, partition, and offset of a record. We will clearly document that it is up to the interceptor to make sure that the topic, partition, and offset returned in ConsumerRecords are valid.
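A symmetric onSend()/onConsume() pair might look like the sketch below. The `app=<name>|` header format and the `AppTagCodec` name are hypothetical, values are plain `String`s rather than `ProducerRecord`/`ConsumerRecords`, and the app name is assumed not to contain the `|` separator.

```java
// HYPOTHETICAL sketch of a symmetric value transformation; not Kafka code.
class AppTagCodec {
    private static final String PREFIX = "app=";
    private static final String SEPARATOR = "|"; // assumed absent from app names

    // Producer side: what an onSend() implementation might do to the value.
    static String tag(String appName, String value) {
        return PREFIX + appName + SEPARATOR + value;
    }

    // Consumer side: what an onConsume() implementation might do to undo it.
    static String untag(String taggedValue) {
        int idx = taggedValue.indexOf(SEPARATOR);
        if (!taggedValue.startsWith(PREFIX) || idx < 0) {
            return taggedValue; // not produced by the tagging interceptor; leave unchanged
        }
        return taggedValue.substring(idx + 1);
    }

    // Extracts the app name for monitoring/audit; null if the value is untagged.
    static String appOf(String taggedValue) {
        int idx = taggedValue.indexOf(SEPARATOR);
        return taggedValue.startsWith(PREFIX) && idx >= 0
                ? taggedValue.substring(PREFIX.length(), idx)
                : null;
    }

    public static void main(String[] args) {
        String onWire = tag("billing", "order-17");
        System.out.println(onWire);        // prints: app=billing|order-17
        System.out.println(appOf(onWire)); // prints: billing
        System.out.println(untag(onWire)); // prints: order-17
    }
}
```

Because the transformation is deterministic, it also satisfies the consistency requirement described for onSend().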



Since there may be multiple interceptors, the first interceptor will get the records consumed by the consumer. The next interceptor in the list will get the records returned by the previous interceptor, and so on. Since interceptors are allowed to mutate records, an interceptor may receive records that have already been modified by other interceptors. However, we will state in the javadoc that building a pipeline of mutating interceptors that depend on the output of previous interceptors is discouraged, because of the potential side effects caused by an interceptor failing to mutate the records and throwing an exception. If one of the interceptors in the list throws an exception from onConsume(), the exception is caught and logged, and the next interceptor is called with the records returned by the last successful interceptor in the list or, if no interceptor has succeeded yet, with the records consumed from the brokers.

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since the new consumer is single-threaded, the ConsumerInterceptor API will be called from a single thread. Since interceptor callbacks see every consumed record, interceptor implementations should be careful not to add performance overhead to the consumer.
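Because all ConsumerInterceptor callbacks run on the consumer's single thread, interceptor state can live in ordinary collections without synchronization. The sketch below is hypothetical: `CommitTracker` is not a Kafka class, partitions are `"topic-partition"` strings, and offsets are plain `long`s standing in for `TopicPartition` and `OffsetAndMetadata`. It tracks how many records returned by poll() are not yet covered by a commit, assuming the usual Kafka convention that a committed offset names the next record to read.

```java
import java.util.HashMap;
import java.util.Map;

// HYPOTHETICAL consumer-side interceptor state; plain HashMaps suffice on a single thread.
class CommitTracker {
    private final Map<String, Long> consumed = new HashMap<>();  // highest offset returned by poll()
    private final Map<String, Long> committed = new HashMap<>(); // last committed offset

    // Analogue of onConsume(): remember the highest offset handed to the application.
    void onConsume(String partition, long offset) {
        consumed.merge(partition, offset, Math::max);
    }

    // Analogue of onCommit(): remember what was committed.
    void onCommit(Map<String, Long> offsets) {
        committed.putAll(offsets);
    }

    // Records handed to the application but not yet covered by a commit.
    // A committed offset is the *next* offset to read, hence the +1.
    long uncommitted(String partition) {
        long next = consumed.getOrDefault(partition, -1L) + 1;
        return Math.max(0, next - committed.getOrDefault(partition, 0L));
    }

    public static void main(String[] args) {
        CommitTracker tracker = new CommitTracker();
        for (long offset = 0; offset < 10; offset++) {
            tracker.onConsume("orders-0", offset);
        }
        tracker.onCommit(Map.of("orders-0", 5L));
        System.out.println(tracker.uncommitted("orders-0")); // prints: 5
        tracker.onCommit(Map.of("orders-0", 10L));
        System.out.println(tracker.uncommitted("orders-0")); // prints: 0
    }
}
```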

Add more record metadata to RecordMetadata and ConsumerRecord

Currently, RecordMetadata contains the topic/partition, offset, and timestamp (KIP-32). We propose to add the remaining record metadata to RecordMetadata: checksum and record size. Both checksum and record size are useful for monitoring and audit. The checksum provides an easy way to get a summary of the message and is also useful for validating a message end-to-end. For symmetry, we also propose to expose the same metadata on the consumer side and make it available to interceptors.

We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

```java
public final class RecordMetadata {
    private final long offset;
    private final TopicPartition topicPartition;
    private final long checksum;                // NEW: checksum of the record
    private final int size;                     // NEW: record size in bytes (before compression)
    // .......
}
```

```java
public final class ConsumerRecord<K, V> {
    // .......
    private final long checksum;               // NEW: checksum of the record
    private final int size;                    // NEW: record size in bytes (after decompression)
}
```

We will make it clear in the documentation (of ConsumerRecord and onAcknowledgement()/onConsume()) that the checksum the consumer sees may not always be the one initially set by the producer. The CRC may be overwritten by the broker during an upgrade after a message format change, or when the topic is configured with timestamp type == LogAppendTime, which requires the broker to overwrite the message timestamps and, as a result, the CRC.
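To illustrate how a checksum plus a size can serve as a compact record summary for audit, here is a sketch using `java.util.zip.CRC32`. It is a simplification under stated assumptions: it checksums only the value bytes, whereas the checksum proposed for RecordMetadata and ConsumerRecord is computed by Kafka over the serialized record, and, as noted above, the broker may rewrite that CRC. `RecordSummary` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// HYPOTHETICAL audit summary: CRC32 over value bytes only, not Kafka's record CRC.
class RecordSummary {
    final long checksum;
    final int size;

    RecordSummary(long checksum, int size) {
        this.checksum = checksum;
        this.size = size;
    }

    static RecordSummary of(byte[] valueBytes) {
        CRC32 crc = new CRC32();
        crc.update(valueBytes, 0, valueBytes.length);
        return new RecordSummary(crc.getValue(), valueBytes.length);
    }

    // Two summaries match when both the checksum and the size agree.
    boolean matches(RecordSummary other) {
        return checksum == other.checksum && size == other.size;
    }

    public static void main(String[] args) {
        RecordSummary produced = of("order-17".getBytes(StandardCharsets.UTF_8));
        RecordSummary consumed = of("order-17".getBytes(StandardCharsets.UTF_8));
        RecordSummary tampered = of("order-18".getBytes(StandardCharsets.UTF_8));
        System.out.println(produced.matches(consumed)); // prints: true
        System.out.println(produced.matches(tampered)); // prints: false
    }
}
```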

Proposed Changes

We propose to add the two new interfaces listed and described in the Public Interfaces section: ProducerInterceptor and ConsumerInterceptor. We will allow a chain of interceptors. It is up to the user to correctly specify the order of interceptors in the interceptor.classes config of the producer and of the consumer.

Kafka Producer changes

  • We will create a new class that will encapsulate a list of ProducerInterceptor instances: ProducerInterceptors

```java
import java.io.Closeable;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class wraps custom interceptors configured for this producer.
 */
public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Throwable t) {
                // do not propagate interceptor exception, ignore and continue calling other interceptors
                log.warn("Error executing interceptor onSend callback for topic: " + record.topic()
                        + ", partition: " + record.partition(), t);
            }
        }
        return interceptRecord;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Throwable t) {
                // do not propagate interceptor exceptions, just ignore
                log.warn("Error executing interceptor onAcknowledgement callback", t);
            }
        }
    }

    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Throwable t) {
                log.error("Failed to close producer interceptor", t);
            }
        }
    }
}
```
  • KafkaProducer will have a new member:
    • ProducerInterceptors<K, V> interceptors;
  • The KafkaProducer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor instance, passing in ProducerConfig.originals(). The KafkaProducer constructor will then instantiate 'interceptors' with the list of interceptor instances.
  • To be able to call interceptors on the producer callback, we wrap the client callback passed to the KafkaProducer.send() method inside ProducerCallback, a new class that implements Callback and holds a reference to the client callback and to 'interceptors'. ProducerCallback.onCompletion() will call the 'interceptors' onAcknowledgement() and then the client callback's onCompletion() (if the client callback is not null).

     

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * This class is a callback called on every producer request completion.
 */
public class ProducerCallback<K, V> implements Callback {
    private final Callback clientCallback;
    private final ProducerInterceptors<K, V> interceptors;

    public ProducerCallback(Callback clientCallback, ProducerInterceptors<K, V> interceptors) {
        this.clientCallback = clientCallback;
        this.interceptors = interceptors;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // interceptors are notified first, just before the client callback
        interceptors.onAcknowledgement(metadata, e);
        if (clientCallback != null)
            clientCallback.onCompletion(metadata, e);
    }
}
```
  • KafkaProducer.send() will create a ProducerCallback and call the interceptors' onSend() method:

    ProducerCallback<K, V> producerCallback = new ProducerCallback<>(callback, this.interceptors);
    ProducerRecord<K, V> sentRecord = this.interceptors.onSend(record);
  • The rest of the KafkaProducer.send() code will use sentRecord in place of 'record' and producerCallback in place of 'callback'.
  • KafkaProducer.close() will close interceptors:
    ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
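The constructor step that turns interceptor.classes into an ordered list of configured instances can be sketched as follows. `InterceptorLoader`, its nested `Configurable` stand-in, and `TaggingInterceptor` are hypothetical, and Kafka's own config parsing and class loading differ in detail. The sketch instantiates classes in the listed order (which becomes the chain order), calls `configure()` with the original configs, and returns an empty list when nothing is configured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// HYPOTHETICAL loader; not Kafka's implementation.
class InterceptorLoader {
    // Simplified stand-in for org.apache.kafka.common.Configurable.
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    static <T extends Configurable> List<T> load(Map<String, ?> originals, Class<T> type) {
        Object value = originals.get("interceptor.classes");
        if (value == null || value.toString().trim().isEmpty()) {
            return Collections.emptyList(); // no interceptors configured
        }
        List<T> result = new ArrayList<>();
        for (String name : value.toString().split(",")) {
            String className = name.trim();
            if (className.isEmpty()) continue;
            try {
                Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
                T interceptor = type.cast(instance); // fails fast on a class of the wrong type
                interceptor.configure(originals);
                result.add(interceptor);             // list order == chain order
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate " + className, e);
            }
        }
        return result;
    }

    // Example interceptor that reads client.id during configure().
    static class TaggingInterceptor implements Configurable {
        String clientId;

        public TaggingInterceptor() {}

        @Override
        public void configure(Map<String, ?> configs) {
            Object id = configs.get("client.id");
            clientId = id == null ? null : id.toString();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("client.id", "producer-1");
        originals.put("interceptor.classes", TaggingInterceptor.class.getName());
        List<Configurable> loaded = load(originals, Configurable.class);
        System.out.println(loaded.size()); // prints: 1
    }
}
```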

...

  • We will create a new class that will encapsulate a list of ConsumerInterceptor instances: ConsumerInterceptors

```java
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class wraps custom interceptors configured for this consumer. On each callback, all consumer
 * interceptors configured for the consumer are called.
 */
public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;

    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Throwable t) {
                // do not propagate interceptor exception, ignore and continue calling other interceptors
                log.warn("Error executing interceptor onConsume callback", t);
            }
        }
        return interceptRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onCommit(offsets);
            } catch (Throwable t) {
                // do not propagate interceptor exception, just ignore
                log.warn("Error executing interceptor onCommit callback", t);
            }
        }
    }

    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Throwable t) {
                log.error("Failed to close consumer interceptor", t);
            }
        }
    }
}
```
  • KafkaConsumer will have a new member:
    ConsumerInterceptors<K, V> interceptors;
  • The KafkaConsumer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor instance, passing in ConsumerConfig.originals() and the clientId. The KafkaConsumer constructor will then instantiate 'interceptors' with the list of interceptor instances.
  • KafkaConsumer.close() will close 'interceptors':
    ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
  • KafkaConsumer.poll() will call
    this.interceptors.onConsume(consumerRecords);

    and return the ConsumerRecords<K, V> returned from onConsume().

  • ConsumerCoordinator.commitOffsetsAsync() and commitOffsetsSync() will call onCommit().

Compatibility, Deprecation, and Migration Plan

...