Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-5761
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP intends to open the door to more effective memory management in the Kafka Serializer. A Kafka Serializer converts data to a temporary byte[], which is then copied into a producer or socket buffer. Because the return type is currently a plain byte[], it is hard to reuse an existing byte array for new messages: the array carries no notion of which bytes are valid. Hence, we should introduce a new API to the Serializer that enables users to mark the start and end of the valid bytes in the buffer.
The new return type introduced to Serializer.serialize is ByteBuffer, since it comes from the Java standard library and is common in NIO code.
The new API's default implementation is built on top of the existing methods to ensure binary compatibility. This gives users a smooth transition path: they can keep implementing the existing byte array-based methods or adopt the new ByteBuffer API, depending on their specific needs, allowing gradual adoption while maintaining compatibility with existing code. The goal is to provide better memory management through the new ByteBuffer-based API while continuing to support current use cases.
Apart from Serializer, Partitioner also needs updating, since it still accepts byte[]. The proposed change mirrors the Serializer one: byte[] is replaced with ByteBuffer, and the new method gets a default implementation based on the existing one.
Moreover, this proposal aligns with KIP-863: Reduce CompletedFetch#parseRecord() memory copy, which already introduced a ByteBuffer-based API for Deserializer. This alignment ensures consistent memory optimization across the serialization and deserialization APIs, and the ByteBuffer patterns proven in KIP-863 can guide this Serializer improvement while preserving backward compatibility.
Public Interfaces
org.apache.kafka.common.serialization.Serializer
/**
 * Serializes the given data into a ByteBuffer.
 *
 * @param topic the topic name associated with the data
 * @param data typed data; may be {@code null}
 * @param headers the headers associated with the record
 * @return a ByteBuffer containing the serialized data, ready for reading
 *         (position=0, limit set to the end of the valid data), or null if the
 *         data is null.
 *         Note: the returned ByteBuffer has already been flipped and is directly
 *         readable. Implementers must ensure the returned ByteBuffer is in a
 *         readable state.
 */
default ByteBuffer serialize(String topic, T data, Headers headers) {
    byte[] array = serialize(topic, headers, data);
    return array == null ? null : ByteBuffer.wrap(array);
}
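To make the motivation concrete, the sketch below shows the kind of buffer reuse the ByteBuffer return type makes possible. It uses a simplified stand-in class, not the real org.apache.kafka.common.serialization.Serializer interface, and assumes the producer copies the returned bytes before the next serialize call; that is exactly the memory-management contract this KIP opens the door to, since a plain byte[] return cannot mark which bytes are valid.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for a Serializer implementation (not the real Kafka
// interface), illustrating scratch-buffer reuse across serialize calls.
class ReusingStringSerializer {
    // One scratch buffer reused across calls instead of allocating a fresh
    // byte[] per record.
    private ByteBuffer scratch = ByteBuffer.allocate(64);

    ByteBuffer serialize(String topic, String data) {
        if (data == null)
            return null;
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        if (scratch.capacity() < bytes.length)
            scratch = ByteBuffer.allocate(bytes.length); // grow only when needed
        scratch.clear();
        scratch.put(bytes);
        scratch.flip(); // position=0, limit=end of valid data, per the contract
        return scratch;
    }
}
```

With the current byte[] API this pattern is impossible, because the serializer would have to return the whole scratch array with no way to signal where the valid data ends.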
org.apache.kafka.clients.producer.Partitioner
// add new method
default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) {
    return partition(topic, key, Utils.toNullableArray(keyBytes), value, Utils.toNullableArray(valueBytes), cluster);
}
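The benefit for a custom partitioner is that it can hash the key's readable region directly, without the byte[] copy that Utils.toNullableArray performs. The helper below is a hypothetical sketch of that idea (the null-key policy and hash function are illustrative choices, not Kafka's built-in partitioning logic):

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing a partition computed over the readable region
// [position, limit) of the key's ByteBuffer, with no intermediate byte[] copy.
class ByteBufferPartitionSketch {
    static int partition(ByteBuffer keyBytes, int numPartitions) {
        if (keyBytes == null)
            return 0; // placeholder policy for null keys in this sketch
        int h = 1;
        for (int i = keyBytes.position(); i < keyBytes.limit(); i++)
            h = 31 * h + keyBytes.get(i); // absolute get: position is untouched
        return (h & 0x7fffffff) % numPartitions;
    }
}
```

Note that the sketch reads with absolute get(int) so the buffer's position is left untouched for the caller, which matters when the same buffer is later handed to the producer.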
Proposed Changes
This KIP proposes to add two new methods:
- Serializer#serialize(String, T, Headers) is a new method. The order of its arguments differs from the existing Serializer#serialize(String, Headers, T) so that the same method name can be kept.
- Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster) is a new method.
After adding the new methods, we need to modify the built-in serializer implementations to override the new Serializer#serialize(String, T, Headers) method.
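As an illustration of such an override (a hedged sketch, not the actual Kafka patch; the class and method names are hypothetical), a fixed-size serializer like one for Long can write through a ByteBuffer instead of hand-packing a byte[]:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch contrasting the two styles for a Long serializer.
class LongSerializeSketch {
    // Existing byte[]-based style: pack the eight big-endian bytes by hand.
    static byte[] serializeToArray(Long data) {
        if (data == null)
            return null;
        byte[] out = new byte[Long.BYTES];
        long v = data;
        for (int i = Long.BYTES - 1; i >= 0; i--) {
            out[i] = (byte) v; // low byte goes last: big-endian order
            v >>>= 8;
        }
        return out;
    }

    // New-style override: write via ByteBuffer (big-endian by default) and
    // flip so the result is readable (position=0, limit=8), per the contract.
    static ByteBuffer serializeToBuffer(Long data) {
        if (data == null)
            return null;
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(data);
        buf.flip();
        return buf;
    }
}
```

Both produce the same bytes; the ByteBuffer form is what the default implementation would otherwise reconstruct by wrapping the array, so overriding it removes that indirection.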
Compatibility, Deprecation, and Migration Plan
This KIP doesn't propose to deprecate any existing methods for the following reasons:
- Deprecation signals intent to eventually remove the API, which would cause unnecessary concern among the large existing user base. The byte array interface will remain valuable for many use cases.
- Numerous existing frameworks, tools, and libraries in the Kafka ecosystem directly depend on the current byte array-based serialization. Deprecating would create ecosystem-wide migration pressure without sufficient benefit.
- Some users may have custom-optimized implementations built directly on byte arrays. Keeping both APIs allows Kafka to support diverse implementation strategies across different use cases.
Instead, all new APIs have a default implementation, ensuring that binary compatibility is not compromised. This strategy offers flexibility in adoption: users can continue using their existing byte array-based code or transition to the new ByteBuffer API according to their project's needs and timelines, providing a smooth, gradual migration path while preserving system stability.
Rejected Alternatives
None