Current state: Adopted
Discussion thread: Link
JIRA:
I believe there are many use cases where List Serde could be useful:
For instance, aggregating grouped (by key) values into a list so that subsequent operations can be performed on the collection.
This KIP proposes adding new ListSerializer and ListDeserializer classes as well as support for the new ListSerde nested class inside the Serdes class. This will allow using Serde<List<Inner>> directly from Consumers, Producers and Streams.
Serialization and deserialization of Serde<List<Inner>> are done by repeatedly calling, for each entry, the serializer/deserializer of the supplied Inner serde. For example, to create a serde for a List of Strings, the serializer/deserializer of Serdes.StringSerde is used to serialize/deserialize each entry in List<String>.
A list serde is an unusual type of serde because two things must be considered: the implementation of the List interface (e.g. ArrayList, LinkedList) and the type of its enclosed elements.
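The per-entry delegation and the two inputs (list implementation and element type) can be sketched in plain Java. This is only an illustration, not the proposed ListSerializer/ListDeserializer: the class ListSerdeSketch, its method names, the use of java.util.function.Function in place of a real inner serde, and the simple length-prefixed framing are all assumptions made for the example. Null entries are not handled here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka: it only illustrates the delegation idea.
final class ListSerdeSketch {

    // Encodes every element with the inner serializer; each entry is
    // prefixed with its length so that it can be read back later.
    static <T> byte[] serialize(List<T> data, Function<T, byte[]> innerSerializer) {
        List<byte[]> encoded = new ArrayList<>();
        int total = Integer.BYTES; // header holding the number of entries
        for (T element : data) {
            byte[] bytes = innerSerializer.apply(element);
            encoded.add(bytes);
            total += Integer.BYTES + bytes.length; // length prefix + entry bytes
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        out.putInt(data.size());
        for (byte[] bytes : encoded) {
            out.putInt(bytes.length).put(bytes);
        }
        return out.array();
    }

    // Rebuilds the list: listClass chooses the List implementation,
    // innerDeserializer determines the element type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <T> List<T> deserialize(byte[] payload,
                                   Class<? extends List> listClass,
                                   Function<byte[], T> innerDeserializer) {
        List<T> result;
        try {
            result = (List<T>) listClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + listClass, e);
        }
        ByteBuffer in = ByteBuffer.wrap(payload);
        int size = in.getInt();
        for (int i = 0; i < size; i++) {
            byte[] bytes = new byte[in.getInt()];
            in.get(bytes);
            result.add(innerDeserializer.apply(bytes));
        }
        return result;
    }
}
```

Calling deserialize with java.util.LinkedList.class and a UTF-8 String decoder would return a LinkedList<String>, which mirrors the two choices described above: the list type and the inner element type.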
First, we need to specify that we are going to use a list serde:
default.key/value.serde = org.apache.kafka.common.serialization.Serdes$ListSerde
Then, we need to introduce two new kinds of configuration, the list type and the inner serde, each for keys and for values. I'm therefore proposing these four extra properties:
CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_TYPE_CLASS = "default.list.key/value.serde.type"
Ex. default.list.key/value.serde.type = java.util.ArrayList
CommonClientConfigs.class: DEFAULT_LIST_KEY/VALUE_SERDE_INNER_CLASS = "default.list.key/value.serde.inner"
Ex. default.list.key/value.serde.inner = org.apache.kafka.common.serialization.Serdes$IntegerSerde
P.S. The default.list.key/value.* properties are ignored unless default.key/value.serde is set to org.apache.kafka.common.serialization.Serdes$ListSerde
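As a rough illustration, the value-side properties could be assembled with java.util.Properties as follows. The class and method names are hypothetical, and the property keys are my reading of the "key/value" shorthand used above, expanded for the value side only.

```java
import java.util.Properties;

// Hypothetical helper: collects the value-side list serde settings in one place.
final class ListSerdeConfigSketch {

    static Properties listValueSerdeConfig() {
        Properties props = new Properties();
        // Step 1: declare that the default value serde is the list serde.
        props.setProperty("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$ListSerde");
        // Step 2: choose the List implementation and the inner (element) serde.
        props.setProperty("default.list.value.serde.type", "java.util.ArrayList");
        props.setProperty("default.list.value.serde.inner",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        return props;
    }
}
```

The key side would follow the same pattern with "key" in place of "value".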
For performance reasons, the following serialization strategies were put in place:
enum SerializationStrategy {
    CONSTANT_SIZE,
    VARIABLE_SIZE;
}
Depending on the type of the inner serde (the list's element type), serialization is performed in one of the following ways:
Additionally, there are two different ways of serializing NULL values within the payload:
        CONSTANT_SIZE                     VARIABLE_SIZE
+----------------------------+    +----------------------------+
|   SerializationStrategy    |    |   SerializationStrategy    |
|            Flag            |    |            Flag            |
|----------------------------|    |----------------------------|
|    NullIndexList.size()    |    |     PayloadList.size()     |
|----------------------------|    |----------------------------|
|        Null index 1        |    |      Size of entry 1       |
|----------------------------|    |----------------------------|
|        Null index 2        |    |                            |
|----------------------------|    |          Entry 1           |
|            ...             |    |                            |
|----------------------------|    |----------------------------|
|     PayloadList.size()     |    |      Size of entry 2       |
|----------------------------|    |----------------------------|
|                            |    |                            |
|          Entry 1           |    |          Entry 2           |
|                            |    |                            |
|----------------------------|    |----------------------------|
|                            |    |                            |
|          Entry 2           |    |            ...             |
|                            |    |                            |
|----------------------------|    |                            |
|                            |    |                            |
|            ...             |    |                            |
|                            |    |                            |
+----------------------------+    +----------------------------+
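To make the CONSTANT_SIZE layout more concrete, here is a minimal round-trip sketch for a List<Integer> that may contain nulls. It is not the KIP's implementation. Several details are assumptions on my side because the diagram does not state them: the flag is written as a single byte equal to 0, all counts and indexes are 4-byte big-endian ints, PayloadList.size() counts every entry including nulls, and null entries contribute no bytes to the entry area. The class name ConstantSizeLayoutSketch is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the CONSTANT_SIZE layout for Integer entries.
final class ConstantSizeLayoutSketch {

    // Assumption: the strategy flag occupies one byte; 0 stands for CONSTANT_SIZE.
    static final byte CONSTANT_SIZE_FLAG = 0;

    static byte[] serialize(List<Integer> data) {
        // Collect the positions of null entries first.
        List<Integer> nullIndexes = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            if (data.get(i) == null) {
                nullIndexes.add(i);
            }
        }
        int nonNull = data.size() - nullIndexes.size();
        // flag + null count + null indexes + payload size + fixed-width entries
        ByteBuffer out = ByteBuffer.allocate(
                1 + Integer.BYTES * (1 + nullIndexes.size() + 1 + nonNull));
        out.put(CONSTANT_SIZE_FLAG);
        out.putInt(nullIndexes.size());
        for (int index : nullIndexes) {
            out.putInt(index);
        }
        out.putInt(data.size());
        for (Integer entry : data) {
            if (entry != null) {
                out.putInt(entry); // every entry is 4 bytes, so no per-entry size is written
            }
        }
        return out.array();
    }

    static List<Integer> deserialize(byte[] payload) {
        ByteBuffer in = ByteBuffer.wrap(payload);
        if (in.get() != CONSTANT_SIZE_FLAG) {
            throw new IllegalArgumentException("Payload is not CONSTANT_SIZE");
        }
        int nullCount = in.getInt();
        Set<Integer> nullIndexes = new HashSet<>();
        for (int i = 0; i < nullCount; i++) {
            nullIndexes.add(in.getInt());
        }
        int size = in.getInt();
        List<Integer> result = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            if (nullIndexes.contains(i)) {
                result.add(null);
            } else {
                result.add(in.getInt());
            }
        }
        return result;
    }
}
```

Under these assumptions, a three-entry list with one null takes 21 bytes: 1 for the flag, 4 for the null count, 4 for the single null index, 4 for the payload size and 8 for the two non-null entries. The saving over VARIABLE_SIZE comes from not writing a size in front of each entry.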
Does not apply
Not known