Motivation
To address the use cases in KIP-82, this page details an implementation based on a message wrapper, covering its pros and cons and the changes it would require.
This is not the proposed solution; it elaborates on an alternative solution to show more clearly why it was discounted.
Advantages/Disadvantages
The advantages of this option:
- Changes to MessageAndOffset are limited to using one attribute bit as the tombstone marker
- Ability to set headers from the client side using a common interface
- Message headers are kept as a client-side-only implementation
The disadvantages of this option:
- Change to the message object
- Client users need to know in advance whether a message will use headers (that is, whether it will use the wrapper)
- Headers mainly serve platform needs; enforcing a wrapper exposes them to end-user code, even where only the platform needs to add or consume headers via interceptors
- Headers cannot be used server side while the implementation is client side only
- e.g. server-side plugins/interceptors, see A Case for Kafka Headers
- If headers were made server aware, it would be better to integrate them into the core message properly; this is one reason for discounting this option in favour of the proposed solution.
- No server-side versioning to allow older clients / other language clients to co-exist
- Upgrade compatibility issue: new topics would need to be created
- This is more of an issue for consumers
- Point solution for handling compaction server side
The amount of change needed on both the client and server side is similar in size to the proposed solution in KIP-82, but this option carries the additional drawbacks listed above.
Changes
- Creation of a common Kafka Message Wrapper for Java Client
- Interceptors to be made message wrapper aware
- Client users to create the message wrapper, inserting their value inside it, before creating the producer record
- The [int, byte[]] header set will be serialised on the wire using a strict format
- Each header's value will be custom serialisable by the interceptors/plugins that use the header.
- As headers use int keys for compactness, key ranges will be reserved for different usages:
- client headers adopted by the Kafka open source project (not necessarily owned by it)
- broker headers adopted by the Kafka open source project (not necessarily owned by it)
- commercial vendor space
- custom inhouse
- To handle the compaction issue
- Update Producer/Consumer record to set the tombstone marker
- Use attribute bit 4 as a boolean flag to mark whether the record should be deleted ("tombstone marker")
- Add ProduceRequest/ProduceResponse V4 which uses the new message format.
- Add FetchRequest/FetchResponse V4 which uses the new message format.
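The reserved int key ranges described in the list above could be modelled roughly as follows. This is only a sketch: the text names the four usage categories but does not define any boundaries, so the enum name `HeaderKeySpace` and every numeric range below are hypothetical values chosen for illustration.

```java
// Hypothetical partition of the int header key space into the four usages
// named in the text. The boundaries are invented for illustration only.
enum HeaderKeySpace {
    KAFKA_CLIENT(0x0000_0000, 0x0000_FFFF),   // client headers adopted by Kafka open source
    KAFKA_BROKER(0x0001_0000, 0x0001_FFFF),   // broker headers adopted by Kafka open source
    VENDOR(0x0002_0000, 0x0FFF_FFFF),         // commercial vendor space
    CUSTOM(0x1000_0000, Integer.MAX_VALUE);   // custom in-house headers

    private final int first;
    private final int last;

    HeaderKeySpace(int first, int last) {
        this.first = first;
        this.last = last;
    }

    /** Returns the key space that contains the given header key. */
    static HeaderKeySpace of(int key) {
        for (HeaderKeySpace space : values()) {
            if (key >= space.first && key <= space.last) {
                return space;
            }
        }
        // In this sketch, negative keys fall outside every reserved range.
        throw new IllegalArgumentException("Header key outside reserved ranges: " + key);
    }
}
```

An interceptor could call `HeaderKeySpace.of(key)` before writing a header, to check that it stays inside the range assigned to it.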
LogCleaner
Update method "shouldRetainMessage" to also look at attribute bit 4 for tombstone marker
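A minimal sketch of the bit test this implies, assuming attribute bits are numbered from zero at the least significant bit, so that bit 4 corresponds to the mask 0x10. The class and method names are hypothetical and are not taken from the Kafka code base. The real "shouldRetainMessage" takes other conditions into account that are left out here.

```java
// Hypothetical helper illustrating the tombstone flag in the attributes byte.
final class TombstoneAttribute {
    // Assumption: bit 4 counted from the least significant bit (bit 0), i.e. 0x10.
    static final byte TOMBSTONE_MASK = (byte) (1 << 4);

    private TombstoneAttribute() {}

    /** True if the tombstone bit is set in the attributes byte. */
    static boolean isTombstone(byte attributes) {
        return (attributes & TOMBSTONE_MASK) != 0;
    }

    /** Returns a copy of the attributes byte with the tombstone bit set or cleared. */
    static byte withTombstone(byte attributes, boolean tombstone) {
        return tombstone
                ? (byte) (attributes | TOMBSTONE_MASK)
                : (byte) (attributes & ~TOMBSTONE_MASK);
    }

    /**
     * Sketch of the extended delete check: a record counts as a delete marker
     * if its value is null (the existing rule) or if the tombstone bit is set.
     */
    static boolean isDeleteMarker(byte attributes, byte[] value) {
        return value == null || isTombstone(attributes);
    }
}
```

With this in place, a wrapped message whose value bytes are non-null (because they still carry headers) can be marked for deletion by setting the flag rather than by sending a null value.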
MessageAndOffset
Wire protocol change
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key HeadersLength Headers ValueLength Value
    Crc => int32
    MagicByte => int8 <---------------------- Bump up magic byte to 2
    Attributes => int8 <---------------------- Use Bit 4 as boolean flag for compaction to signify deletion / tombstone
    Timestamp => int64
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
Header Value Message Wrapper
MessageWrapper (Java)
public class MessageWrapper<V> {

    private Map<Integer, byte[]> headers;
    private V value;

    public MessageWrapper(Map<Integer, byte[]> headers, V value) {
        this.headers = headers;
        this.value = value;
    }

    public Map<Integer, byte[]> getHeaders() {
        return headers;
    }

    public V getValue() {
        return value;
    }
}
Wire protocol of the Message wrapper
MessageWrapper (bytes) => MagicByte, HeadersLength, Headers, ValueLength, Value
  MagicByte => int8 <------------------- 0; used for future versioning
  HeadersLength => int32 <--------------- size of the byte[] of the serialised headers
  Headers => bytes <--------------------- serialised form of the headers Map<int, byte[]>
  ValueLength => int32 <----------------- size of the byte[] of the serialised value
  Value => bytes <----------------------- byte[] of the value

Headers (bytes) => Set(Key, ValueLength, Value)
  Set =>
    Key => int32 <---------------------- NEW int key of the header
    ValueLength => int32 <-------------- NEW size of the byte[] of the serialised header value
    Value => bytes <-------------------- NEW serialised form of the header value
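The wrapper format could be written and read along the following lines. This is a sketch under stated assumptions, not a definitive implementation: the layout is taken to be the MagicByte first, followed by HeadersLength, Headers, ValueLength and Value; integers are big-endian (the `ByteBuffer` default); the value is passed in already serialised; and null headers or values are not handled. The class name `MessageWrapperCodec` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical codec for the wrapper layout described above.
final class MessageWrapperCodec {
    static final byte MAGIC = 0; // wrapper format version

    private MessageWrapperCodec() {}

    /** Serialises headers and an already-serialised value into the wrapper layout. */
    static byte[] serialize(Map<Integer, byte[]> headers, byte[] value) {
        int headersLength = 0;
        for (byte[] headerValue : headers.values()) {
            headersLength += 4 + 4 + headerValue.length; // Key + ValueLength + Value
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + headersLength + 4 + value.length);
        buf.put(MAGIC);
        buf.putInt(headersLength);
        for (Map.Entry<Integer, byte[]> entry : headers.entrySet()) {
            buf.putInt(entry.getKey());
            buf.putInt(entry.getValue().length);
            buf.put(entry.getValue());
        }
        buf.putInt(value.length);
        buf.put(value);
        return buf.array();
    }

    /** Reads the magic byte and the header set, leaving the buffer positioned at ValueLength. */
    static Map<Integer, byte[]> readHeaders(ByteBuffer buf) {
        byte magic = buf.get();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("Unsupported wrapper magic byte: " + magic);
        }
        int end = buf.getInt() + buf.position(); // HeadersLength bytes follow
        Map<Integer, byte[]> headers = new TreeMap<>();
        while (buf.position() < end) {
            int key = buf.getInt();
            byte[] headerValue = new byte[buf.getInt()];
            buf.get(headerValue);
            headers.put(key, headerValue);
        }
        return headers;
    }

    /** Reads ValueLength and Value; call after readHeaders on the same buffer. */
    static byte[] readValue(ByteBuffer buf) {
        byte[] value = new byte[buf.getInt()];
        buf.get(value);
        return value;
    }
}
```

Because HeadersLength precedes the header set, a reader that only needs the value can skip the headers without parsing them.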
Compatibility, Deprecation, and Migration Plan
- MessageWrapper will not be backwards compatible
- Current client users will be affected
- New topics will be needed
- Clients will need to instantiate the message wrapper separately and produce it.
- Tombstone feature will be backwards compatible
- Message version migration would be handled as in KIP-32