Status
Current state: Under Discussion
Discussion thread: Pending...
JIRA: Pending...
Motivation
I believe there are many use cases where List Serde could be used. Ex. https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows, https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
For instance, aggregate grouped (by key) values together in a list to do other subsequent operations on the collection.
Public Interfaces
- New class
org.apache.kafka.common.serialization.ListSerializer
which implements theSerializer<List<T>>
interface - New class
org.apache.kafka.common.serialization.ListDeserializer
which implementsDeserializer<List<T>>
interface - New subclass
ListSerde
inorg.apache.kafka.common.serialization.Serde
s which creates new serde based onListSerializer
andListDeserializer
classes
P.S. Static method corresponding to ListSerde under org.apache.kafka.common.serialization.Serdes (something like static public Serde<List<T>> List() {...} in org.apache.kafka.common.serialization.Serdes) class cannot be added because type needs to be defined beforehand. That's why one needs to create List Serde in the following fashion:
new Serdes.ListSerde<String>(Serdes.String(), Comparator.comparing(String::length));
(can possibly be simplified by declaring import static org.apache.kafka.common.serialization.Serde
s.ListSerde)
Proposed Changes
This KIP proposes adding new ListSerializer and ListDeserializer classes as well as support for the new classes into the Serdes class. This will allow using List<T> Serde directly from Consumers, Producers and Streams.
List<T> serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic T's Serde. For example, if you want to create List of Bytes serde, you will have to declare `new Serdes.ListSerde<>(Serdes.String(), Comparator.comparing(String::length))`, in this case serializer/deserializer of String Serde will be used to serialize/deserialize each entry in `List<String>`.
Compatibility, Deprecation, and Migration Plan
Does not apply
Rejected Alternatives
Not known