Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
KStream<String, String> source_o365_user_activity = builder.stream("source");
KStream<String, String>[] branches = source_o365_user_activity.branch(
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
     );

branches[0].to("A");
branches[1].to("B");
branches[2].to("C");


we could use


Code Block
languagejava
new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   //default branch should not necessarily be defined in the end!
   .defaultBranch(ks->ks.to("C"))
   .onTopOf(builder.stream("source"));

(onTopOf method returns the provided stream so we can continue with method chaining and do something more with the original stream)

Public Interfaces

Add the new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class (see https://github.com/apache/kafka/pull/6164).

...