KIP-418: A method-chaining way to branch KStream

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-5488

Motivation

The KStream.branch method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is a poor API design: it makes building branches inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.

  • In general, the code has poor cohesion: predicates are defined in one place and the corresponding stream processors in another, so every change means remembering to edit two pieces of code.
  • If the number of predicates is fixed in advance, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
  • If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
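To make the last two points concrete, the following sketch imitates the varargs branching with plain Java collections instead of KStream, so it has no Kafka dependency. The enum `Category` and the helpers `partition` and `byCategory` are hypothetical. Java rejects `new List<String>[n]` ("generic array creation"), so both the result array and the dynamically built predicate array have to be created as wildcard arrays and cast, which is an unchecked cast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical enum: we want one branch per value.
enum Category { COFFEE, ELECTRONICS, OTHER }

final class GenericArrayDemo {
    // Imitates the varargs branch(): result[i] corresponds position-wise
    // to predicates[i], and each record goes to the first matching predicate.
    @SafeVarargs
    static <T> List<T>[] partition(List<T> records, Predicate<T>... predicates) {
        // `new List<T>[n]` does not compile, so a wildcard array is cast (unchecked).
        @SuppressWarnings("unchecked")
        List<T>[] result = (List<T>[]) new List<?>[predicates.length];
        for (int i = 0; i < predicates.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    result[i].add(record);
                    break; // first match wins
                }
            }
        }
        return result;
    }

    // Builds one predicate per enum value: another generic array, another unchecked cast.
    static List<String>[] byCategory(List<String> records) {
        Category[] values = Category.values();
        @SuppressWarnings("unchecked")
        Predicate<String>[] predicates = (Predicate<String>[]) new Predicate<?>[values.length];
        for (int i = 0; i < values.length; i++) {
            String prefix = values[i].name() + ":";
            predicates[i] = r -> r.startsWith(prefix);
        }
        return partition(records, predicates);
    }
}
```

Callers still have to read the result by position, for example `byCategory(records)[Category.COFFEE.ordinal()]`, which is the same index bookkeeping described in the second bullet.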

Public Interfaces

The proposed new org.apache.kafka.streams.kstream.KBranchedStream class, returned by a new no-argument KStream#branch() method, introduces a new standard way to build branches on top of KStream.

Instead of

KStream<String, String> source = builder.stream("source");
KStream<String, String>[] branches = source.branch(
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
);

// branches[i] holds the records matched by the i-th predicate above
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");


we could use

source
   .branch()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   .defaultBranch(ks -> ks.to("C"));

Here the new KStream#branch() method returns a KBranchedStream<K, V> object, which in turn provides the `branch` and `defaultBranch` methods.
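The chaining semantics can be modelled without Kafka. The sketch below is a hypothetical `ToyBranchedStream` that works on an in-memory list of key/value entries instead of a `KStream`. It assumes, as with the existing varargs `branch`, that each record goes to the first predicate it matches, and it invokes each consumer inside the `branch` call that registers it; `defaultBranch` receives whatever no earlier predicate claimed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical stand-in for KBranchedStream<K, V>: a list of entries replaces KStream.
final class ToyBranchedStream<K, V> {
    private final List<Map.Entry<K, V>> remaining;

    ToyBranchedStream(List<Map.Entry<K, V>> records) {
        this.remaining = new ArrayList<>(records);
    }

    // Moves every still-unclaimed record that matches the predicate into this
    // branch and hands the branch to the consumer right away.
    ToyBranchedStream<K, V> branch(BiPredicate<? super K, ? super V> predicate,
                                   Consumer<? super List<Map.Entry<K, V>>> consumer) {
        List<Map.Entry<K, V>> matched = new ArrayList<>();
        for (Iterator<Map.Entry<K, V>> it = remaining.iterator(); it.hasNext(); ) {
            Map.Entry<K, V> e = it.next();
            if (predicate.test(e.getKey(), e.getValue())) {
                matched.add(e);
                it.remove(); // claimed: later predicates will not see it
            }
        }
        consumer.accept(matched);
        return this;
    }

    // Everything no earlier predicate claimed goes to the default branch.
    void defaultBranch(Consumer<? super List<Map.Entry<K, V>>> consumer) {
        List<Map.Entry<K, V>> rest = new ArrayList<>(remaining);
        remaining.clear();
        consumer.accept(rest);
    }
}
```

Because each `branch` call returns the same object, every predicate sits next to the processor that handles its records, and no array index is needed to find a branch.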

It is critical that the KStream consumers passed to the `branch` methods are invoked immediately, during the `branch` call itself. This is necessary when streams defined in separate scopes need to be gathered back into one scope through an auxiliary object:

@Setter // Lombok: generates setCoffeePurchases(...) and setElectronicsPurchases(...)
class CouponIssuer {
   private KStream<....> coffeePurchases;
   private KStream<....> electronicsPurchases;

   KStream<...> coupons() {
       return coffeePurchases.join(electronicsPurchases...)...
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

transactionStream.branch()
     .branch(predicate1, couponIssuer::setCoffeePurchases)
     .branch(predicate2, couponIssuer::setElectronicsPurchases);

// both setters have already run here, so both fields are populated
KStream<..> coupons = couponIssuer.coupons();
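Why immediate invocation matters can be checked with a toy. In the sketch below, plain lists stand in for `KStream`s, and `CouponIssuerToy` and `EagerBrancher` are hypothetical names; `coupons()` is only a placeholder that needs both branches to be present, not a real join. Because `EagerBrancher.branch` hands each branch to its consumer before returning, `coupons()` can be called right after the chain; if consumers were deferred to some later step, the fields would still be `null` at that point.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for CouponIssuer: plain lists replace the KStreams.
final class CouponIssuerToy {
    private List<String> coffeePurchases;
    private List<String> electronicsPurchases;

    void setCoffeePurchases(List<String> s) { coffeePurchases = s; }
    void setElectronicsPurchases(List<String> s) { electronicsPurchases = s; }

    // Placeholder for coupons(): fails if a branch has not been delivered yet.
    int coupons() {
        if (coffeePurchases == null || electronicsPurchases == null) {
            throw new IllegalStateException("branch not delivered yet");
        }
        return Math.min(coffeePurchases.size(), electronicsPurchases.size());
    }
}

// Eager brancher: each consumer runs inside branch(), before it returns.
final class EagerBrancher {
    private final List<String> remaining;

    EagerBrancher(List<String> records) { remaining = new ArrayList<>(records); }

    EagerBrancher branch(Predicate<String> p, Consumer<List<String>> consumer) {
        List<String> matched = new ArrayList<>();
        for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
            String r = it.next();
            if (p.test(r)) {
                matched.add(r);
                it.remove(); // first match wins
            }
        }
        consumer.accept(matched); // invoked now, not at some later step
        return this;
    }
}
```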


Proposed Changes


Add the new KBranchedStream class and a no-argument branch() method to KStream (see https://github.com/apache/kafka/pull/6512).

Compatibility, Deprecation, and Migration Plan

The proposed change has no impact on existing code and is backwards compatible. All existing code that uses the varargs branch method will continue to work; the change only adds a new way to perform branching.

Rejected Alternatives

Add a KafkaStreamsBrancher class that works the same way but does not require modifying the KStream interface:

new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   // the default branch does not have to be defined last
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
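For contrast with the adopted design, the sketch below models the rejected brancher with hypothetical names (`KafkaStreamsBrancherToy`, with in-memory entry lists instead of `KStream`) and assumes first-match routing. Because the source stream is only supplied at the end, `branch` and `defaultBranch` can merely record their arguments: the default branch may be declared anywhere in the chain, and no consumer runs until `onTopOf` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical model of the rejected KafkaStreamsBrancher.
final class KafkaStreamsBrancherToy<K, V> {
    private final List<BiPredicate<? super K, ? super V>> predicates = new ArrayList<>();
    private final List<Consumer<? super List<Map.Entry<K, V>>>> consumers = new ArrayList<>();
    private Consumer<? super List<Map.Entry<K, V>>> defaultConsumer;

    // Only records the branch; nothing is routed yet.
    KafkaStreamsBrancherToy<K, V> branch(BiPredicate<? super K, ? super V> p,
                                         Consumer<? super List<Map.Entry<K, V>>> c) {
        predicates.add(p);
        consumers.add(c);
        return this;
    }

    // May appear at any position in the chain, since it is only recorded here.
    KafkaStreamsBrancherToy<K, V> defaultBranch(Consumer<? super List<Map.Entry<K, V>>> c) {
        defaultConsumer = c;
        return this;
    }

    // Routes records in declaration order and only now invokes the consumers.
    void onTopOf(List<Map.Entry<K, V>> records) {
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) buckets.add(new ArrayList<>());
        List<Map.Entry<K, V>> unmatched = new ArrayList<>();
        outer:
        for (Map.Entry<K, V> e : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(e.getKey(), e.getValue())) {
                    buckets.get(i).add(e);
                    continue outer; // first match wins
                }
            }
            unmatched.add(e);
        }
        for (int i = 0; i < consumers.size(); i++) consumers.get(i).accept(buckets.get(i));
        if (defaultConsumer != null) defaultConsumer.accept(unmatched);
    }
}
```

This deferred delivery is what prevents the CouponIssuer pattern from working inside the chain itself: the setters would only be called once `onTopOf` runs.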