Status
Current state: Accepted
Discussion thread: https://lists.apache.org/thread/tbhmkf44jhjf8lqmo7w2whynbgttf1o6
Voting thread: https://lists.apache.org/thread/z6v4qyhgydl1tj0s3ycn6v4hv408gx2t
PR: https://github.com/apache/kafka/pull/12545
JIRA: KAFKA-14944
Motivation
Currently, CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record) deserializes keys and values with Deserializer#deserialize(String topic, Headers headers, byte[] data). It first calls Utils.toArray(ByteBuffer) to convert the ByteBuffer into a byte[] and then passes that array to the deserializer, which incurs an extra memory allocation and memory copy.
The default implementation of the proposed overload, Deserializer#deserialize(String, Headers, ByteBuffer), would still call Utils.toNullableArray(ByteBuffer) and then delegate to the existing method. In the following cases, however, deserialization can work on the ByteBuffer instead of a byte[], which reduces memory allocation and memory copying:
- The built-in StringDeserializer and ByteBufferDeserializer will deserialize directly from the ByteBuffer in the overloaded method.
- User-defined Deserializers that override the default implementation of the overloaded method can also reduce memory allocation and memory copying.
Public Interfaces
We propose adding a default method, Deserializer#deserialize(String, Headers, ByteBuffer).
Class | Method |
---|---|
Deserializer | default T deserialize(String topic, Headers headers, ByteBuffer data) { |
ByteBufferDeserializer | @Override |
StringDeserializer | @Override |
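The relationship between the default overload and an overriding deserializer can be sketched as follows. This is a minimal, self-contained illustration and not the actual Kafka code: SketchDeserializer, SketchStringDeserializer, and the local toNullableArray helper are hypothetical stand-ins, and the Headers parameter is omitted so that the example compiles without the kafka-clients library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ByteBufferDeserializeSketch {

    // Hypothetical stand-in for Deserializer<T>; Headers is left out for brevity.
    interface SketchDeserializer<T> {
        T deserialize(String topic, byte[] data);

        // Default overload: copy the readable bytes, then delegate to the byte[] method.
        default T deserialize(String topic, ByteBuffer data) {
            return deserialize(topic, toNullableArray(data));
        }
    }

    // Assumed behaviour of a toNullableArray-style helper: null-safe copy of
    // the readable bytes that leaves the caller's position untouched.
    static byte[] toNullableArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] dest = new byte[buffer.remaining()];
        buffer.duplicate().get(dest);
        return dest;
    }

    // Hypothetical string deserializer that overrides the ByteBuffer overload.
    static class SketchStringDeserializer implements SketchDeserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(String topic, ByteBuffer data) {
            if (data == null) {
                return null;
            }
            if (data.hasArray()) {
                // Heap buffer: decode straight from the backing array, no intermediate byte[].
                return new String(data.array(), data.arrayOffset() + data.position(),
                        data.remaining(), StandardCharsets.UTF_8);
            }
            // No accessible backing array (for example direct or read-only buffers): copy first.
            return new String(toNullableArray(data), StandardCharsets.UTF_8);
        }
    }
}
```

In this sketch, a deserializer that does not override the ByteBuffer overload keeps working through the default method, while an overriding one avoids the intermediate array whenever the buffer exposes its backing array.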
Proposed Changes
- Deserializer: add default method deserialize(String, Headers, ByteBuffer);
- Invoke Deserializer#deserialize(String, Headers, ByteBuffer) instead of Deserializer#deserialize(String, Headers, byte[]) in CompletedFetch#parseRecord(TopicPartition, RecordBatch, Record).
Compatibility, Deprecation, and Migration Plan
- This proposal technically introduces a semantic change, as application code receiving a ByteBuffer result can no longer make any assumptions about position, limit, offset, etc. However, no guarantees were ever provided so far, and thus correct user code would still work. (User code that does make assumptions about the ByteBuffer is incorrect to begin with, so while it might break, it is a bug at the application layer rather than in the ByteBufferDeserializer.)
- If someone wants the deserializer to be compatible with older versions of the kafka-clients library, they should use ByteArrayDeserializer instead.
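To illustrate what "no assumptions about position, limit, offset" means for application code, here is a small sketch of reading a received ByteBuffer defensively. The class and method names are hypothetical and not part of Kafka; the point is only that the readable region is [position, limit), which is not necessarily the whole backing array.

```java
import java.nio.ByteBuffer;

class ByteBufferReadSketch {

    // Copies exactly the readable bytes [position, limit) and leaves the
    // caller's buffer untouched by reading through a duplicate.
    static byte[] readableBytes(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }
}
```

By contrast, code that calls buffer.array() and treats the whole array as the record value assumes a zero offset and a limit equal to the array length, which is exactly the kind of assumption that may no longer hold.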
Rejected Alternatives
Another solution I considered is a PoolArena, as used in Netty, but this solution is more complicated.