...

So, allowing users to set more meaningful names can help reduce the complexity of understanding some topologies. For example, a processor name could describe the business rule performed by a map() operation.

In addition, generated names make it hard to guarantee topology compatibility. Several changes can shift the suffix index of a large number of processor names:

  • adding a new operator,
  • using a third-party library that optimizes the topology by removing some operators,
  • upgrading to a new Kafka Streams version with internal API changes.

This in turn changes the names of internal state stores and internal topics.
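The following simplified sketch illustrates the problem. It assumes a model of Kafka Streams' internal behavior in which each generated name is a fixed prefix followed by a ten-digit, zero-padded counter shared by all operators, assigned in declaration order. The class and method names are hypothetical, and the real naming logic is more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of index-based processor naming.
public class GeneratedNamesSketch {

    // Assigns "PREFIX<10-digit index>" names in declaration order,
    // using a single counter shared by every operator in the topology.
    static List<String> generateNames(List<String> prefixes) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (String prefix : prefixes) {
            names.add(prefix + String.format("%010d", index++));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> before = generateNames(List.of(
                "KSTREAM-SOURCE-", "KSTREAM-MAP-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        // The same topology with one extra filter() inserted after the source.
        List<String> after = generateNames(List.of(
                "KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-MAP-", "KSTREAM-AGGREGATE-STATE-STORE-"));
        // prints [KSTREAM-SOURCE-0000000000, KSTREAM-MAP-0000000001, KSTREAM-AGGREGATE-STATE-STORE-0000000002]
        System.out.println(before);
        // prints [KSTREAM-SOURCE-0000000000, KSTREAM-FILTER-0000000001, KSTREAM-MAP-0000000002, KSTREAM-AGGREGATE-STATE-STORE-0000000003]
        System.out.println(after);
    }
}
```

Inserting the filter shifts every later index by one. The store name therefore changes, and so does anything derived from it, even though the aggregation itself did not change.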

Public Interfaces

First, we propose to add a new interface, NamedOperation, that can be used to customize stateless as well as stateful operations. The purpose of creating a new class is to remain consistent with the overall API design.

Code Block
languagejava
package org.apache.kafka.streams.kstream;

/**
 * Default interface which can be used to personalize the names of operations, internal topics or stores.
 */
public interface NamedOperation<T extends NamedOperation<T>> {

    /**
     * Returns the name to be used for the operation.
     *
     * @return a string name
     */
    String name();

    /**
     * Sets the name to be used for the operation.
     *
     * @param name  the name to use.
     * @return an instance of {@link NamedOperation}
     */
    T withName(final String name);

}
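The self-referential type parameter T extends NamedOperation<T> lets withName() return the concrete configuration type rather than the bare interface. This keeps fluent chaining possible. The sketch below repeats the interface so it compiles on its own. ExampleConfig is a hypothetical class used only for illustration and is not part of the proposal.

```java
// Copy of the proposed interface so the sketch compiles on its own.
interface NamedOperation<T extends NamedOperation<T>> {
    String name();
    T withName(final String name);
}

// Hypothetical configuration class; not part of the proposal.
final class ExampleConfig implements NamedOperation<ExampleConfig> {
    private final String name;
    private final int retries; // an unrelated setting, used to show chaining keeps the type

    private ExampleConfig(final String name, final int retries) {
        this.name = name;
        this.retries = retries;
    }

    static ExampleConfig defaults() {
        return new ExampleConfig(null, 0);
    }

    ExampleConfig withRetries(final int retries) {
        return new ExampleConfig(name, retries);
    }

    int retries() {
        return retries;
    }

    @Override
    public String name() {
        return name;
    }

    // Returns ExampleConfig (not NamedOperation) thanks to the type parameter.
    @Override
    public ExampleConfig withName(final String name) {
        return new ExampleConfig(name, retries);
    }
}

public class NamedOperationSketch {
    public static void main(String[] args) {
        // withName() returns ExampleConfig, so withRetries() can be chained after it.
        ExampleConfig config = ExampleConfig.defaults().withName("MY-OPERATION").withRetries(3);
        System.out.println(config.name() + " " + config.retries()); // prints "MY-OPERATION 3"
    }
}
```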


...

Code Block
languagejava
import java.util.Objects;

public class Named implements NamedOperation<Named> {

    private static final int MAX_NAME_LENGTH = 249;

    protected String name;

    protected Named(final String name) {
        this.name = name;
        if (name != null)
            validate(name);
    }

    /**
     * Create a Named instance with the provided name.
     *
     * @param name  the processor name to be used; must not be {@code null}
     * @return      a new {@link Named} instance configured with the given name
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
...
}
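The body of validate() is elided above. The following sketch shows one plausible rule set. It is an assumption and is not taken from the proposal: it mirrors Kafka topic-name constraints, requiring a non-empty name of at most MAX_NAME_LENGTH (249) characters that uses only ASCII letters, digits, '.', '_' and '-'. The class name and the use of IllegalArgumentException are illustrative choices.

```java
import java.util.regex.Pattern;

// Hypothetical validation rules for processor names; not the proposal's elided code.
public final class NameValidationSketch {

    private static final int MAX_NAME_LENGTH = 249;
    // Assumed legal characters, borrowed from Kafka topic-name rules.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    static void validate(final String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Name is illegal, it can't be empty");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException("Name is illegal, it can't be longer than "
                    + MAX_NAME_LENGTH + " characters, name: " + name);
        }
        if (!LEGAL_CHARS.matcher(name).matches()) {
            throw new IllegalArgumentException("Name \"" + name + "\" is illegal, it contains a character"
                    + " other than ASCII alphanumerics, '.', '_' and '-'");
        }
    }

    public static void main(String[] args) {
        validate("MAP-TO-UPPERCASE"); // accepted
        try {
            validate("map to uppercase"); // rejected: contains spaces
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```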

The tables below summarize all new and existing methods:

KStream (16 new methods)

method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A
print(Printed) | NO | static Printed#as(String) | N/A | N/A
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A
transform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
join(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name)
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name)
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name)
join(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A
join(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ???
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ???

KTable (16 new methods)

method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
filter(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
mapValues(ValueMapperWithKey, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | static Named#as(String)
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(String)
groupBy(KeyValueMapper, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A
join(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
join(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
leftJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
leftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
outerJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | static Named#as(String)
outerJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)


KGroupedStream (6 new methods)


method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)


KGroupedTable (6 new methods)


method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
reduce(Reducer, Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
aggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)


TimeWindowedKStream (6 new methods)

method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)


SessionWindowedKStream (6 new methods)

method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)


Finally, the scope of each configuration class can be summarized as follows:

Resource | Generated | Named | Joined / Grouped / Produced / Consumed | Materialized
Node Name | X | X | X |
Repartition Topic | X | | X |
Queryable Store | | | | X
State Store | X | X | X | X
Changelog Topic | X | X | X | X
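The Repartition Topic and Changelog Topic rows matter because internal topic names are derived from operator and store names and prefixed with the application.id. The minimal sketch below assumes the commonly documented patterns "<application.id>-<name>-repartition" and "<application.id>-<storeName>-changelog". The class, its methods, and the "my-app" application id are hypothetical.

```java
// Hypothetical helpers showing how internal topic names follow from the chosen names
// (assumes the "<application.id>-<name>-repartition" / "-changelog" patterns).
public final class InternalTopicNamesSketch {

    static String repartitionTopic(final String applicationId, final String name) {
        return applicationId + "-" + name + "-repartition";
    }

    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // A generated store name embeds an index that can shift when the topology changes...
        // prints my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog
        System.out.println(changelogTopic("my-app", "KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
        // ...whereas user-provided names stay stable across such changes.
        System.out.println(changelogTopic("my-app", "COUNT-BY-USER"));    // prints my-app-COUNT-BY-USER-changelog
        System.out.println(repartitionTopic("my-app", "GROUP-BY-USER"));  // prints my-app-GROUP-BY-USER-repartition
    }
}
```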


Materialized

The main reason we propose to overload each method accepting a Materialized argument is to avoid ambiguity. Config objects that configure an operation (such as Grouped or Joined) should not be conflated with config objects that configure an aspect of the operation (such as Materialized).

Note also that for all methods accepting a Materialized argument, if no state store name is provided, the node name is used to generate one. The state store name will be the node name suffixed with "-table".
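The naming rule above can be modeled as a small precedence function. This sketch is a hypothetical illustration, not Kafka Streams code. It assumes three things:

  • an explicit Materialized name wins;
  • otherwise a Named node name is suffixed with "-table";
  • otherwise a generated (PREFIX + COUNT) name is used.

The generated name in the example is made up.

```java
// Hypothetical model of the store-naming precedence described above.
public final class StoreNameSketch {

    /**
     * @param materializedName name given via Materialized#as, or null
     * @param nodeName         name given via Named#as, or null
     * @param generatedName    fallback generated (PREFIX + COUNT) name
     */
    static String resolveStoreName(final String materializedName,
                                   final String nodeName,
                                   final String generatedName) {
        if (materializedName != null) {
            return materializedName;        // an explicit store name always wins
        }
        if (nodeName != null) {
            return nodeName + "-table";     // derived from the processor name
        }
        return generatedName;               // assumed fallback when nothing is named
    }

    public static void main(String[] args) {
        String generated = "KTABLE-FILTER-STATE-STORE-0000000002"; // illustrative only
        System.out.println(resolveStoreName("active-users-store", "FILTER-ACTIVE-USERS", generated));
        System.out.println(resolveStoreName(null, "FILTER-ACTIVE-USERS", generated)); // FILTER-ACTIVE-USERS-table
        System.out.println(resolveStoreName(null, null, generated));
    }
}
```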

...

Code Block
languagejava
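import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

final StreamsBuilder builder = new StreamsBuilder();

// Each operation is given an explicit, human-readable processor name.
builder.<String, String>stream("topic-input", Consumed.as("STREAM-FROM-TOPIC-INPUT"))
        .filter((k, v) -> v != null, Named.as("FILTER-NULL-VALUE"))
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Named.as("MAP-TO-UPPERCASE"))
        .to("topic-output", Produced.as("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());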
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE


A straightforward first pass is available in GitHub PR 6958.

Compatibility, Deprecation, and Migration Plan

...

Rejected Alternatives

  1. The first proposition was to overload all stateless methods to accept an instance of a Described class. However, this solution resulted in modifying a large percentage of the existing KStream and KTable methods.
  2. The second proposition was to add new methods KStream#as(Described) and KTable#as(Described), where the Described class would be used to customize the name of the operation defined previously in the stream.
    However, not only was this new method not conservative with the existing APIs, it also introduced some complexity for methods returning void.
  3. The third proposition was to enrich all action classes (Reducer, Predicate, etc.) with a new default method "as(String)" in order to name the operation. But this leads to mixing classes with different semantics (Predicate vs Consumed/Produced), creating a couple of unfortunate side-effects: