...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

...

First, we propose to add one new interface, Named, that can be used to customize stateless operations as well as stateful ones. The objective of creating a new interface is to stay consistent with the overall API design.

Code Block
languagejava
package org.apache.kafka.streams.kstream;

/**
 * Default interface which can be used to personalize the names of both {@link KStream} and {@link KTable} operations.
 */
public interface Named {

    default String name() {
        return null;
    }

    /**
     * Creates an instance with the specified name.
     *
     * @param name  the name to be used.
     * @return a new {@link Named} instance configured with name.
     */
    static Named with(final String name) {
        return new Named() {
            @Override
            public String name() {
                return name;
            }
        };
    }
}


...



The Named interface will be extended by all existing action interfaces:

  • Aggregator
  • ForEachAction
  • KeyValueMapper
  • Predicate
  • Reducer
  • ValueJoiner
  • ValueMapper
  • ValueMapperWithKey

The name() method should be defined as a default method so as not to break compatibility with previous APIs. In addition, the default method allows us to keep using the lambda functions introduced with Java 8.
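As a rough illustration of why a default name() keeps lambdas usable, the sketch below uses simplified stand-ins for Named and ValueMapper (they are assumptions for this example, not the real Kafka types). Because name() has a default body, ValueMapper still declares a single abstract method, so lambdas and method references compile unchanged and simply report no custom name.

```java
// Simplified stand-ins for the proposed interfaces; assumptions for illustration only.
interface Named {
    // Default body: existing implementations and lambdas need no changes.
    default String name() {
        return null;
    }
}

interface ValueMapper<V, VR> extends Named {
    // Still the only abstract method, so the interface stays lambda-compatible.
    VR apply(V value);
}

// Hypothetical holder class, used only to host the examples.
class LambdaCompatibilitySketch {
    // A lambda written against the pre-existing API shape.
    static ValueMapper<String, Integer> length() {
        return value -> value.length();
    }

    // A method reference works the same way.
    static ValueMapper<String, String> upperCase() {
        return String::toUpperCase;
    }
}
```

Both mappers return null from name(), which a DSL implementation could treat as "no custom name given".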

Each interface will be enriched with a new static method, named(), to keep a simple and elegant API for developers.

Example for the interface Predicate:


Code Block
languagejava
public interface Predicate<K, V> extends Named {

    /**
     * Test if the record with the given key and value satisfies the predicate.
     *
     * @param key   the key of the record
     * @param value the value of the record
     * @return {@code true} if the {@link KeyValue} pair satisfies the predicate&mdash;{@code false} otherwise
     */
    boolean test(final K key, final V value);

    /**
     * Creates a new operation with the specified name.
     *
     * @param name          the name to be used for generated processors applied to this stream.
     * @param predicate     the predicate.
     * @param <K>           the type of keys
     * @param <V>           the type of values
     * @return a new {@link Predicate} instance.
     */
    static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(K key, V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}
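A possible usage of named() is sketched below. To stay self-contained it redeclares minimal stand-ins for Named and Predicate, and the class NamedPredicateUsage and the null-check lambda are hypothetical; the point is only that the wrapper delegates test() to the original predicate while exposing the given name.

```java
// Stand-ins mirroring the proposed interfaces; assumptions, not the final Kafka API.
interface Named {
    default String name() {
        return null;
    }
}

interface Predicate<K, V> extends Named {
    boolean test(K key, V value);

    // Wraps an existing predicate and attaches a name to it.
    static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

// Hypothetical usage class.
class NamedPredicateUsage {
    static Predicate<String, String> filterNullValue() {
        // K and V are inferred from the declared return type.
        return Predicate.named("FILTER-NULL-VALUE", (key, value) -> value != null);
    }
}
```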


Stateless operations

Stateless operations like foreach(), map(), flatMap(), mapValues(), filter() and filterNot() are translated into a single processor. The action interfaces depicted above (e.g. ForEachAction, ValueMapper, ValueMapperWithKey and KeyValueMapper) will be used to directly change the processor name.

The method branch() results in multiple streams that can be described with the use of the interface Predicate. However, the name of the first translated processor will not be customizable, because this would require overloading this method with one accepting a Named instance as an argument. This seems to be OK, as the processor name would not determine any related topic / store names.

...

Stateful operations

Sink operations returning void can't use the method depicted above.

Currently, the KStream<K, V> interface defines the following methods:

...

Stateful operations like join(), leftJoin() and outerJoin() are translated to multiple processors. The given name should be used to describe the first processor and as a prefix for all subsequent processors. The interface ValueJoiner will be used to customize the processor names.
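One way to read the prefix rule is sketched below. The helper StatefulNames, the "-" separator and the role suffixes passed in are all assumptions made for illustration; this proposal does not specify how the names of the subsequent processors are derived from the prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the separator and the role suffixes are assumptions.
class StatefulNames {
    // The first processor gets the given name as-is; later ones use it as a prefix.
    static List<String> expand(final String name, final List<String> subsequentRoles) {
        final List<String> names = new ArrayList<>();
        names.add(name);
        for (final String role : subsequentRoles) {
            names.add(name + "-" + role);
        }
        return names;
    }
}
```

For example, expanding a join named "JOIN-ORDERS" with two follow-up roles would yield three processor names that all start with "JOIN-ORDERS".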


We propose to overload each of those methods to accept a Named argument:

  • KStream/KTable#transform()

  • KStream/KTable#transformValues()

  • KStream#process()

Source and Sink operations

Then, for those three methods, we propose to add a new method withProcessorName() to the classes Consumed, Produced and Printed in order to minimize the number of overloaded methods.

This will allow developers to specify a processor name for operations: 

  • <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)
  • <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> consumed)
  • void print(final Printed<K, V> printed)
  • void to(final String topic, final Produced<K, V> produced)
  • void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced)

...

Code Block
languagejava
/**
 * Create an instance of {@link Produced} with provided processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @param <K>         key type
 * @param <V>         value type
 * @return a new instance of {@link Produced}
 */
public static <K, V> Produced<K, V> with(final String processorName) {
    return new Produced<>(null, null, null, null, processorName);
}

/**
 * Configure the instance of {@link Produced} with a processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @return this
 */
public Produced<K, V> withProcessorName(final String processorName) {
    this.processorName = processorName;
    return this;
}

Then we propose to add new methods to the existing classes Consumed, Produced, Joined and Printed in order to be able to define custom processor names.


Proposed Changes

  • Implement the new interface Named and update all action interfaces.
  • Overload the methods process(), transform() and transformValues() for the classes KStream and KTable.
  • Update the Consumed, Produced and Printed classes to be able to set a processor name.
  • The processor names specified by developers will be used in place of the static processor prefixes. Static prefixes will still be used if no custom processor name is specified.
  • Processor names should follow the same restrictions as topic names, so legal characters are [a-zA-Z0-9._-] and the maximum length is 249.
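The last two rules could be sketched as follows. ProcessorNames, validate() and resolve() are hypothetical names that are not part of this proposal's API; the zero-padded ten-digit index mirrors the default names Kafka Streams generates today (for example KSTREAM-SINK-0000000003).

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper sketching the naming rules; not part of the proposed API.
class ProcessorNames {
    // Same legal characters as topic names; the '+' also rejects the empty string.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    // Rejects names that break the topic-name-style restrictions.
    static void validate(final String name) {
        if (name.length() > MAX_LENGTH || !LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException("Illegal processor name: " + name);
        }
    }

    // Uses the custom name when present, else the static prefix plus a zero-padded index.
    static String resolve(final String customName, final String defaultPrefix, final int index) {
        if (customName == null) {
            return String.format(Locale.ROOT, "%s%010d", defaultPrefix, index);
        }
        validate(customName);
        return customName;
    }
}
```

Validating only custom names keeps the generated defaults untouched, so topologies that never set a name would keep their current processor names.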


Below is an application example:

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();


final KStream<String, String> stream = builder.stream("topic-input", Consumed.withProcessorName("STREAM-FROM-TOPIC-INPUT"));
stream.filter(Predicate.named("FILTER-NULL-VALUE", (k, v) -> true))
        .map(KeyValueMapper.named("MAP-TO-UPPERCASE", (k, v) -> KeyValue.pair(k, v.toUpperCase())))
        .to("topic-output", Produced.with("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE

...

  1. The first proposition was to overload all stateless methods to accept an instance of the Described class. However, this solution resulted in modifying a large percentage of the existing KStream and KTable methods.
  2. The second proposition was to add new methods KStream#as(Described) and KTable#as(Described), while the Described class would be used to customize the name of the operation defined previously in the stream. However, not only was this new method inconsistent with the existing APIs, but it also introduced some complexity for methods returning void.