Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Allowing users to set more meaningful names can therefore help to manage the complexity of some topologies. For example, a processor name could describe the business rule performed by a map() operation.
In addition, generated names make it hard to guarantee topology compatibility. Adding a new operator, using a third-party library that optimizes the topology by removing operators, or upgrading to a new Kafka Streams version with internal API changes may shift the suffix index of a large number of processor names. This in turn changes the internal state store names as well as the internal topic names.
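The index shift described above can be illustrated with a small stand-alone model. This is only a sketch: the class `GeneratedNameDemo` and its method are hypothetical, the prefixes merely imitate the style of generated names, and the real name generation in Kafka Streams is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of index-based name generation (not the Kafka Streams code).
class GeneratedNameDemo {

    // Assigns "<prefix><10-digit index>" names in declaration order, using one shared counter.
    static List<String> generateNames(final List<String> operatorPrefixes) {
        final List<String> names = new ArrayList<>();
        int index = 0;
        for (final String prefix : operatorPrefixes) {
            names.add(String.format("%s%010d", prefix, index++));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Original topology: source -> map -> sink.
        System.out.println(generateNames(
                Arrays.asList("KSTREAM-SOURCE-", "KSTREAM-MAP-", "KSTREAM-SINK-")));
        // Same topology with a filter inserted after the source:
        // the map and sink names now carry a different index.
        System.out.println(generateNames(
                Arrays.asList("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-MAP-", "KSTREAM-SINK-")));
    }
}
```

In this model the map operator is named KSTREAM-MAP-0000000001 in the first topology and KSTREAM-MAP-0000000002 in the second, although the operator itself did not change.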
Public Interfaces
First, we propose to add a new interface, NamedOperation, that can be used to customize stateless as well as stateful operations. The objective of creating a new type is to stay consistent with the overall API design.
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams.kstream;

/**
 * Default interface which can be used to personalize the names of operations, internal topics or stores.
 */
public interface NamedOperation<T extends NamedOperation<T>> {

    /**
     * Sets the name to be used for an operation.
     *
     * @param name the name to use.
     * @return an instance of {@link NamedOperation}
     */
    T withName(final String name);
} |
The Named interface will be extended by all existing action interfaces:
- Aggregator
- ForeachAction
- KeyValueMapper
- Predicate
- Reducer
- ValueJoiner
- ValueMapper
- ValueMapperWithKey
The name() method should be defined as a default method so as not to break compatibility with previous APIs. In addition, a default method still allows the use of lambda expressions introduced with Java 8.
Each interface will be enriched with a new static method, named(), to keep the API simple and elegant for developers.
The NamedOperation interface will be implemented/extended by the following existing classes, which are used to configure operations:
- Produced
- Consumed
- Printed
- Joined
- Grouped
- Suppressed
In addition, we propose to add a new static method with the following signature to each of those classes: as(final String processorName).
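As a rough sketch of how a configuration class could adopt the interface, the following stand-alone example redeclares NamedOperation locally and uses a hypothetical class `ExampleConfig` (with a made-up `setting` field standing in for existing options such as serdes). It is not the code of Produced, Consumed or any other Kafka Streams class.

```java
// Local stand-in for the proposed interface, redeclared so the sketch compiles on its own.
interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name);
}

// Hypothetical configuration class; it only mimics how Produced, Consumed, etc. could adopt the interface.
final class ExampleConfig implements NamedOperation<ExampleConfig> {
    private final String setting;        // stands in for existing options (assumption)
    private final String processorName;  // null means "let the framework generate a name"

    private ExampleConfig(final String setting, final String processorName) {
        this.setting = setting;
        this.processorName = processorName;
    }

    // Static factory following the proposed as(final String processorName) signature.
    static ExampleConfig as(final String processorName) {
        return new ExampleConfig(null, processorName);
    }

    // Fluent setter for the other option; keeps the processor name already configured.
    ExampleConfig withSetting(final String setting) {
        return new ExampleConfig(setting, processorName);
    }

    @Override
    public ExampleConfig withName(final String name) {
        return new ExampleConfig(setting, name);
    }

    String setting() {
        return setting;
    }

    String processorName() {
        return processorName;
    }
}

class ExampleConfigDemo {
    public static void main(final String[] args) {
        final ExampleConfig config = ExampleConfig.as("TO-OUTPUT-TOPIC").withSetting("some-option");
        System.out.println(config.processorName() + " / " + config.setting());
    }
}
```

The self-referencing type parameter lets withName() return the concrete configuration type, so calls can still be chained with the class's other fluent methods.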
Deprecation
In order to fix some inconsistencies in the API, we also propose to deprecate the method named(String) of the class Joined in favor of the new method as().
In addition, as no configuration class exposes the processor name, we will also deprecate the method name() of the class Joined. This method should be removed in a future release.
Overloaded methods
Then, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.
For that we will add a new default class Named implementing NamedOperation:
Code Block | ||
---|---|---|
| ||
public class Named implements NamedOperation<Named> {

    private static final int MAX_NAME_LENGTH = 249;

    protected String name;

    protected Named(final String name) {
        this.name = name;
        if (name != null)
            validate(name);
    }

    /**
     * Create a Named instance with provided name.
     *
     * @param name the processor name to be used. Must not be {@code null}.
     * @return A new {@link Named} instance configured with name
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
    ...
} |
Stateless operations
Stateless operations like foreach(), map(), flatMap(), mapValues(), filter() and filterNot() are translated into a single processor. The action interfaces depicted above will be used to directly change the processor name (i.e., ForeachAction, ValueMapper, ValueMapperWithKey, KeyValueMapper).
The method branch() results in multiple streams that can be described using the interface Predicate. However, the name of the first translated processor will not be customizable, because this would require overloading the method with one accepting a Named instance as an argument. This seems to be OK, as the processor name would not determine any related topic or store names.
Stateful operations
Stateful operations like join(), leftJoin() and outerJoin() are translated to multiple processors. The given name should be used to describe the first processor and as a prefix for all subsequent processors. The interface ValueJoiner will be used to customize the processor names.
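The prefix rule for multi-processor operations can be sketched as follows. The class `PrefixedNamesDemo` and the step suffixes are hypothetical placeholders chosen for illustration; they are not the names Kafka Streams uses internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "first processor gets the name, later ones use it as a prefix".
class PrefixedNamesDemo {

    static List<String> processorNames(final String name, final List<String> laterSteps) {
        final List<String> names = new ArrayList<>();
        names.add(name);                 // the first processor carries the user-supplied name
        for (final String step : laterSteps) {
            names.add(name + "-" + step); // subsequent processors reuse it as a prefix
        }
        return names;
    }

    public static void main(final String[] args) {
        // "other-side" and "merge" are made-up step labels.
        System.out.println(processorNames("ORDERS-JOIN", Arrays.asList("other-side", "merge")));
    }
}
```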
We propose to overload each of the following methods to accept a Named argument:
KStream/KTable#transform()
KStream/KTable#transformValues()
KStream#process()
Source and Sink operations
Then, for those methods we propose to add a new method withProcessorName() to classes Consumed, Produced and Printed in order to minimize the number of overloaded methods.
This will allow developers to specify a processor name for operations:
- <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)
- <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> consumed)
- void print(final Printed<K, V> printed)
- void to(final String topic, final Produced<K, V> produced)
- void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced)
Example for the Produced class :
Code Block | ||
---|---|---|
| ||
/**
* Create an instance of {@link Produced} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Produced}
*/
public static <K, V> Produced<K, V> with(final String processorName) {
return new Produced<>(null, null, null, null, processorName);
}
/**
* Configure the instance of {@link Produced} with a processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
*/
public Produced<K, V> withProcessorName(final String processorName) {
this.processorName = processorName;
return this;
} |
The tables below summarize all new and existing methods:
KStream (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
print(Printed) | NO | static Printed#as(String) | N/A | N/A |
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A |
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A |
transform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
join(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(final String name) | static Joined#as(final String name) | static Joined#as(final String name) |
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(final String name) | static Joined#as(final String name) | static Joined#as(final String name) |
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(final String name) | static Joined#as(final String name) | static Joined#as(final String name) |
join(KTable, ValueJoiner, Joined) | NO | static Joined#as(final String name) | static Joined#as(final String name) | N/A |
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#as(final String name) | static Joined#as(final String name) | N/A |
join(GlobalKTable, KeyValueMapper, ValueJoiner, Named) | YES | static Named#as(String) | N/A | N/A |
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner, Named) | YES | static Named#as(String) | N/A | N/A |
flatTransform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
flatTransformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
flatTransformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
KTable (16 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
filter(Predicate, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
mapValues(ValueMapperWithKey, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A |
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
groupBy(KeyValueMapper, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A |
join(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
join(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
leftJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
leftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
outerJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
outerJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
toStream(Named) | YES | static Named#as(String) | N/A | N/A |
toStream(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
KGroupedStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
KGroupedTable (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
TimeWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
SessionWindowedKStream (6 new methods)
method | Added for this KIP ? | Object/method used for node name | Used for repartition topic name | Used for state store name ? |
---|---|---|---|---|
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT) |
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
Finally, the scope of each configuration class can be summarized as follows:
Element | Generated | Named | Joined / Grouped / Produced / Consumed | Materialized |
---|---|---|---|---|
Node Name | X | X | X | |
Repartition Topic | X | | X | |
Queryable Store | | | | X |
State store | X | | X | X |
Changelog Topic | X | | X | X |
Materialized
The main reason why we propose to overload each method accepting a Materialized argument is to avoid introducing ambiguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).
Name Validation
User-provided node names should follow the same restrictions that currently apply to state store names when a Materialized instance is created.
Currently, the Materialized class relies on the static method Topic#validate. This method ensures that a provided name only contains legal characters [a-zA-Z0-9._-] and has a maximum length of 249.
We propose to copy the logic of Topic#validate into Named. This new method will be used to validate both store names and node names. The benefit is that it removes a dependency on the core module.
In addition, the Materialized class will throw a TopologyException while building the topology if a name is invalid, instead of an InvalidTopicException.
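The two rules stated above (legal characters and maximum length) can be sketched as a stand-alone check. The class `NodeNameValidator` is hypothetical, the empty-name check is an added assumption, and IllegalArgumentException is used only as a stand-in for the TopologyException mentioned in the text so that the example has no Kafka dependency.

```java
import java.util.regex.Pattern;

// Hypothetical stand-alone validator; not the code of Named or Topic#validate.
class NodeNameValidator {

    private static final int MAX_NAME_LENGTH = 249;
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    // Throws IllegalArgumentException (stand-in for TopologyException) on an invalid name.
    static void validate(final String name) {
        if (name.isEmpty()) { // assumption: empty names are rejected as well
            throw new IllegalArgumentException("Name is illegal, it can't be empty");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException(
                    "Name is illegal, it can't be longer than " + MAX_NAME_LENGTH + " characters: " + name);
        }
        if (!LEGAL_CHARS.matcher(name).matches()) {
            throw new IllegalArgumentException(
                    "Name \"" + name + "\" is illegal, it may only contain [a-zA-Z0-9._-]");
        }
    }

    // Convenience wrapper that reports validity instead of throwing.
    static boolean isValid(final String name) {
        try {
            validate(name);
            return true;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isValid("FILTER-NULL-VALUE"));
        System.out.println(isValid("filter null value"));
    }
}
```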
Proposed Changes
- Implement the new interface NamedOperation and default class Named
- Update all parameter classes to implement NamedOperation: Produced, Consumed, Printed, Joined, Grouped, Suppressed
- Overload stateless methods for the classes KStream, KTable, KGroupedStream, KGroupedTable, TimeWindowedKStream, SessionWindowedKStream
- Implement the new interface Named and update all action interfaces
- Overload the methods process(), transform() and transformValues() for the classes KStream and KTable.
- Update the Consumed, Produced and Printed classes to be able to set a processor name.
- The processor names specified by the developer will be used in place of the static processor prefixes. Static prefixes will still be used if no custom processor name is specified.
- Processor names should follow the same restrictions as topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length is 249.
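The fallback between custom names and static prefixes could look roughly like the sketch below. The class `NodeNameResolver` is hypothetical, and whether the internal counter advances for operators that carry a custom name is an assumption made here for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical resolver: prefers the custom name, otherwise falls back to "<prefix><index>".
class NodeNameResolver {

    private final AtomicInteger index = new AtomicInteger(0);

    // Assumption: the counter advances for every operator, named or not.
    String resolve(final String customName, final String defaultPrefix) {
        final int current = index.getAndIncrement();
        return customName != null
                ? customName
                : String.format("%s%010d", defaultPrefix, current);
    }

    public static void main(final String[] args) {
        final NodeNameResolver resolver = new NodeNameResolver();
        System.out.println(resolver.resolve("FILTER-NULL-VALUE", "KSTREAM-FILTER-"));
        System.out.println(resolver.resolve(null, "KSTREAM-MAP-"));
    }
}
```

A named operator keeps its name no matter where it sits in the topology, while an unnamed one still receives a prefix plus an index.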
Below is an application example :
Code Block | ||
---|---|---|
| ||
final StreamsBuilder builder = new StreamsBuilder();
// Explicit type arguments so that v.toUpperCase() resolves against String values.
builder.<String, String>stream("topic-input", Consumed.as("STREAM-FROM-TOPIC-INPUT"))
        .filter((k, v) -> true, Named.as("FILTER-NULL-VALUE"))
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Named.as("MAP-TO-UPPERCASE"))
        .to("topic-output", Produced.as("TO-OUTPUT-TOPIC"));
System.out.println(builder.build().describe());

---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE |
A straightforward first pass is GitHub PR 6958
Compatibility, Deprecation, and Migration Plan
...
- The first proposition was to overload all stateless methods to accept an instance of a Described class. However, this solution required modifying a large percentage of the existing KStream and KTable methods.
- The second proposition was to add new methods KStreams#as(Described) and KTable#as(Described), where the Described class would be used to customize the name of the operation defined previously in the stream. However, not only was this new method inconsistent with the existing APIs, it also introduced some complexity for methods returning void.
- A further proposition was to enrich all action classes (Reducer, Predicate, etc.) with a new default method "as(String)" in order to name the operation. But this mixes classes with different semantics (Predicate vs. Consumed/Produced), creating a couple of unfortunate side effects: