...
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? | |||
---|---|---|---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A | |||
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A | |||
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A | |||
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A | |||
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A | |||
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A | |||
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A | |||
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A | |||
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A | |||
print(Printed) | NO | static Printed#as(String) | N/A | N/A | |||
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A | |||
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A | |||
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A | |||
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A | |||
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A | |||
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A | |||
transform(TransformerSupplier, Named, String... ) | YES | static Named#as(String) | N/A | N/A | |||
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A | |||
transformValues( ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A | |||
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A | |||
join( KStream, ValueJoiner, JoinWindows windows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) | |||
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) | |||
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) | |||
join(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A | |||
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A | |||
join(GlobalKTbale, KeyValueMapper, ValueJoiner) | NO | (PREFIX + COUNT) | N/A | N/A | |||
join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named) | YES | static Joined#named(final String name) | N/A | N/A | |||
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | (PREFIX + COUNT) | N/A | N/A??? | |
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | YES | static Joined#named(final String name) | N/A | N/A??? |
KTable (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
mapValues(ValueMapperWithKey, Named, Materialized); | YES | static Named#as(String) | N/A | static Materialized#as(String) |
suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A |
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
groupBy(KeyValueMapper, KeyValue, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A |
join(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
join(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
leftJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
leftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
outerJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(StringPREFIX + COUNT) |
outerJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
...
Generated | Named | Joined / Grouped / Produced / Consumed | Materialized | ||
---|---|---|---|---|---|
Node Name | X | X | X | ||
Repartition Topic | X | X | |||
Queryable Store | X | ||||
State store | XX | X | X | ||
Changelog Topic | XX | X | X |
Materialized
The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).Also, note that for all methods accepting a Materialized argument, if no state store named is provided then the node named will be used to generate a one. The state store name will be the node name suffixed with "-table".
Proposed Changes
- Implement the new interface NamedOperation and default class Named
- Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
- Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
- The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
- Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length us 249.
...