This is a tentative summary of what the DSL would be end-up looking like after these three KIPs.
Things to think about:
1. For the RichValueXX / RichInitializer / RichReducer functions under discussion in KIP-159, whether it should also include the key in addition to the context itself? Here are my thoughts about pros and cons:
a) If we do not do that, we would have overloaded function as ValueFunc / ValueFuncWithKey / RichValueFunc / RichValueFuncWithKey, to accept value / key+value / context+value / context+key+value. This is simply too many overloads.
b) If we have only two overloads, with value (or key+value, for KeyValueMapper, etc) and key+value+context, then for people who only wants to access key+value or context+value, they need to use the overload function with dummy parameters (i.e. "key, value, _" in Scala).
The following first draft assumes a). But under that option, the next question to ask is would we be subsuming all of KIP-149 in KIP-159 then?
2. In the current KIP-159 design doc, for "aggregate" functions we enrich the `Aggregator` with `RichAggregator` but not `Initializer` with `RichInitializer`. I think it is better to also have RichInitializer for aggregate functions where we are using RichAggregators. So the overloads would be, for example "initializer+aggregator" and "richInitializer+richAggregator".
KStream Non-stateful
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper); // old <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper); // KIP-149 <VR> KStream<K, VR> mapValues(final RichValueMapper<? super K, ? super V, ? extends VR> mapper); // KIP-159 <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier< ? super V, ? extends VR> valueTransformerWithKeySupplier, final String... stateStoreNames); // old <VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, final String... stateStoreNames); // KIP-149 <VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, final String... stateStoreNames); // KIP-159 <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor); // old <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> processor); // KIP-149 <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super K, ? super V, ? extends Iterable<? extends VR>> processor); // KIP-159 KStream<K, V> filter(Predicate<? super K, ? super V> predicate); // old KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate); // KIP-159 KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate); // old KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate); // KIP-159 <KR> KStream<KR, V> selectKey(KeyValueMapper<? super K, ? super V, ? extends KR> mapper); // old <KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); // KIP-159 <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); // old <KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); // KIP-159 <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); // old <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); // KIP-159 void print(final Printed<K, V> printed); // KIP-182 void foreach(final ForeachAction<? super K, ? super V> action); // old void foreach(final RichForeachAction<? super K, ? super V> action); // KIP-159 KStream<K, V> peek(final ForeachAction<? super K, ? super V> action); // old KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action); // KIP-159 KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates); // old KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); // KIP-159 KStream<K, V> through(final String topic); // old KStream<K, V> through(final String topic, final Produced<K, V> partitioned); // KIP-182 void to(final String topic); // old void to(final String topic, final Produced<V, V> partitioned); // KIP-182
KTable Non-stateful
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate); // old KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); // KIP-159 KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/159 KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate); // old KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate); // KIP-159 KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/159 <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper); // old <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper); // KIP-159 <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/159 <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper); // old <KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); // KIP-159 void to(final String topic); // old void to(final String topic, final Produced<V, V> options); // KIP-182 KTable<K, V> through(final String topic); // old KTable<K, V> through(final String topic, final Materialized<K, V> options); // KIP-182
KStream Joins
<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options); // KIP-182 <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options); // KIP-182 <VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoinerWithKey<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options); // KIP-182/149, can this be subsumed by KIP-159? <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoinerWithKey<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options); // KIP-182/149, can this be subsumed by KIP-159? <VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options); // KIP-182/159 <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options); // KIP-182/159 // same for KStream#leftJoin, KStream#outerJoin
KTable Joins
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182 <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoinerWithKey<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<K, VR>> materiaized); // KIP-182/149, can this be subsumed by KIP-159? <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182/159 // same for KTable#leftJoin, KTable#outerJoin
KStream Aggregations
KGroupedStream<K, V> groupByKey(); // old KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized); // KIP-182 <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? superK, ? superV, KR> selector); // old <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? superK, ? superV, KR> selector, final Serialized<KR, V> serialized); // KIP-182 <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? superK, ? superV, KR> selector); // old <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? superK, ? superV, KR> selector, final Serialized<KR, V> serialized); // KIP-182
KGroupedStream / WindowedKStream Aggregations
For WindowedKStream the only difference is typed K to Windowed<K>, and hence is omitted here:
KTable<K, Long> count(); // old KTable<K, Long> count(final Materialized<K, Long> materialized); // KIP-182 KTable<K, V> reduce(final Reducer<V> reducer); // old KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 KTable<K, V> reduce(final ReducerWithKey<V> reducer); // KIP-149, can this be subsumed by KIP-159? KTable<K, V> reduce(final ReducerWithKey<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/149, can this be subsumed by KIP-182/159? KTable<K, V> reduce(final RichReducer<V> reducer); // KIP-159 KTable<K, V> reduce(final RichReducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/159 <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); // old <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182 <VR> KTable<K, VR> aggregate(final InitializerWithKey<K, VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); // KIP-149, can this be subsumed? <VR> KTable<K, VR> aggregate(final InitializerWithKey<K, VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182/149, can this be subsumed? <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); // KIP-159 <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182/159
KTable Aggregations
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); // old <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized<KR, VR> serialized); // KIP-182 <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); // KIP-159 <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized<KR, VR> serialized); // KIP-182/159
KGroupedTable Aggregations
KTable<K, Long> count(); // old KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor); // old KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182 KTable<K, V> reduce(final RichReducer<V> adder, final RichReducer<V> subtractor); // KIP-159 KTable<K, V> reduce(final RichReducer<V> adder, final RichReducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized); // KIP-182/KIP-149 <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> subtractor); // old <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182 <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final RichAggregator<? super K, ? super V, VR> subtractor); // KIP-159 <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final RichAggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); // KIP-182/159