Status

Current state: Accepted

Discussion thread: 2024-07, 2019-08

Voting thread: 2024-08, 2019-09

JIRA: KAFKA-8830

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

(The initial version of KIP-512 was created in 2019. The current version, resurrected in 2024, includes more details about WHY.)

Currently, the ProducerInterceptor onAcknowledgement method receives a record's topic, partition, offset, and timestamp (through RecordMetadata), but not its headers.

There are two primary use cases that show why headers should be available in the onAcknowledgement method:

  1. Latency Measurement
    Latency measurement is crucial for understanding how long a message takes to travel from the producer to Kafka and for the acknowledgement to return to the producer. The current setup does NOT allow precise measurement of the producer-side latency (a) in the following path:

    producer send -> (a) -> Kafka -> (b) -> acknowledge

    Here, now is the time at which onAcknowledgement runs:

    1. If using CreateTime, the calculation is: now - message.timestamp = (a) + (b)
    2. If using LogAppendTime, the calculation is: now - message.timestamp = (b)

      If headers are available in onAcknowledgement, the producer can store the send time in a header when the message is sent (for example, in onSend). With LogAppendTime, the producer-side latency (a) can then be calculated as message.timestamp - CreateTimeFromHeader.
  2. Tracing Completeness
    In distributed systems, tracing is essential for tracking the flow of messages and understanding system behavior. The trace ID is typically stored in the message headers. Having access to headers in the onAcknowledgement method would enable us to add spans indicating when the message arrives at Kafka and when it is acknowledged by the client. This would significantly enhance tracing completeness and accuracy.
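The latency split from the first use case can be sketched in plain Java. This is a minimal sketch, not part of the proposal: it assumes the send time is stored in a header with the hypothetical key producer-send-time-ms as an 8-byte big-endian long, and it takes the inputs as plain values. Inside a real interceptor they would come from headers.lastHeader(...).value(), metadata.timestamp() and the producer's clock. Note that (a) subtracts a producer-clock value from a broker-clock value, so clock skew between the hosts appears directly in the result.

```java
import java.nio.ByteBuffer;

final class LatencySplit {
    // Hypothetical header key: the KIP does not prescribe a header name.
    static final String SEND_TIME_HEADER = "producer-send-time-ms";

    // Header values are byte[]; encode the send time as an 8-byte big-endian long.
    static byte[] encodeSendTime(long sendTimeMs) {
        return ByteBuffer.allocate(Long.BYTES).putLong(sendTimeMs).array();
    }

    static long decodeSendTime(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    // Latency (a): producer send -> broker append. Only meaningful with LogAppendTime,
    // where metadata.timestamp() is the broker's append time.
    static long producerToBrokerMs(byte[] sendTimeHeaderValue, long logAppendTimeMs) {
        return logAppendTimeMs - decodeSendTime(sendTimeHeaderValue);
    }

    // Latency (b): broker append -> acknowledgement observed by the producer.
    static long brokerToAckMs(long logAppendTimeMs, long ackTimeMs) {
        return ackTimeMs - logAppendTimeMs;
    }

    public static void main(String[] args) {
        byte[] header = encodeSendTime(1_000L); // written when the record is sent, e.g. in onSend
        long logAppendTimeMs = 1_012L;          // metadata.timestamp() under LogAppendTime
        long ackTimeMs = 1_020L;                // clock reading inside onAcknowledgement
        System.out.println("(a) = " + producerToBrokerMs(header, logAppendTimeMs) + " ms"); // (a) = 12 ms
        System.out.println("(b) = " + brokerToAckMs(logAppendTimeMs, ackTimeMs) + " ms");   // (b) = 8 ms
    }
}
```

Headers.lastHeader returns null when the key is absent, so an interceptor built along these lines should check for null before decoding.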


In the original 2019 discussion thread, some questioned whether this feature is necessary, since the application already has the headers when it calls the producer. I understand this point, but I would like to explain two scenarios:

  1. When the producer is accessible:
    While it is technically possible for application code to keep a copy of the headers and link them back to the acknowledgement (for example, by capturing them in a Callback), this approach is complicated and cumbersome. Having the producer pass the headers to the interceptor directly would greatly improve ease of use.
  2. When the producer is not accessible:
    In some cases, such as Debezium and the Flink Kafka sink, the application cannot inject its own Callback, so the workaround above is impractical for these purposes. Interceptors, by contrast, can still be added through the interceptor.classes producer configuration. I believe that improving the Kafka client itself is the correct approach, rather than patching Debezium, Flink, or other producers.

Public Interfaces

A new onAcknowledgement overload on ProducerInterceptor that takes the record headers as an additional parameter:

onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers)


Proposed Change

ProducerInterceptor

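package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.header.Headers;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
    // onSend(), close() and the other existing methods are unchanged and omitted here
    // EXISTING, now with a no-op default implementation
    default void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    // NEW: called by the producer; by default delegates to the existing method,
    // so interceptors written against the old API keep working
    default void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
        onAcknowledgement(metadata, exception);
    }
}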
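The compatibility argument rests on Java default-method dispatch. The following is a minimal sketch using hypothetical stand-in types rather than the real Kafka classes: AckInterceptor mirrors the two onAcknowledgement overloads, a String stands in for RecordMetadata, and a Map stands in for Headers. It shows that an interceptor written before the change still receives callbacks when the caller invokes only the new three-argument overload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ProducerInterceptor: String plays RecordMetadata,
// Map<String, byte[]> plays Headers.
interface AckInterceptor {
    // EXISTING method, now with a no-op default
    default void onAck(String metadata, Exception exception) {}

    // NEW overload; the default forwards to the existing method
    default void onAck(String metadata, Exception exception, Map<String, byte[]> headers) {
        onAck(metadata, exception);
    }
}

// Written before the change: overrides only the two-argument method.
final class LegacyInterceptor implements AckInterceptor {
    final List<String> seen = new ArrayList<>();

    @Override
    public void onAck(String metadata, Exception exception) {
        seen.add(metadata);
    }
}

// Written after the change: overrides only the new three-argument method.
final class HeaderAwareInterceptor implements AckInterceptor {
    final List<String> headerKeys = new ArrayList<>();

    @Override
    public void onAck(String metadata, Exception exception, Map<String, byte[]> headers) {
        headerKeys.addAll(headers.keySet());
    }
}

final class DefaultMethodDemo {
    public static void main(String[] args) {
        Map<String, byte[]> headers = Map.of("trace-id", new byte[] {1, 2});
        LegacyInterceptor legacy = new LegacyInterceptor();
        HeaderAwareInterceptor aware = new HeaderAwareInterceptor();
        // The caller only ever invokes the three-argument overload.
        for (AckInterceptor i : List.<AckInterceptor>of(legacy, aware)) {
            i.onAck("topic-0@42", null, headers);
        }
        System.out.println(legacy.seen);      // [topic-0@42]
        System.out.println(aware.headerKeys); // [trace-id]
    }
}
```

Because the caller in this sketch invokes only the new overload, an implementation that overrides both methods would see only the three-argument call unless it forwards to the two-argument one itself.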



KafkaProducer

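public class KafkaProducer<K, V> implements Producer<K, V> {
    private class AppendCallbacks implements RecordAccumulator.AppendCallbacks {
        private final Callback userCallback;
        private final ProducerInterceptors<K, V> interceptors;
        // ... other existing fields omitted
        private final Headers headers; // <-- NEW

        private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
            this.userCallback = userCallback;
            this.interceptors = interceptors;
            // <-- NEW: capture the headers at send time; null-safe in case send() received a null record
            this.headers = record != null ? record.headers() : new RecordHeaders();
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (metadata == null) {
                // metadata is null on error: substitute a placeholder with unknown offset and timestamp
                metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
            }
            // interceptors run first and now also receive the record headers
            this.interceptors.onAcknowledgement(metadata, exception, this.headers); // <-- MODIFIED
            if (this.userCallback != null) {
                this.userCallback.onCompletion(metadata, exception); // user Callback signature is unchanged
            }
        }
    }
}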



Migration Plan and Compatibility

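No migration is needed; the change is backward compatible. Existing interceptors that override only onAcknowledgement(RecordMetadata, Exception) keep working, because the new overload delegates to it by default.

The existing two-argument method will not be deprecated, so that upgrading clients does not force changes to existing interceptors.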

Alternatives

Update RecordMetadata so that both ProducerInterceptor.onAcknowledgement and Callback.onCompletion can access headers.

public final class RecordMetadata {
    private final long offset;
    private final long timestamp;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final TopicPartition topicPartition;
    private final Headers headers; // NEW

    // EXISTING: now delegates to the new constructor with no headers
    public RecordMetadata(TopicPartition topicPartition, long baseOffset, int batchIndex, long timestamp,
                          int serializedKeySize, int serializedValueSize) {
        this(topicPartition, baseOffset, batchIndex, timestamp, serializedKeySize, serializedValueSize, null);
    }

    // NEW
    public RecordMetadata(TopicPartition topicPartition, long baseOffset, int batchIndex, long timestamp,
                          int serializedKeySize, int serializedValueSize, Headers headers) {
        // record offset = batch base offset + index within the batch (-1 if unknown)
        this.offset = baseOffset == -1 ? baseOffset : baseOffset + batchIndex;
        this.timestamp = timestamp;
        this.serializedKeySize = serializedKeySize;
        this.serializedValueSize = serializedValueSize;
        this.topicPartition = topicPartition;
        this.headers = headers;
    }

    // NEW
    public Headers headers() {
        return this.headers;
    }
}

Reason for rejection: headers carry application metadata about a record, not Kafka-native record metadata such as the offset or timestamp. To Kafka, headers are opaque, just like the key and value.
