...

Code Block
languagejava
// K and V stand for the key and value types of the stream.
BranchedKStream<K, V> branched = stream.split();
// Each iteration has its own effectively final recordType,
// so both the lambda and the method reference can capture it.
for (RecordType recordType : RecordType.values()) {
  branched = branched.branch((k, v) -> v.getRecType() == recordType,
      Branched.withJavaConsumer(recordType::processRecords));
}

Proposed Changes


  1. Add the following methods to KStream:
Code Block
languagejava
BranchedKStream<K,V> split();
BranchedKStream<K,V> split(Named n);

2. Deprecate the existing branch method.

3. Add and implement the following Branched class:

Code Block
languagejava
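class Branched<K, V> implements NamedOperation<Branched<K, V>> {
    // Instance method required by NamedOperation.
    Branched<K, V> withName(String name);
    // Static factories, matching calls such as Branched.withJavaConsumer(...).
    static <K, V> Branched<K, V> withChain(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);
    static <K, V> Branched<K, V> withJavaConsumer(Consumer<? super KStream<K, V>> consumer);
}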

4. Add and implement the following BranchedKStream interface:

Code Block
languagejava
interface BranchedKStream<K, V> {
    BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
    BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);
    Map<String, KStream<K, V>> defaultBranch(Branched<K, V> branched);
    Map<String, KStream<K, V>> defaultBranch();
    Map<String, KStream<K, V>> noDefaultBranch();
}
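
The routing behaviour this interface is meant to provide can be illustrated without Kafka. The following is a minimal in-memory sketch, not the proposed implementation. It assumes the same first-match-wins semantics as the existing KStream.branch method, that defaultBranch collects every record no predicate accepted, and that noDefaultBranch drops such records. The class BranchRoutingSketch, its helper rec, and the use of plain lists in place of KStream instances are all hypothetical and exist only for this illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Hypothetical in-memory stand-in for BranchedKStream; not Kafka code. */
class BranchRoutingSketch<K, V> {
    private final List<String> names = new ArrayList<>();
    private final List<BiPredicate<? super K, ? super V>> predicates = new ArrayList<>();

    /** Helper that builds one key/value record for the sketch. */
    static <K, V> Map.Entry<K, V> rec(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    /** Registers a named branch; branches registered earlier take priority. */
    BranchRoutingSketch<K, V> branch(String name, BiPredicate<? super K, ? super V> predicate) {
        names.add(name);
        predicates.add(predicate);
        return this;
    }

    /** Routes the records; anything unmatched goes to the branch called defaultName. */
    Map<String, List<V>> defaultBranch(String defaultName, List<Map.Entry<K, V>> records) {
        return route(records, defaultName);
    }

    /** Routes the records; anything unmatched is dropped (assumed semantics). */
    Map<String, List<V>> noDefaultBranch(List<Map.Entry<K, V>> records) {
        return route(records, null);
    }

    private Map<String, List<V>> route(List<Map.Entry<K, V>> records, String defaultName) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        for (String name : names) {
            out.put(name, new ArrayList<>());
        }
        if (defaultName != null) {
            out.put(defaultName, new ArrayList<>());
        }
        for (Map.Entry<K, V> r : records) {
            String target = defaultName;
            // The first predicate that accepts the record decides its branch.
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(r.getKey(), r.getValue())) {
                    target = names.get(i);
                    break;
                }
            }
            if (target != null) {
                out.get(target).add(r.getValue());
            }
        }
        return out;
    }
}
```

With branches "even" and "big" (value greater than 10) registered in that order, a record with value 12 lands only in "even", because the earlier branch wins. A record with value 3 matches neither, so it ends up in the default branch or is dropped when noDefaultBranch is used.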


See the new BranchedKStream class and the branch() method for KStream in https://github.com/apache/kafka/pull/6512 for a very rough draft.

Compatibility, Deprecation, and Migration Plan

...