...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.kstream; /** * Default interface which can be used to personalized the named of operations, internal topics or store. */ public interface NamedOperation<T extends NamedOperation<T>> { /** * ReturnSets the name to be used for operation. * * @return a string@param name */ String name(); /**the name to use. * Sets@return thean nameinstance toof be used for operation.{@link NamedOperation} */ T *withName(final @param name the name to use. * @return an instance of {@link NamedOperation} */ T withName(final String name)String name); } |
The NamedOperation interface will be implemented/extended by following classes that already exist to configure operations :
- Produced
- Consumed
- Printed
- Joined
- Grouped
- Suppressed
- Produced
- Consumed
- Printed
- Joined
- Grouped
- Suppressed
In addition, we propose to add a new static method with the following signature to each of those class as(final String processorName).
Deprecration
In order to fix some inconsistencies in API, we also propose to deprecate the method named(String) from the class Joined in favor of new method as().
In addition, as no configuration classes expose the processor name, we will also deprecate the method name() from the class Joined . This method should be removed in a future release.
Overloaded methods
Then, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.
...
Code Block | ||
---|---|---|
| ||
public class Named implements NamedOperation<Named> { private static final int MAX_NAME_LENGTH = 249; protected String name; protected Named(final String name) { this.name = name; if (name != null) validate(name); } /** * Create a Named instance with provided name. * * @param name the processor name to be used. If {@code null} a default processor name will be generated. * @return A new {@link Named} instance configured with name */ public static Named as(final String name) { Objects.requireNonNull(name, "name can't be null"); return new Named(name); } @Override public String name() { return name; } @Override public Named withName(final String name) { Objects.requireNonNull(name, "name can't be null"); return new Named(name); } ... } |
...
The below tables will resume all new and existing methods :
KStream (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
print(Printed) | NO | static Printed#as(String) | N/A | N/A |
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A |
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A |
transform(TransformerSupplier, Named, String... ) | YES | static Named#as(String) | N/A | N/A |
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
transformValues( ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
join( KStream, ValueJoiner, JoinWindows windows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
join(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A |
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A |
join(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ??? |
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ??? |
KTable (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
mapValues(ValueMapperWithKey, Named, Materialized); | YES | static Named#as(String) | N/A | static Materialized#as(String) |
suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A |
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | static Named#as(String) |
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
groupBy(KeyValueMapper, KeyValue, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A |
join(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
join(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
leftJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(String) |
leftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
outerJoin(KTable, ValueJoiner, Named); | YES | static Named#as(String) | N/A | static Named#as(String) |
outerJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
KGroupedStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
KGroupedTable (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
TimeWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
SessionWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
At the end, we can summarize the scope of each configuration class as follow :
Generated | Named | Joined / Grouped / Produced / Consumed | Materialized | |
---|---|---|---|---|
Node Name | X | X | X | |
Repartition Topic | X | X | ||
Queryable Store | X | |||
State store | X | X | X | X |
Changelog Topic | X | X | X | X |
Materialized
The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).
...