Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4208

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

In most messaging systems (JMS, QPID, etc.), streaming systems and most transport systems (HTTP, TCP), it is typical to have a concept of headers and payload.

The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering etc. Headers are most typically key=value pairs.

In its current state Kafka does not support the ability to have headers natively in its message/record format.

Examples where having separate supported custom headers becomes useful (this is not an exhaustive list).

Kafka currently has a Record<K, V> structure, which could originally be used to follow these semantics, whereby K would contain the header information and V would be the payload.

This issue has been flagged repeatedly by many people in forums.

Further details and a more detailed case for headers can be found here: A Case for Kafka Headers

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new headers length and value (byte[]) to the core message format.
  2. Create a Headers class
    1. Add a headers (MultiMap<String, Object>) field to the Headers class
    2. Add accessor methods on the Headers class - void put<T>(String) and a Collection<Object> get(String)
  3. Add a headers field to ProducerRecord and ConsumerRecord.
  4. Add accessor methods on the Producer/ConsumerRecord - Headers getHeaders()
  5. Add ProduceRequest/ProduceResponse V4 which uses the new message format.
  6. Add FetchRequest/FetchResponse V4 which uses the new message format.
  7. The serialisation of the [String, byte[]] header array will be on the wire using a strict format.
  8. Each header value will be custom serialisable by the interceptors/plugins that use the header.
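
As a rough illustration of items 2 to 4 above, a Headers class backed by a multimap could look like the sketch below. The KIP only names put and get; the class names SketchHeaders and SketchRecord, the use of a Map of Lists in place of a MultiMap type, and the exact method signatures are assumptions made for illustration, not the proposed API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a multimap of header key -> values.
final class SketchHeaders {
    // A Map of Lists stands in for MultiMap<String, Object> (assumption).
    private final Map<String, List<Object>> headers = new LinkedHashMap<>();

    // Adds a value under the key; earlier values for the same key are kept.
    <T> void put(String key, T value) {
        headers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Returns all values stored under the key, or an empty collection if none.
    Collection<Object> get(String key) {
        List<Object> values = headers.get(key);
        return values == null
                ? Collections.emptyList()
                : Collections.unmodifiableList(values);
    }
}

// Hypothetical record type showing a getHeaders() accessor.
final class SketchRecord<K, V> {
    private final K key;
    private final V value;
    private final SketchHeaders headers = new SketchHeaders();

    SketchRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    K key() { return key; }
    V value() { return value; }
    SketchHeaders getHeaders() { return headers; }
}
```

With this shape, calling put twice with the same key keeps both values, which is the behaviour a multimap-backed field implies.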

For more detailed information on the above changes, please refer to the Proposed Changes section.

Proposed Changes

Four options were proposed before this proposal. This section details our proposed solution, Option 1, described here. The other options are in the Rejected Alternatives section.

 

The advantages of this proposal are:

The disadvantage of this proposal is:

Create a Headers Interface and Implementation to encapsulate the headers protocol.

This lazily initialises/deserialises the headers on first method access.

  1. Add a new Typed class
    1. To hold the typed representations
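
The lazy behaviour described above can be pictured as holding the raw header bytes until a method first needs the parsed form. The sketch below is only an illustration: LazyHeaders, its Function-based parser argument and the parseCount counter are hypothetical names, and the parser is supplied by the caller rather than implementing the wire format.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of lazy header deserialisation (not thread-safe).
final class LazyHeaders {
    private final byte[] raw;
    private final Function<byte[], Map<String, List<Object>>> parser;
    private Map<String, List<Object>> parsed; // stays null until first access
    int parseCount = 0;                       // exposed only to observe laziness

    LazyHeaders(byte[] raw, Function<byte[], Map<String, List<Object>>> parser) {
        this.raw = raw;
        this.parser = parser;
    }

    // First call triggers parsing; later calls reuse the parsed map.
    Collection<Object> get(String key) {
        List<Object> values = materialise().get(key);
        return values == null ? Collections.emptyList() : values;
    }

    private Map<String, List<Object>> materialise() {
        if (parsed == null) {
            parsed = parser.apply(raw);
            parseCount++;
        }
        return parsed;
    }
}
```

A consumer that never touches the headers therefore never pays the parsing cost; only the raw byte[] is carried along.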

Add a headers field of type Headers to both ProducerRecord and ConsumerRecord

Wire protocol change - use attribute bit 4 as a flag for whether headers are present, and add an (optional) headers length and headers field to the message format

The below is for the core Message wire protocol change needed to fit headers into the message.

A key to this is to ensure the headers cause no overhead if not present.

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use bit 4 as a boolean flag for whether headers are present
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    (optional) HeadersLength => int32 <--------------- NEW [optional] size of the byte[] of the serialised headers, if headers are present
    (optional) Headers => bytes <--------------------- NEW [optional] serialised form of the headers (format described below)
    ValueLength => int32
    Value => bytes
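
To make the "no overhead if not present" property concrete, the following sketch writes and reads the message fields after the CRC. It is an illustration under stated assumptions: "bit 4" is taken to mean the zero-indexed bit with mask 0x10, the CRC is omitted, a null key or value is written as length -1, and the names MessageSketch, encode and readHeaders are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the message layout with optional headers (CRC omitted).
final class MessageSketch {
    static final byte MAGIC = 2;
    static final int HEADERS_FLAG = 1 << 4; // assumption: zero-indexed bit 4

    static byte[] encode(byte attributes, long timestamp,
                         byte[] key, byte[] headers, byte[] value) {
        boolean hasHeaders = headers != null && headers.length > 0;
        int size = 1 + 1 + 8 + 4 + len(key)
                + (hasHeaders ? 4 + headers.length : 0) // nothing written when absent
                + 4 + len(value);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(MAGIC);
        buf.put((byte) (hasHeaders ? attributes | HEADERS_FLAG
                                   : attributes & ~HEADERS_FLAG));
        buf.putLong(timestamp);
        putBytes(buf, key);
        if (hasHeaders) {
            putBytes(buf, headers);
        }
        putBytes(buf, value);
        return buf.array();
    }

    // Returns the serialised headers, or null when the flag bit is not set.
    static byte[] readHeaders(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        buf.get();                    // magic byte
        byte attributes = buf.get();
        buf.getLong();                // timestamp
        int keyLength = buf.getInt();
        if (keyLength > 0) {
            buf.position(buf.position() + keyLength); // skip the key
        }
        if ((attributes & HEADERS_FLAG) == 0) {
            return null;
        }
        byte[] headers = new byte[buf.getInt()];
        buf.get(headers);
        return headers;
    }

    private static int len(byte[] b) {
        return b == null ? 0 : b.length;
    }

    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1); // assumption: -1 length marks a null field
        } else {
            buf.putInt(b.length).put(b);
        }
    }
}
```

In this sketch a message without headers is byte-for-byte the same size as it would be without the new fields; a message with headers grows by 4 bytes plus the header payload.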


Wire protocol of the headers bytes (if the above-mentioned attributes bit flag is set)

The below is for the headers wire protocol.

Headers (bytes) => Array(KeyLength, Key, ValueLength, Value)
  Set =>
    KeyLength => int32 <---------------- NEW size of the byte[] of the serialised header key
    Key => bytes <---------------------- NEW serialised string (UTF8) bytes of the header key
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the typed header value -> TypedValue interpretation of this below.
 
TypedValue (bytes) => (Type, Value)
    Type => byte <---------------------- 0x00 = boolean
                                         0x01 = byte
                                         0x02 = char
                                         0x03 = short (int16)
                                         0x04 = int (int32)
                                         0x05 = long (int64)
                                         0x06 = float
                                         0x07 = double
                                         0x08 = string
                                         0x09 = byte[]
    Value => bytes <-------------------- NEW byte array holding the corresponding value in byte array form,
                                         0x00-0x02 - null and boolean: this will be zero length
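
The headers layout above can be sketched as a small encoder and decoder. This is a minimal illustration, not the proposed implementation: it assumes the entries are simply concatenated with no count prefix, uses big-endian integers (the ByteBuffer default), keeps one value per key, and handles only the int (0x04), long (0x05), string (0x08) and byte[] (0x09) type codes to keep it short. HeaderWireSketch and its method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the headers byte layout:
// repeated (KeyLength, Key, ValueLength, TypedValue) entries.
final class HeaderWireSketch {

    // TypedValue => one type byte followed by the value bytes.
    static byte[] typedValue(Object v) {
        if (v instanceof Integer) {
            return ByteBuffer.allocate(5).put((byte) 0x04).putInt((Integer) v).array();
        }
        if (v instanceof Long) {
            return ByteBuffer.allocate(9).put((byte) 0x05).putLong((Long) v).array();
        }
        if (v instanceof String) {
            byte[] s = ((String) v).getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(1 + s.length).put((byte) 0x08).put(s).array();
        }
        if (v instanceof byte[]) {
            byte[] b = (byte[]) v;
            return ByteBuffer.allocate(1 + b.length).put((byte) 0x09).put(b).array();
        }
        throw new IllegalArgumentException("type not covered by this sketch: " + v);
    }

    static byte[] encode(Map<String, Object> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = typedValue(e.getValue());
            ByteBuffer entry = ByteBuffer.allocate(4 + key.length + 4 + value.length);
            entry.putInt(key.length).put(key).putInt(value.length).put(value);
            out.write(entry.array(), 0, entry.capacity());
        }
        return out.toByteArray();
    }

    static Map<String, Object> decode(byte[] bytes) {
        Map<String, Object> result = new LinkedHashMap<>();
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        while (buf.hasRemaining()) {
            byte[] key = new byte[buf.getInt()];
            buf.get(key);
            int valueLength = buf.getInt(); // includes the leading type byte
            byte type = buf.get();
            byte[] body = new byte[valueLength - 1];
            buf.get(body);
            Object value;
            switch (type) {
                case 0x04: value = ByteBuffer.wrap(body).getInt(); break;
                case 0x05: value = ByteBuffer.wrap(body).getLong(); break;
                case 0x08: value = new String(body, StandardCharsets.UTF_8); break;
                case 0x09: value = body; break;
                default: throw new IllegalArgumentException("type not covered: " + type);
            }
            result.put(new String(key, StandardCharsets.UTF_8), value);
        }
        return result;
    }
}
```

Because every entry carries its own lengths, a reader that does not understand a given value can still skip over it using ValueLength.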
                                         

Compatibility, Deprecation, and Migration Plan

Out of Scope

Some additional features/benefits were noticed and discussed on the above but are deemed out of scope and should be tackled by further KIPs.

Rejected Alternatives

Map<Int, byte[]> Headers added to the Producer/ConsumerRecord

The concept is similar to the proposal above, but with int keys.

Map<String, String> Headers added to the ConsumerRecord

The concept is similar to the proposal above, but with a few more disadvantages.

ProducerRecord<K, H, V>, ConsumerRecord<K, H, V>

The proposed change is that headers are Map<int, byte[]> only; this alternative is that headers can be of any type, denoted by H.

Common Value Message Wrapper - Message<V>

This builds on the status quo and addresses some core issues, but fails to address some more advanced and future use cases, and also has some compatibility issues for upgrades and for clients that do not support it.

Please see: Headers Value Message Wrapper

 

Status Quo - Keep Custom Value Message Wrapper - Message<H, P>

This concept is the current de facto way many users are having to temporarily deal with the situation, but it has some core issues that it does not resolve.