...

First, we propose to add one new interface, NamedOperation, that can be used to customize stateless operations as well as stateful ones. The objective of creating a new class is to stay consistent with the overall API design.

Code Block
languagejava
package org.apache.kafka.streams.kstream;

/**
 * Default interface which can be used to personalize the names of operations, internal topics or stores.
 */
public interface NamedOperation<T extends NamedOperation<T>> {

    /**
     * Return the name to be used for operation.
     *
     * @return a string name
     */
    String name();

    /**
     * Sets the name to be used for operation.
     *
     * @param name  the name to use.
     * @return an instance of {@link NamedOperation}
     */
    T withName(final String name);
}
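The self-referencing type parameter is what lets withName() return the concrete configuration type, so calls can still be chained. The following is a minimal sketch of that idea, not the actual Kafka Streams code: the interface is re-declared locally so the example compiles on its own, and SketchOptions, withRetries() and retries() are hypothetical names introduced only for illustration.

```java
import java.util.Objects;

// Local copy of the proposed interface so the sketch is self-contained.
interface NamedOperation<T extends NamedOperation<T>> {
    String name();

    T withName(String name);
}

// Hypothetical configuration class (assumption, not part of Kafka Streams).
final class SketchOptions implements NamedOperation<SketchOptions> {
    private final String name;
    private final int retries;

    private SketchOptions(final String name, final int retries) {
        this.name = name;
        this.retries = retries;
    }

    // Hypothetical factory method; the name is left unset until withName() is called.
    static SketchOptions withRetries(final int retries) {
        return new SketchOptions(null, retries);
    }

    int retries() {
        return retries;
    }

    @Override
    public String name() {
        return name;
    }

    // Returns SketchOptions rather than NamedOperation, thanks to the bound on T.
    @Override
    public SketchOptions withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new SketchOptions(name, retries);
    }
}

class NamedOperationSketch {
    public static void main(final String[] args) {
        final SketchOptions options = SketchOptions.withRetries(3).withName("MY-OPERATION");
        System.out.println(options.name() + " " + options.retries());
    }
}
```

Because withName() is typed to return SketchOptions, the result can be used wherever the concrete class is expected without a cast.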



The NamedOperation interface will be implemented/extended by the following classes that already exist to configure operations:

  • Produced 
  • Consumed 
  • Printed 
  • Joined 
  • Grouped 
  • Suppressed 

Then, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.

For that we will add a new default class Named implementing NamedOperation:


Code Block
languagejava

import java.util.Objects;

public class Named implements NamedOperation<Named> {

    private static final int MAX_NAME_LENGTH = 249;

    protected String name;

    protected Named(final String name) {
        this.name = name;
        if (name != null) {
            // validate(String) is not shown in this excerpt.
            validate(name);
        }
    }

    /**
     * Create a Named instance with the provided name.
     *
     * @param name  the processor name to be used. If {@code null} a default processor name will be generated.
     * @return      a new {@link Named} instance configured with name
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
}

Stateless operations

Stateless operations like foreach(), map(), flatMap(), mapValues(), filter() and filterNot() are translated into a single processor. The action interfaces (i.e. ForeachAction, ValueMapper, ValueMapperWithKey, KeyValueMapper) will be used to directly change the processor name.

The method branch() results in multiple streams that can be described with the use of the interface Predicate. However, the name of the first translated processor will not be customizable, because this would require overloading the method with one accepting a Named instance as an argument. This seems to be OK, as the processor name would not determine any related topic or store names.

Stateful operations

Stateful operations like join(), leftJoin() and outerJoin() are translated to multiple processors. The given name should be used to describe the first processor and as a prefix for all subsequent processors. The interface ValueJoiner will be used to customize the processor names.
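The prefix rule for operations that expand into several processors can be sketched as follows. This is only an illustration under stated assumptions: the class JoinProcessorNames, the method namesFor() and the suffix strings are hypothetical, and the proposal does not specify which suffixes are actually appended.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (assumption): derives the names of all processors
// created by one stateful operation from a single user-provided name.
class JoinProcessorNames {

    // The first processor gets the given name as-is; every subsequent
    // processor gets the given name used as a prefix of its own suffix.
    static List<String> namesFor(final String givenName, final List<String> suffixes) {
        final List<String> names = new ArrayList<>();
        names.add(givenName);
        for (final String suffix : suffixes) {
            names.add(givenName + suffix);
        }
        return names;
    }

    public static void main(final String[] args) {
        final List<String> suffixes = new ArrayList<>();
        suffixes.add("-this-windowed");   // hypothetical suffix
        suffixes.add("-other-windowed");  // hypothetical suffix
        System.out.println(namesFor("JOIN-ORDERS", suffixes));
    }
}
```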

We propose to overload each of those methods to accept a Named argument:

  • KStream/KTable#transform()

  • KStream/KTable#transformValues()

  • KStream#process()

Source and Sink operations

Then, for source and sink methods, we propose to add a new method withProcessorName() to the classes Consumed, Produced and Printed in order to minimize the number of overloaded methods.

This will allow developers to specify a processor name for operations: 

  • <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)
  • <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> consumed)
  • void print(final Printed<K, V> printed)
  • void to(final String topic, final Produced<K, V> produced)
  • void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced)

Example for the Produced class : 

Code Block
languagejava
/**
 * Create an instance of {@link Produced} with provided processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @param <K>         key type
 * @param <V>         value type
 * @return a new instance of {@link Produced}
 */
public static <K, V> Produced<K, V> with(final String processorName) {
    return new Produced<>(null, null, null, null, processorName);
}

/**
 * Configure the instance of {@link Produced} with a processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @return this
 */
public Produced<K, V> withProcessorName(final String processorName) {
    this.processorName = processorName;
    return this;
}



...



The tables below summarize all new and existing methods:

KStream

| method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name? |
|---|---|---|---|---|
| filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
| filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A |
| selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
| map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
| mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
| mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
| flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
| flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A |
| flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A |
| print(Printed) | NO | static Printed#as(String) | N/A | N/A |
| foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
| peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A |
| branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A |
| | YES | static Named#as(String) | N/A | N/A |
| through(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
| to(String, Produced) | NO | static Produced#as(String) | N/A | N/A |
| to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A |
| transform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
| transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
| transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
| process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A |
| join(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
| leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
| outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | static Joined#named(final String name) |
| join(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A |
| leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#named(final String name) | static Joined#named(final String name) | N/A |
| join(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ??? |
| leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) | NO | ??? | ??? | ??? |


KTable


| method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name? |
|---|---|---|---|---|
| filter(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
| filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
| filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
| filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String) |
| mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
| mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | static Named#as(String) |
| mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
| mapValues(ValueMapperWithKey, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String) |
| suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A |
| transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | static Named#as(String) |


KGroupedStream

KGroupedTable

TimeWindowedKStream

TimeWindowedKTable


Proposed Changes

  • Implement the new interface NamedOperation and the default class Named.
  • Update all parameter classes to implement NamedOperation: Produced, Consumed, Printed, Joined, Grouped, Suppressed.
  • Overload the stateless methods of the classes KStream, KTable, KGroupedStream, KGroupedTable, TimeWindowedKStream and TimeWindowedKTable.
  • Implement the new interface Named and update all action interfaces.
  • Overload the methods process(), transform() and transformValues() for the classes KStream and KTable.
  • Update the Consumed, Produced and Printed classes to be able to set a processor name.
  • The processor names specified by developers will be used in place of the static processor prefixes. Static prefixes will still be used if no custom processor name is specified.
  • Processor names should follow the same restrictions as topic names, so the legal characters are [a-zA-Z0-9._-] and the maximum length is 249.
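The last two points can be illustrated with a small sketch: a custom name is validated against the stated character set and length limit, and a generated name is used when no custom name is given. This is not the proposed implementation. The class ProcessorNames and the methods validate() and resolve() are hypothetical, the rejection of empty names is an assumption, and the zero-padded index format of the generated name is assumed to mirror names such as KSTREAM-MAP-0000000002.

```java
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical helper (assumption): not part of the proposal itself.
class ProcessorNames {
    private static final int MAX_NAME_LENGTH = 249;
    // Legal characters as stated above; the '+' also rejects empty names (assumption).
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");

    // Throws IllegalArgumentException if the name breaks the stated restrictions.
    static void validate(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException(
                "Name is illegal, it can't be longer than " + MAX_NAME_LENGTH + " characters: " + name);
        }
        if (!LEGAL_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException(
                "Name is illegal, it contains characters other than [a-zA-Z0-9._-]: " + name);
        }
    }

    // Uses the custom name when present, otherwise falls back to
    // "<static prefix><zero-padded index>" (format is an assumption).
    static String resolve(final String customName, final String staticPrefix, final int index) {
        if (customName == null) {
            return staticPrefix + String.format("%010d", index);
        }
        validate(customName);
        return customName;
    }

    public static void main(final String[] args) {
        System.out.println(resolve("MAP-TO-UPPERCASE", "KSTREAM-MAP-", 2));
        System.out.println(resolve(null, "KSTREAM-MAP-", 2));
    }
}
```

Validating at the point where the name is resolved keeps the fallback path untouched, so topologies that never set a custom name behave as before.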

...

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();


builder.stream("topic-input", Consumed.as("STREAM-FROM-TOPIC-INPUT"))
        .filter((k, v) -> true, Named.as("FILTER-NULL-VALUE"))
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Named.as("MAP-TO-UPPERCASE"))
        .to("topic-output", Produced.as("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE

...