...
This KIP aims to add Record Headers support to the Streams API for routing, filtering and mapping. Stateful operations (tables, joins) are out of scope.
Public Interfaces
1. Add Headers to `KeyValue` class:
org.apache.kafka.streams.KeyValue.java:
...
public final K key;
...
public final V value;
...
public final Headers headers;
...
2. Add headers as part of processing:
org.apache.kafka.streams.processor.Processor:
...
<K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
<K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner);
<K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final Headers headers);
<K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Headers headers);
3. Add methods to use Headers as part of processing:
...
`KStream<K, V> filter(PredicateWithHeaders<? super K, ? super V> predicate);`
`KStream<K, V> filterNot(PredicateWithHeaders<? super K, ? super V> predicate);`
`<KR> KStream<KR, V> selectKey(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KR> mapper);`
`<KR, VR> KStream<KR, VR> map(KeyValueAndHeadersMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);`
`<KR, VR> KStream<KR, VR> flatMap(final KeyValueAndHeadersMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);`
`void foreach(final ForeachActionWithHeaders<? super K, ? super V> action);`
`KStream<K, V> peek(final ForeachActionWithHeaders<? super K, ? super V> action);`
`KStream<K, V>[] branch(final PredicateWithHeaders<? super K, ? super V>... predicates);`
`<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector);`
`<KR> KGroupedStream<KR, V> groupBy(final KeyValueAndHeadersMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized);`
4. Add Headers when storing data in Streams API:
org.apache.kafka.streams.state.KeyValueStore:
...
void put(K key, V value);
...
void put(K key, V value, Headers headers);
...
V putIfAbsent(K key, V value);
...
Proposed Changes
1. Enable Kafka Streams to filter, map and process Records using Headers. This is based on API change #3.
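To illustrate the intent of header-aware filtering, here is a minimal, dependency-free sketch. It does not use the Kafka classes: the `PredicateWithHeaders` shape shown here (key, value, headers) is an assumption about the proposed interface, headers are modeled as a `Map<String, byte[]>` rather than `org.apache.kafka.common.header.Headers`, and `Rec`, `filter`, `header` and the `source` header name are hypothetical helpers used for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HeaderFilterSketch {

    /** Assumed shape of the proposed predicate: it sees key, value and headers. */
    @FunctionalInterface
    public interface PredicateWithHeaders<K, V> {
        boolean test(K key, V value, Map<String, byte[]> headers);
    }

    /** Hypothetical stand-in for a record flowing through a stream (not a Kafka class). */
    public static final class Rec<K, V> {
        public final K key;
        public final V value;
        public final Map<String, byte[]> headers;

        public Rec(K key, V value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Mimics a header-aware filter: keeps only records accepted by the predicate. */
    public static <K, V> List<Rec<K, V>> filter(
            List<Rec<K, V>> records,
            PredicateWithHeaders<? super K, ? super V> predicate) {
        List<Rec<K, V>> kept = new ArrayList<>();
        for (Rec<K, V> r : records) {
            if (predicate.test(r.key, r.value, r.headers)) {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Builds a single-entry header map with a UTF-8 encoded value. */
    private static Map<String, byte[]> header(String name, String value) {
        return Collections.singletonMap(name, value.getBytes(StandardCharsets.UTF_8));
    }

    /** Filters three sample records on the (hypothetical) "source" header. */
    public static List<String> demo() {
        List<Rec<String, String>> input = Arrays.asList(
                new Rec<>("k1", "v1", header("source", "mobile")),
                new Rec<>("k2", "v2", header("source", "web")),
                new Rec<>("k3", "v3", header("source", "mobile")));

        // The decision depends only on a header, not on the key or value.
        PredicateWithHeaders<String, String> fromMobile = (key, value, headers) -> {
            byte[] raw = headers.get("source");
            return raw != null && "mobile".equals(new String(raw, StandardCharsets.UTF_8));
        };

        List<String> keptKeys = new ArrayList<>();
        for (Rec<String, String> r : filter(input, fromMobile)) {
            keptKeys.add(r.key);
        }
        return keptKeys;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [k1, k3]
    }
}
```

With the API change proposed above, the same predicate logic would presumably be passed to `KStream#filter(PredicateWithHeaders)` and read from a `Headers` instance instead of a map.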
...