...

JIRA: KAFKA-5488

Motivation

The KStream.branch method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is poor API design that makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.
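
The mismatch can be illustrated with a small, self-contained sketch. It does not use Kafka Streams: `branch` below is a hypothetical stand-in that works on `java.util.List<String>` with `java.util.function.Predicate`, and only mimics the varargs-in, array-out shape of KStream.branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ArrayBranchSketch {

    // Hypothetical stand-in for KStream.branch: predicates in as varargs,
    // results out as an array whose positions line up with the predicates.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static List<String>[] branch(List<String> records, Predicate<String>... predicates) {
        // "new List<String>[n]" does not compile, so a raw array plus an
        // unchecked cast is the usual workaround.
        List<String>[] result = (List<String>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (String record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    result[i].add(record);   // first matching predicate wins
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String>[] branches = branch(
                Arrays.asList("apple", "avocado", "banana", "cherry"),
                s -> s.startsWith("a"),
                s -> s.startsWith("b"));
        // The caller has to remember which index belongs to which predicate.
        System.out.println(branches[0]);
        System.out.println(branches[1]);
    }
}
```

Both the annotations on `branch` and the index bookkeeping in `main` are consequences of returning a generic array; neither is needed when the branches are returned by name.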

...

1. The split(Named named) operation returns BranchedKStream<K,V>. The Named parameter is needed so that one can name the branch operator itself; all the branches might then get index-suffixed names built from the branch operator name.

...

2. BranchedKStream has the following methods:

  • BranchedKStream<K,V> branch(Predicate<? super K, ? super V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns this in order to facilitate method chaining.
  • Map<String, KStream<K,V>> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
  • Map<String, KStream<K,V>> noDefaultBranch() -- returns the dictionary of named KStreams.
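
The call sequence these methods imply can be sketched with a toy model. This is not Kafka Streams code: `Branches` is a hypothetical stand-in for BranchedKStream in which a "stream" is a plain `List<String>`, a branch name is passed as a `String` instead of a Branched object, and routing is assumed to be first-match, as in the existing KStream.branch. Dropping unmatched records in `noDefaultBranch` is also an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class SplitSketch {

    // Toy stand-in for BranchedKStream; a "stream" is just a List<String> here.
    static final class Branches {
        private final List<String> remaining;
        private final Map<String, List<String>> result = new LinkedHashMap<>();

        Branches(List<String> records) {
            this.remaining = new ArrayList<>(records);
        }

        // Moves every still-unclaimed record that matches into a named branch
        // and returns this so that calls can be chained.
        Branches branch(Predicate<String> predicate, String name) {
            List<String> matched = new ArrayList<>();
            for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
                String record = it.next();
                if (predicate.test(record)) {
                    matched.add(record);
                    it.remove();
                }
            }
            result.put(name, matched);
            return this;
        }

        // Terminal call: records that no branch claimed go to the default branch.
        Map<String, List<String>> defaultBranch(String name) {
            result.put(name, new ArrayList<>(remaining));
            return result;
        }

        // Terminal call: records that no branch claimed are dropped (assumed).
        Map<String, List<String>> noDefaultBranch() {
            return result;
        }
    }

    static Branches split(List<String> records) {
        return new Branches(records);
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("apple", "banana", "anchovy", "cherry");
        Map<String, List<String>> branches = split(input)
                .branch(s -> s.startsWith("a"), "a-words")
                .branch(s -> s.contains("n"), "n-words")
                .defaultBranch("other");
        System.out.println(branches);
    }
}
```

In this run "anchovy" matches both predicates but lands only in the first branch, and "cherry" reaches the default branch because nothing else claimed it.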

...

3. The Branched parameter extends NamedOperation and has the following static builder methods:

  • as(String name) -- sets the name of the branch (auto-generated by default; when the split operation is named, the names are index-suffixed).
  • with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain) -- sets the operation to apply to the given branch. By default, it is an s->s identity function. Can be complex, like s->s.mapValues...., a composition of functions etc.
  • with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name) -- sets both the operation and a name.

The Map returned by defaultBranch/noDefaultBranch allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of the transformations defined by the functions passed via the Branched parameter.
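
A toy model may make the role of the returned Map more concrete. It is not Kafka Streams code: `Branched` and `Splitter` below are hypothetical stand-ins working on `List<String>`, and the naming rule (split prefix plus the explicit name, or split prefix plus the 1-based position when no name is given) is an assumption of the sketch, not something this proposal fixes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class BranchedSketch {

    // Toy stand-in for Branched: an optional name plus a function applied to the branch.
    static final class Branched {
        final String name;                                 // null: auto-generate
        final Function<List<String>, List<String>> chain;

        private Branched(String name, Function<List<String>, List<String>> chain) {
            this.name = name;
            this.chain = chain;
        }

        static Branched as(String name) {
            return new Branched(name, s -> s);             // identity by default
        }

        static Branched with(Function<List<String>, List<String>> chain) {
            return new Branched(null, chain);
        }

        static Branched with(Function<List<String>, List<String>> chain, String name) {
            return new Branched(name, chain);
        }
    }

    // Toy stand-in for BranchedKStream over a List<String>.
    static final class Splitter {
        private final String prefix;
        private final List<String> remaining;
        private final Map<String, List<String>> branches = new LinkedHashMap<>();
        private int index = 0;

        Splitter(String prefix, List<String> records) {
            this.prefix = prefix;
            this.remaining = new ArrayList<>(records);
        }

        Splitter branch(Predicate<String> predicate, Branched branched) {
            index++;
            List<String> matched = new ArrayList<>();
            for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
                String record = it.next();
                if (predicate.test(record)) {
                    matched.add(record);
                    it.remove();
                }
            }
            // Assumed naming rule: prefix + explicit name, else prefix + position.
            String key = prefix + (branched.name != null ? branched.name : String.valueOf(index));
            // The map stores the result of the branch's function, not the raw branch.
            branches.put(key, branched.chain.apply(matched));
            return this;
        }

        Map<String, List<String>> noDefaultBranch() {
            return branches;
        }
    }

    static List<String> upperCase(List<String> branch) {
        return branch.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static Map<String, List<String>> demo() {
        return new Splitter("split-", Arrays.asList("apple", "avocado", "banana", "cherry"))
                .branch(s -> s.startsWith("a"), Branched.with(BranchedSketch::upperCase, "a-words"))
                .branch(s -> s.startsWith("b"), Branched.with(BranchedSketch::upperCase))
                .branch(s -> s.startsWith("c"), Branched.as("c-words"))
                .noDefaultBranch();
    }

    public static void main(String[] args) {
        // All branches are reachable by name from one place.
        demo().forEach((name, records) -> System.out.println(name + " -> " + records));
    }
}
```

Under these assumptions the second branch, which has no explicit name, is stored under "split-2", and every entry holds the already transformed branch, so later code can look up any branch by name without tracking array positions.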

...