Status
Current state: Under DiscussionAccepted
Discussion thread: here
JIRA: here
...
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
print(Printed) | NO | static Printed#as(String) | N/A | N/A |
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A |
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A |
transform(TransformerSupplier, Named, String... ) | YES | static Named#as(String) | N/A | N/A |
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
transformValues( ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
join( KStream, ValueJoiner, JoinWindows windows, Joined) | NO | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) |
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) |
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) |
join(KTable, ValueJoiner, Joined) | NO | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) | N/A |
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#namedJoined#as(final String name) | static Joined#namedJoined#as(final String name) | N/A |
join(GlobalKTableGlobalKTbale, KeyValueMapper, ValueJoiner, Named) | NO | ??? | ??? | YES | static Named#as(String)
| N/A | N/A??? |
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner, Named) | NO | ??? | ??? | ??? |
KTable (16 new methods)
YES | static Named#as(String)
| N/A | N/A |
flatTransform(TransformerSupplier, Named named, String... stateStoreNames |
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String) | N/A |
flatTransformValues(ValueTransformerWithKeySupplierfilter(Predicate, Named, Materialized String... ) | YES | static Named#as(String) | N/A | Materialized#as(String) | filterNot(Predicate, Named) | YES | static Named#as(String) | N/A |
static Named#as(String) | filterNot(PredicateflatTransformValues(ValueTransformerSupplier, Named, MaterializedString...) | YES | static Named#as(String) | N/A | Materialized#as(String) | N/A |
KTable (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
count(Named) | YES | method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
filter(Predicate |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | static Named#as( | StringPREFIX + COUNT)
|
mapValuesfilter(ValueMapperPredicate, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)mapValues |
filterNot( | ValueMapperWithKeyPredicate, Named | , Materialized) | ; | YES | static Named#as(String) | N/A | static Materialized#as( | StringPREFIX + COUNT)
|
suppress(SuppressedfilterNot(Predicate, Named, Materialized) | NOYES | Suppressed#withNamestatic Named#as(String) | N/A | N/A | transformValues(ValueTransformerWithKeySupplier, Named, String...Materialized#as(String) |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as( | StringPREFIX + COUNT)
|
transformValuesmapValues(ValueTransformerWithKeySupplier, MaterializedValueMapperWithKey, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(StringPREFIX + COUNT)
|
groupBymapValues(KeyValueMapperValueMapper, KeyValueNamed, GroupedMaterialized) | NOYES | static Grouped#asNamed#as(String) | N/A | static Grouped#asMaterialized#as(String) | N/A |
join(KTable, ValueJoiner, Named)mapValues(ValueMapperWithKey, Named, Materialized); | YES | static Named#as(String) | N/A | static Named#asMaterialized#as(String) |
join(KTable, ValueJoiner, Named, Materializedsuppress(Suppressed) | YESNO | static Named#asSuppressed#withName(String) | N/A | static Materialized#as(String) | N/A |
transformValues(ValueTransformerWithKeySupplier, Named, String...)leftJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT)
|
leftJointransformValues(KTableValueTransformerWithKeySupplier, ValueJoinerMaterialized, Named, MaterializedString...) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
groupBy(KeyValueMapper, KeyValue, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A |
joinouterJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
outerJoinjoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
KGroupedStream (6 new methods)
leftJoin(KTable, ValueJoiner, Named); | YES |
static Named#as(String) | N/A | (PREFIX + COUNT)
|
countleftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
reduceReducerKTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
|
reduceReducerKTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
aggregateInitializer, Aggregator, Named) | YES | static Named#as(String) | N/A |
(PREFIX + COUNT) | aggregate(Initializer, Aggregator, Named, MaterializedN/A |
toStream(KeyValueMapper, Named) | YES | static Named#as(String) | N/A |
Materialized#as(String) |
KGroupedStream KGroupedTable (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
TimeWindowedKStream KGroupedTable (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregateInitializer AggregatorReducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregateInitializer AggregatorReducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reduceraggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reduceraggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
SessionWindowedKStream TimeWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
|
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
At the end, we can summarize the scope of each configuration class as follow :
...
SessionWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
At the end, we can summarize the scope of each configuration class as follow :
| Generated | Named | Joined / Grouped / Produced / Consumed | Materialized |
---|
Node Name | X | X | X |
|
Repartition Topic | X |
| X |
|
Queryable Store |
|
|
| X |
State store | X |
| X | X |
Changelog Topic | X |
| X | X |
Materialized
The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).
Name Validation
User provided node name should follow the same restrictions that ones currently apply to state stores during the create of Materialized instance.
Currently, the Materialized class relies on the static method Topic#validate. This method ensure that a provided name only contains legal characters [a-zA-Z0-9._-] and have a maximum length of 249.
We propose to copy methods from Topic#validate into Named. This new method will be used validate both store names and node names. The benefit is to remove a dependency with the core module.
In addition, the Materialized class will throw a TopologyException while building the topology in case of a unvalid name instead of InvalidTopicException
Materialized
The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).
Also, note that for all methods accepting a Materialized argument, if no state store named is provided then the node named will be used to generate a one. The state store name will be the node name suffixed with "-table" .
Proposed Changes
- Implement the new interface NamedOperation and default class Named
- Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
- Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
- The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
- Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length us of 249.
Below is an application example :
...