KIP-42: Add Producer and Consumer Interceptors

...

JIRA: KAFKA-3162, KAFKA-3196, KAFKA-3303

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

 

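import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Configurable;

/**
 * A plugin interface that allows intercepting events that happen to a producer record,
 * such as the record being sent or an acknowledgement arriving after the record is published.
 */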
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value are serialized.
     * @param record the record from client
     * @return record that is either original record passed to this method or new record with modified key and value.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * This is called when the send has been acknowledged
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). If an error occurred, the metadata may be only partially filled: the topic will always be set, and the partition will be set to the partition assigned to this record if known, or to -1 otherwise.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
  
    /**
     * This is called when interceptor is closed
     */
    public void close();
}

onSend() will be called in KafkaProducer.send(), before the key and value are serialized and before the partition is assigned. If the implementation modifies the key and/or value, it must return the modified key and value in a new ProducerRecord object. One implication of interceptors modifying a key in onSend() is that the partition will be assigned based on the modified key, not the key supplied by the client. If the key/value transformation is not consistent (that is, the same input key and value do not always map to the same modified key and value), then log compaction will not work. We will document this in the ProducerInterceptor class. However, known use cases, such as adding an app name or host name to a message, apply a consistent transformation.

Another implication of onSend() returning a ProducerRecord is that the interceptor can potentially modify the topic/partition. It is up to the interceptor to ensure that the ProducerRecord returned from onSend() is correct (e.g. that the topic and partition, if given, are preserved or deliberately modified). KafkaProducer will use the ProducerRecord returned from onSend() instead of the record passed into the KafkaProducer.send() method.
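As an illustration of a consistent transformation, here is a minimal sketch of an onSend()-style method that tags each value with an application name and returns a new record with the topic and key preserved. SimpleRecord and AppNameTagger are hypothetical stand-ins invented for this sketch (so that it compiles without the Kafka client library); they are not part of the proposed API.

```java
import java.util.Objects;

// Hypothetical stand-in for ProducerRecord, reduced to the fields this sketch needs.
final class SimpleRecord {
    final String topic;
    final String key;
    final String value;

    SimpleRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical interceptor-like class: tags every value with a fixed application name.
final class AppNameTagger {
    private final String appName;

    AppNameTagger(String appName) {
        this.appName = Objects.requireNonNull(appName);
    }

    // Mirrors the shape of onSend(): returns a NEW record and leaves the input untouched.
    SimpleRecord onSend(SimpleRecord record) {
        // Deterministic: the same input value always yields the same tagged value,
        // and the key is left unchanged, so partitioning and compaction are unaffected.
        String tagged = appName + ":" + record.value;
        return new SimpleRecord(record.topic, record.key, tagged);
    }
}
```

Because the key is untouched and the value mapping is deterministic, two sends of the same client record produce identical intercepted records.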

Since there may be multiple interceptors, the first interceptor will get the record from the client passed as the 'record' parameter. The next interceptor in the list will get the record returned by the previous interceptor, and so on. Since interceptors are allowed to mutate records, an interceptor may receive a record already modified by other interceptors. However, we will state in the javadoc that building a pipeline of mutating interceptors that depend on the output of previous interceptors is discouraged, because of the side effects that arise when an interceptor fails to mutate the record and throws an exception. If one of the interceptors in the list throws an exception from onSend(), the exception is caught and logged, and the next interceptor is called with the record returned by the last successful interceptor in the list or, if none has succeeded, with the record from the client.
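The chaining and error-handling rule above can be sketched as follows. This is only an illustration under simplifying assumptions: records are modeled as plain strings, each interceptor is a UnaryOperator, and "logging" is modeled as appending to a list. InterceptorChain is a hypothetical name, not a class defined by this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical helper that applies onSend-style functions in list order.
final class InterceptorChain {
    private final List<UnaryOperator<String>> interceptors;
    // Stand-in for a logger: messages of exceptions that were caught.
    final List<String> loggedErrors = new ArrayList<>();

    InterceptorChain(List<UnaryOperator<String>> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    String onSend(String record) {
        // Starts as the client's record; replaced only by successful interceptors.
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Caught and "logged"; the next interceptor still receives
                // the output of the last interceptor that succeeded.
                loggedErrors.add(String.valueOf(e.getMessage()));
            }
        }
        return current;
    }
}
```

With three interceptors where the second one throws, the third receives the first interceptor's output, which is why interceptors that depend on an earlier interceptor's mutation can observe surprising input.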

onAcknowledgement() will be called when the send is acknowledged. It has the same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called. In addition, onAcknowledgement() will be called just before KafkaProducer.send() throws an exception (even when it does not call the user callback). The difference in the behavior of ProducerInterceptor.onAcknowledgement() is that if an error occurred, the metadata parameter will not be null. In this case, metadata will contain the topic and possibly the partition (if available). If partition information is not available, the partition will be set to -1.
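To illustrate how an implementation might handle partially filled metadata, here is a small sketch that builds a description of an acknowledgement. AckMetadata is a hypothetical stand-in for RecordMetadata carrying only topic, partition, and offset, and AckDescriber is an invented helper; the -1 convention for an unknown partition is the one described above.

```java
// Hypothetical stand-in for RecordMetadata, reduced to three fields.
final class AckMetadata {
    final String topic;
    final int partition; // -1 when the partition is not known
    final long offset;

    AckMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical helper with the same parameter shape as onAcknowledgement().
final class AckDescriber {
    static String describe(AckMetadata metadata, Exception exception) {
        if (exception == null) {
            // Success: metadata is fully populated.
            return "acked " + metadata.topic + "-" + metadata.partition + "@" + metadata.offset;
        }
        // Failure: metadata is non-null, the topic is set, the partition may be -1.
        String where = metadata.partition == -1
                ? metadata.topic + " (partition unknown)"
                : metadata.topic + "-" + metadata.partition;
        return "failed " + where + ": " + exception.getMessage();
    }
}
```

The point of the sketch is that the error branch can still attribute the failure to a topic, which a null metadata argument would not allow.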

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on the submitting thread and onAcknowledgement() will be called on the producer I/O thread. It is up to the interceptor implementation to ensure thread safety. Since onAcknowledgement() is called on the producer I/O thread, its implementation should be reasonably fast; otherwise, sending of messages from other threads could be delayed.
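One way an implementation could meet both requirements (thread safety and a cheap onAcknowledgement()) is to keep its state in atomic counters. The sketch below assumes a hypothetical CountingInterceptor whose methods mirror the shapes of onSend() and onAcknowledgement() without depending on the Kafka client library; it is an illustration, not a prescribed design.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical interceptor-like class that only counts events.
final class CountingInterceptor {
    // Atomic counters are safe to update from submitting threads and the I/O thread alike.
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    // Called on whichever thread submits the record; returns the record unchanged.
    <R> R onSend(R record) {
        sent.incrementAndGet();
        return record;
    }

    // Called on the I/O thread in the real producer: does a single atomic increment
    // and no blocking work, so it does not hold up other sends.
    void onAcknowledgement(Object metadata, Exception exception) {
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    long sent() { return sent.get(); }
    long acked() { return acked.get(); }
    long failed() { return failed.get(); }
}
```

Anything slower than this, such as writing to a remote system, would be better handed off to a separate thread rather than done inside onAcknowledgement().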

...