Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In addition, as no configuration classes expose the processor name, we will also deprecate the method name() from the class Joined . This method should be removed in a future release.

Overloaded methods


Then, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.

For that we will added a new default class Named implementing NamedOperation :


Code Block
languagejava
public class Named implements NamedOperation<Named> {

    private static final int MAX_NAME_LENGTH = 249;

    protected String name;

    protected Named(final String name) {
        this.name = name;
        if (name != null)
            validate(name);
    }

    /**
     * Create a Named instance with provided name.
     *
     * @param name  the processor name to be used. If {@code null} a default processor name will be generated.
     * @return      A new {@link Named} instance configured with name
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
...
}

...

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
count(Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
count(Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
aggregate(Initializer, Aggregator, Merger, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
aggregate(Initializer, Aggregator, Merger, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
reduce(Reducer, Named)YESstatic Named#as(String)N/A(PREFIX + COUNT)
reduce(Reducer, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)

At the end, we can summarize the scope of each configuration class as follow : 

...

Materialized

The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).

Name Validation

User provided node name should follow the same restrictions that ones currently apply to state stores during the create of Materialized instance.

Currently, the Materialized class relies on the static method Topic#validate. This method ensure that a provided name only contains legal characters [a-zA-Z0-9._-] and have a maximum length of 249.

We propose to copy methods from Topic#validate into Named. This new method will be used validate both store names and node names. The benefit is to remove a dependency with the core module.

In addition, the Materialized class will throw a TopologyException while building the topology in case of a unvalid name instead of InvalidTopicException .

Proposed Changes

  • Implement the new interface NamedOperation and default class Named 
  • Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
  • Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
  • Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length of 249.

Below is an application example : 

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();


builder.stream("topic-input", Consumed.as("STREAM-FROM-TOPIC-INPUT")
        .filter((k, v) -> true ), Named.as("FILTER-NULL-VALUE")
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Named.as("MAP-TO-UPPERCASE")
        .to("topic-output", Produced.as("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE

A straightforward first pass is GitHub PR 6958

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

...


Name Validation

User provided node name should follow the same restrictions that ones currently apply to state stores during the create of Materialized instance.

Currently, the Materialized class relies on the static method Topic#validate. This method ensure that a provided name only contains legal characters [a-zA-Z0-9._-] and have a maximum length of 249.


We propose to copy methods from Topic#validate into Named. This new method will be used validate both store names and node names. The benefit is to remove a dependency with the core module.

In addition, the Materialized class will throw a TopologyException while building the topology in case of a unvalid name instead of InvalidTopicException .


Proposed Changes

  • Implement the new interface NamedOperation and default class Named 
  • Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
  • Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
  • Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length of 249

...

  • .