...

In this KIP, we propose adding two new interfaces: ProducerInterceptor on the producer and ConsumerInterceptor on the consumer. Users will be able to implement and configure a chain of custom interceptors and listen to events that happen to a record at different points on the producer and consumer. The interceptor API will also allow mutating the records, to support the ability to add metadata to a message for auditing/end-to-end monitoring. (The interceptor interfaces can be extended later with more intercept points by proposing new KIPs. See the Rejected Alternatives section.)

Public Interfaces

We add two new interfaces: a ProducerInterceptor interface that will allow plugging in classes that will be notified of events happening to the record during its lifetime on the producer, and a ConsumerInterceptor interface that will allow plugging in classes that will be notified of record events on the consumer. The ProducerInterceptor API will allow modifying keys and values pre-serialization. For symmetry, the ConsumerInterceptor API will allow modifying keys and values post-deserialization.

Both ProducerInterceptor and ConsumerInterceptor inherit from Configurable. Properties passed to the configure() method will be the producer/consumer config properties (including clientId, if it was not specified in the config and was assigned by KafkaProducer/KafkaConsumer). We will document in the ProducerInterceptor/ConsumerInterceptor class descriptions that they share the producer/consumer config namespace, possibly with many other interceptors and serializers, so it can be useful to use a prefix to prevent conflicts.
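As an illustration, here is a minimal sketch of an interceptor reading its own prefixed settings in configure(). The class name, the "audit." property prefix, and the import package are assumptions for illustration, not part of this KIP; the sketch implements the ProducerInterceptor interface proposed below.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AuditProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private String auditName;

    @Override
    public void configure(Map<String, ?> configs) {
        // configure() receives the full producer config namespace (including client.id),
        // so this interceptor reads only keys under its own hypothetical "audit." prefix.
        Object name = configs.get("audit.name");
        this.auditName = name == null ? "unnamed" : name.toString();
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record; // no modification in this sketch
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // no-op in this sketch
    }

    @Override
    public void close() {
        System.out.println("closing audit interceptor: " + auditName);
    }
}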

...

ProducerInterceptor interface

 

/**
 * A plugin interface to allow things to intercept events happening to a producer record,
 * such as sending producer record or getting an acknowledgement when a record gets published
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when client sends record to KafkaProducer, before key and value gets serialized.
     * @param record the record from client
     * @return record that is either original record passed to this method or new record with modified key and value.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This is called when the send has been acknowledged
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
  
    /**
     * This is called when interceptor is closed
     */
    public void close();
}

onSend() will be called in KafkaProducer.send(), before the key and value get serialized and before the partition gets assigned. If the implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object. The implication of an interceptor modifying the key in onSend() is that the partition will be assigned based on the modified key, not the key from the client. If the key/value transformation is not consistent (the same key and value do not always map to the same modified key/value), then log compaction would not work. We will document this in the ProducerInterceptor class. However, known use-cases, such as adding an app name or host name to a message, do a consistent transformation.

Another implication of onSend() returning a ProducerRecord is that the interceptor can potentially modify the topic/partition. It will be up to the interceptor to ensure that the ProducerRecord returned from onSend() is correct (e.g. topic and partition, if given, are preserved or intentionally modified). KafkaProducer will use the ProducerRecord returned from onSend() instead of the record passed into the KafkaProducer.send() method.

onAcknowledgement() will be called when the send is acknowledged. It has the same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called.

...

In addition, onAcknowledgement() will be called just before KafkaProducer.send() throws an exception (even when it does not call the user callback).

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on the submitting thread, and onAcknowledgement() will be called on the producer I/O thread. It is up to the interceptor implementation to ensure thread safety. Since onAcknowledgement() is called on the producer I/O thread, the onAcknowledgement() implementation should be reasonably fast, or sending of messages from other threads could be delayed.
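To make this concrete, here is a hedged sketch of a producer interceptor that tags each value with its origin and counts acknowledgements. The class name, tag format, and use of client.id are illustrative assumptions, not part of this KIP. The key is left untouched (so partitioning and log compaction are unaffected), the value transformation is consistent, and onAcknowledgement() stays cheap and thread-safe because it runs on the producer I/O thread.

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OriginTaggingProducerInterceptor implements ProducerInterceptor<String, String> {
    private final AtomicLong acked = new AtomicLong();   // updated from the producer I/O thread
    private final AtomicLong failed = new AtomicLong();
    private String origin;

    @Override
    public void configure(Map<String, ?> configs) {
        Object clientId = configs.get("client.id");
        this.origin = clientId == null ? "unknown" : clientId.toString();
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Consistent transformation: the key is unchanged and the same value always
        // maps to the same tagged value, so compaction and partitioning are unaffected.
        String value = record.value() == null ? "" : record.value();
        String tagged = "origin=" + origin + "|" + value;
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(), tagged);
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the producer I/O thread: keep it cheap and thread-safe.
        if (exception == null)
            acked.incrementAndGet();
        else
            failed.incrementAndGet();
    }

    @Override
    public void close() {
        System.out.println("acked=" + acked.get() + " failed=" + failed.get());
    }
}

A matching ConsumerInterceptor could strip the tag again; see the sketch after the onConsume() discussion below.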

ConsumerInterceptor interface 

 

/**
 * A plugin interface to allow things to intercept Consumer events such as receiving a record or record being consumed
 * by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable { 
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non consumable)
     * @return records that is either original 'records' passed to this method or modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  
    /**
     * This is called when offsets get committed
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  
    /**
     * This is called when interceptor is closed
     */
    public void close();
}

 

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords. The implementation of onConsume() is allowed to modify keys and values in ConsumerRecords, and if it does, it must return them in a new ConsumerRecords object. This method is designed to be symmetric to ProducerInterceptor.onSend() and provides a way to undo a transformation done in an onSend() producer interceptor.
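For illustration, here is a hedged sketch of a consumer interceptor that undoes the tagging done by the hypothetical producer interceptor sketched earlier. The class name and tag format are assumptions; the sketch also assumes the public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>>) constructor of the Java client.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class UntagConsumerInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed in this sketch
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Rebuild the records, stripping the "origin=...|" tag that the matching
        // (hypothetical) producer interceptor prepended to each value.
        Map<TopicPartition, List<ConsumerRecord<String, String>>> untagged = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> out = new ArrayList<>();
            for (ConsumerRecord<String, String> rec : records.records(tp)) {
                String value = rec.value();
                int sep = value == null ? -1 : value.indexOf('|');
                String stripped = sep < 0 ? value : value.substring(sep + 1);
                out.add(new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(), rec.key(), stripped));
            }
            untagged.put(tp, out);
        }
        return new ConsumerRecords<>(untagged);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Could record committed offsets for end-to-end monitoring; no-op in this sketch.
    }

    @Override
    public void close() {
        // no-op in this sketch
    }
}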

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called, and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since the new consumer is single-threaded, the ConsumerInterceptor API will be called from a single thread. Since interceptor callbacks are called for every record, the interceptor implementation should be careful about adding performance overhead to the consumer.

...

  • We will create a new class that will encapsulate a list of ProducerInterceptor instances: ProducerInterceptors

     

    /**
     * This class wraps custom interceptors configured for this producer.
     */
    public class ProducerInterceptors<K, V> implements Closeable {
        private final List<ProducerInterceptor<K,V>> interceptors;
     
        public ProducerInterceptors(List<ProducerInterceptor<K,V>> interceptors) {
            this.interceptors = interceptors;
        }
      
        public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            ProducerRecord<K, V> interceptRecord = record;
            for (ProducerInterceptor interceptor: this.interceptors) {
                interceptRecord = interceptor.onSend(interceptRecord);
            }
            return interceptRecord;
        }
     
        public void onAcknowledgement(RecordMetadata metadata, Exception e) {
            for (ProducerInterceptor interceptor: this.interceptors) {
                interceptor.onAcknowledgement(metadata, e);
            }
        }
     
        @Override
        public void close() {
            for (ProducerInterceptor<K,V> interceptor: this.interceptors) {
                interceptor.close();
            }       
        }
    
    }
  • KafkaProducer will have a new member:
    • ProducerInterceptors<K, V> interceptors;
  • KafkaProducer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor class, passing in the producer config properties and clientId. The KafkaProducer constructor will then instantiate 'interceptors' with the list of interceptor instances. (A configuration sketch follows this list.)
  • To be able to call interceptors on the producer callback, we wrap the client callback passed to the KafkaProducer.send() method inside ProducerCallback, a new class that implements Callback and has a reference to the client callback and 'interceptors'. ProducerCallback.onCompletion() will call onAcknowledgement() on 'interceptors' and then the client's callback onCompletion() (if the client's callback is not null).

    ProducerCallback

     

    /**
     * This class is a callback called on every producer request complete.
     */
    public class ProducerCallback<K, V> implements Callback {
        private final Callback clientCallback;
        private final ProducerInterceptors<K, V> interceptors;
     
        public ProducerCallback(Callback clientCallback, ProducerInterceptors<K, V> interceptors) {
            this.clientCallback = clientCallback;
            this.interceptors = interceptors;
        }
     
        public void onCompletion(RecordMetadata metadata, Exception e) {
            interceptors.onAcknowledgement(metadata, e);
            if (clientCallback != null)
                clientCallback.onCompletion(metadata, e);
        }
    }
  • KafkaProducer.send() will create ProducerCallback and call onSend() method.

    producerCallback = new ProducerCallback(callback, this.interceptors);
    ProducerRecord<K, V> sendRecord = interceptors.onSend(record);
  • The rest of KafkaProducer.send() code will use sendRecord in place of 'record'.
  • KafkaProducer.close() will close interceptors:
    ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
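For completeness, here is a hedged sketch of how an application could enable a producer interceptor chain through the proposed interceptor.classes config; the interceptor class name, package, and topic are hypothetical.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Comma-separated list of interceptor implementations, applied in order.
        props.put("interceptor.classes", "com.example.OriginTaggingProducerInterceptor");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key-1", "hello"));
        }
    }
}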

...

Kafka Consumer changes:

  • We will create a new class that will encapsulate a list of ConsumerInterceptor instances: ConsumerInterceptors

     

    /**
     * This class wraps custom interceptors configured for this consumer. On this callback, all consumer interceptors
     * configured for the consumer are called.
     */
    public class ConsumerInterceptors<K, V> implements Closeable {
        private final List<ConsumerInterceptor<K,V>> interceptors;
     
        public ConsumerInterceptors(List<ConsumerInterceptor<K,V>> interceptors) {
            this.interceptors = interceptors;
        }
     
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
            ConsumerRecords<K, V> interceptRecords = records;
            for (ConsumerInterceptor<K,V> interceptor: this.interceptors) {
                interceptRecords = interceptor.onConsume(interceptRecords);
            }
            return interceptRecords;
        }
      
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            for (ConsumerInterceptor<K,V> interceptor: this.interceptors) {
                interceptor.onCommit(offsets);
            }
        }
     
        @Override
        public void close() {
            for (ConsumerInterceptor<K,V> interceptor: this.interceptors) {
                interceptor.close();
            }       
        }
    }
  • KafkaConsumer will have a new member
    ConsumerInterceptors<K, V> interceptors;
  • KafkaConsumer constructor will load instances of the interceptor classes specified in interceptor.classes. If the interceptor.classes config does not list any interceptor classes, the interceptors list will be empty. It will call configure() on each interceptor class, passing in the consumer config properties and clientId. The KafkaConsumer constructor will then instantiate 'interceptors' with the list of interceptor instances. (A configuration sketch follows this list.)
  • KafkaConsumer.close() will close 'interceptors':
    ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
  • KafkaConsumer.poll() will call
    this.interceptors.onConsume(consumerRecords);
    and return the ConsumerRecords<K, V> returned from onConsume().
    ...
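Similarly, here is a hedged sketch of the consumer side, enabling a consumer interceptor chain through the proposed interceptor.classes config; the interceptor class name, package, group id, and topic are hypothetical.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class InterceptedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Interceptors run inside poll(), just before records are returned to the application.
        props.put("interceptor.classes", "com.example.UntagConsumerInterceptor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.key() + " -> " + record.value());
        }
    }
}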


Compatibility, Deprecation, and Migration Plan

It will not impact any existing clients. When clients upgrade to the new version, they do not need to add the interceptor.classes config.

Future compatibility. If new methods are added to ProducerInterceptor and ConsumerInterceptor (as part of other KIP(s)), they will be added with an empty default implementation to the Producer/ConsumerInterceptor interfaces. Default methods in interfaces are a new feature in Java 8.
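As a sketch of what this would look like, a Java 8 default method lets existing implementations keep compiling and running unchanged; the added method name below is a placeholder, not a method proposed in this KIP.

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    void close();

    // Hypothetical method added by a future KIP; the empty default body keeps
    // existing ProducerInterceptor implementations source- and binary-compatible.
    default void onSomeFutureEvent(RecordMetadata metadata) {
    }
}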

Rejected Alternatives

Alternative 1 - Interceptor interfaces on the broker

This KIP proposes interceptors only on producers and consumers. Adding a message interceptor on the broker makes a lot of sense, and would add more detail to monitoring. However, the proposal is to do it later, in a separate KIP, for the following reasons:

  • Broker interceptors are more risky because brokers are more sensitive to overheads that could be added by interceptors. Added performance overhead on brokers would affect all clients.
  • Producer and consumer interceptors are less risky and give us a good risk vs. reward tradeoff, since producer and consumer interceptors alone will enable end-to-end monitoring.
  • As a result, it is better to start with producer and consumer interceptors and gain experience to see how usable they are.
  • Once we see how usable producer and consumer interceptors are in practice, we can create a broker interceptor KIP, which will allow more complete/detailed message monitoring.

Alternative 2 – Interceptor callbacks that expose internal implementation of producer/consumer

The producer and consumer interceptor callbacks proposed in this KIP are fundamental aspects of the producer and consumer protocol, and they do not depend on the implementation of the producer and consumer. In addition to the proposed methods, it may be useful to add more hooks, such as ProducerInterceptor.onEnqueue() (called before adding the serialized key and value to the accumulator) or ProducerInterceptor.onDequeue(). They can be useful, but have the disadvantage of exposing internal implementation details. This can be limiting, since changing the internal implementation in the future may require changing the interfaces.

We can add some of these methods later if we find concrete use-cases for them. For the use-cases raised so far, it was not clear whether they should be implemented by interceptors or by other means. Examples:

  • Use onEnqueue() and onDequeue() methods to measure fine-grained latency, such as serialization latency or the time records spend in the accumulator. However, insight into these latencies could be provided by Kafka Metrics.
  • Encryption. There are several design options here. One is per-record encryption, which would require adding ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One could argue that in that case encryption could be done with a custom serializer/deserializer. Another option is to encrypt after the message gets compressed, but issues arise regarding the broker doing re-compression. Thus, it is not clear yet whether interceptors are the right approach for adding encryption.

Alternative 3 – Wrapper around KafkaProducer and KafkaConsumer.

...