Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-5761

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP intends to open the door to more effective memory management in the Kafka Serializer. A Serializer converts data to a temporary byte[]  that is then copied into a producer/socket buffer. Because the current return type is a plain byte[] , it is hard to reuse an existing byte array for new messages. Hence, we should introduce a new API on Serializer that enables users to mark the start/end of the available bytes in the byte array.
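As a standalone illustration of the idea (plain java.nio, not Kafka code): a ByteBuffer's position and limit already mark the start and end of the valid bytes, so one backing array can be refilled for successive messages instead of allocating a fresh byte[] per message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferReuseDemo {
    // Refill the reusable buffer with one message and return the number of valid bytes.
    static int refill(ByteBuffer buffer, String message) {
        buffer.clear();                                   // reset position/limit for reuse
        buffer.put(message.getBytes(StandardCharsets.UTF_8));
        buffer.flip();                                    // [position, limit) now bounds the valid bytes
        return buffer.remaining();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);      // one backing array for many messages
        System.out.println(refill(buffer, "first"));      // prints 5
        System.out.println(refill(buffer, "second"));     // prints 6 -- same array, no new allocation
    }
}
```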

The new return type introduced for Serializer.serialize  is ByteBuffer, since it comes from the Java standard library and is common in NIO code.

The new API's default implementation is built on top of the existing methods to preserve binary compatibility and allow a smooth transition: users can keep implementing the existing byte array-based methods or adopt the new ByteBuffer API based on their needs. This flexibility enables gradual adoption while maintaining compatibility with existing code. The goal is to provide better memory management through the new ByteBuffer-based API while continuing to support current use cases.

Apart from Serializer, Partitioner also needs updating, since it still accepts byte[] . The proposed change mirrors the Serializer change: byte[]  is replaced with ByteBuffer , and the new API gets a default implementation based on the existing method.

Moreover, this proposal aligns with KIP-863: Reduce CompletedFetch#parseRecord() memory copy, which already introduced a ByteBuffer-based API for Deserializer. This alignment is significant as it promotes consistent memory optimization across the serialization/deserialization APIs. The successful use of ByteBuffer in KIP-863 provides proven patterns that can guide this Serializer improvement while maintaining backward compatibility.

Public Interfaces

org.apache.kafka.common.serialization.Serializer

 	// add new method
    default ByteBuffer serialize(String topic, T data, Headers headers) {
        byte[] array = serialize(topic, headers, data);
        return array == null ? null : ByteBuffer.wrap(array);
    } 

org.apache.kafka.clients.producer.Partitioner


	// add new method
    default int partition(String topic, Object key, ByteBuffer keyBytes, Object value, ByteBuffer valueBytes, Cluster cluster) {
        return partition(topic, key, Utils.toNullableArray(keyBytes), value, Utils.toNullableArray(valueBytes), cluster);
    }
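For illustration, an implementation of the new overload can read the key's bytes directly through the buffer's [position, limit) window instead of materializing a byte[]. The sketch below is standalone and uses a simple polynomial hash for brevity; Kafka's built-in default partitioning actually hashes key bytes with murmur2, and `partitionFor` here is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferPartitionDemo {
    // Hypothetical helper: pick a partition from the key bytes without copying them out.
    static int partitionFor(ByteBuffer keyBytes, int numPartitions) {
        int hash = 1;
        // Absolute get(i) walks the valid window [position, limit) without moving position.
        for (int i = keyBytes.position(); i < keyBytes.limit(); i++) {
            hash = 31 * hash + keyBytes.get(i);
        }
        return (hash & 0x7fffffff) % numPartitions;       // clear the sign bit, then mod
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.wrap("user-42".getBytes(StandardCharsets.UTF_8));
        System.out.println(partitionFor(key, 6));         // deterministic value in [0, 6)
        System.out.println(key.position());               // prints 0 -- buffer state is untouched
    }
}
```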


Proposed Changes


  1. Serializer#serialize(String, T, Headers)  is a new method. Its argument order differs from the existing Serializer#serialize(String, Headers, T)  so that the same method name can be kept with a distinct overload signature.
  2. Partitioner#partition(String, Object, ByteBuffer, Object, ByteBuffer, Cluster)  is a new method.

After adding the new method, we need to modify the built-in serializer implementations to override the new Serializer#serialize(String, T, Headers) method.
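As a sketch of the shape such an override could take for a string serializer (a hypothetical standalone class, with the Kafka interface omitted so the example compiles on its own): encode straight into a reused ByteBuffer via CharsetEncoder instead of allocating a byte[] per record.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch only; a real implementation would override the new
// Serializer#serialize(String, T, Headers) from this KIP.
public class ReusableStringSerializer {
    private final CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
    private ByteBuffer buffer = ByteBuffer.allocate(32);

    public ByteBuffer serialize(String topic, String data) {
        if (data == null) {
            return null;                                  // mirror the existing null contract
        }
        int maxBytes = data.length() * 3;                 // safe UTF-8 upper bound per char
        if (buffer.capacity() < maxBytes) {
            buffer = ByteBuffer.allocate(maxBytes);       // grow only when needed
        }
        buffer.clear();
        encoder.reset().encode(CharBuffer.wrap(data), buffer, true);
        encoder.flush(buffer);
        buffer.flip();                                    // valid bytes are [position, limit)
        return buffer;                                    // same buffer is reused across records
    }
}
```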

Compatibility, Deprecation, and Migration Plan

All new APIs have default implementations, so binary compatibility is not broken. The implementation strategy also provides flexibility in adoption: users can continue using their existing byte array-based code or transition to the new ByteBuffer API according to their project's needs and timeline. This approach ensures a smooth, gradual migration path while preserving system stability.

Rejected Alternatives

None
