Current state: Accepted
Discussion thread: here
Voting thread: here
Pull request: PR-9107
The KStream#branch method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').
This is poor API design. It makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.
- In general, the code has poor cohesion: we need to define predicates in one place and the respective stream processors in another part of the code. When something changes, we must remember to edit both pieces of code.
- If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
- If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
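The array-and-generics mismatch is a general Java limitation, not something specific to Kafka. Below is a minimal stdlib-only illustration that uses List<String> as a stand-in for KStream<K, V>. The class and method names are hypothetical. It shows that an array of a generic type can only be created through an unchecked cast, and that callers then have to pick branches by index.

```java
import java.util.ArrayList;
import java.util.List;

// Not Kafka code: List<String> stands in for KStream<K, V>.
class GenericArrayDemo {

    // `new List<String>[n]` is a compile error ("generic array creation"), so a raw array
    // has to be created and cast, which the compiler reports as an unchecked cast.
    @SuppressWarnings("unchecked")
    static List<String>[] makeBranches(int n) {
        List<String>[] branches = (List<String>[]) new List[n];
        for (int i = 0; i < n; i++) {
            branches[i] = new ArrayList<>();
        }
        return branches;
    }

    // Array-style routing: each record goes to the first matching predicate, and callers
    // must remember that index 0 means "starts with A" and index 1 means "starts with B".
    static List<String>[] route(List<String> values) {
        List<String>[] branches = makeBranches(2);
        for (String v : values) {
            if (v.startsWith("A")) {
                branches[0].add(v);
            } else if (v.startsWith("B")) {
                branches[1].add(v);
            }
            // Records matching neither predicate are dropped.
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String>[] result = route(List.of("Apple", "Banana", "Avocado", "Cherry"));
        System.out.println(result[0]); // [Apple, Avocado]
        System.out.println(result[1]); // [Banana]
    }
}
```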
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- The split(Named named) operation, which returns a BranchedKStream<K,V>. The Named parameter is needed so that one can name the branch operator itself; all the branches can then get index-suffixed names built from the branch operator name. The overloaded parameterless alternative split() is also available.
- The BranchedKStream DSLObject, with the DSLOperations described below.
BranchedKStream has the following methods:
- BranchedKStream<K,V> branch(Predicate<? super K, ? super V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns this in order to facilitate method chaining.
- Map<String, KStream<K,V>> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
- Map<String, KStream<K,V>> noDefaultBranch() -- returns the dictionary of named KStreams.
The branch and defaultBranch operations also have overloaded alternatives without the Branched parameter.
The Branched parameter implements NamedOperation and has the following static methods:
- as(String name) -- sets the name of the branch (auto-generated by default; when the split operation is named, the names are index-suffixed).
- withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain) -- sets an operation to apply to a given branch. By default, it is the s->s identity function. It can be complex, like s->s.mapValues(...), a composition of functions, etc.
- withConsumer(Consumer<? super KStream<K, V>> chain) -- sets a consumer for a given branch.
- withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name) -- sets both an operation and a name.
- withConsumer(Consumer<? super KStream<K, V>> chain, String name) -- sets both a consumer and a name.
The Map returned by defaultBranch and noDefaultBranch allows us to collect all the KStream branch objects in a single scope.
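A minimal in-memory sketch of this chaining contract follows. It is not Kafka code. `ToySplit` is a hypothetical class, a plain List<String> stands in for the KStream, java.util.function.Predicate replaces Kafka's two-argument Predicate, and a bare String replaces Branched.as(name). As with the existing KStream#branch, each record goes only to the first branch whose predicate matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of the split()/branch()/defaultBranch()/noDefaultBranch()
// chain; not Kafka code. Records are strings and each branch is a List<String>.
class ToySplit {

    private final List<String> source;
    private final List<Predicate<String>> predicates = new ArrayList<>();
    private final List<String> names = new ArrayList<>();

    private ToySplit(List<String> source) {
        this.source = source;
    }

    // Plays the role of KStream#split(): starts an empty chain of branches.
    static ToySplit split(List<String> source) {
        return new ToySplit(source);
    }

    // Plays the role of branch(predicate, Branched.as(name)); returns this for chaining.
    ToySplit branch(Predicate<String> predicate, String name) {
        predicates.add(predicate);
        names.add(name);
        return this;
    }

    // Records matching no predicate go to the default branch under defaultName.
    Map<String, List<String>> defaultBranch(String defaultName) {
        return build(defaultName);
    }

    // Records matching no predicate are dropped.
    Map<String, List<String>> noDefaultBranch() {
        return build(null);
    }

    private Map<String, List<String>> build(String defaultName) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String name : names) {
            result.put(name, new ArrayList<>());
        }
        if (defaultName != null) {
            result.put(defaultName, new ArrayList<>());
        }
        for (String record : source) {
            boolean matched = false;
            // Each record goes only to the first branch whose predicate matches.
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(record)) {
                    result.get(names.get(i)).add(record);
                    matched = true;
                }
            }
            if (!matched && defaultName != null) {
                result.get(defaultName).add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> branches = split(List.of("Apple", "Banana", "Cherry"))
                .branch(v -> v.startsWith("A"), "a")
                .branch(v -> v.startsWith("B"), "b")
                .defaultBranch("other");
        System.out.println(branches); // {a=[Apple], b=[Banana], other=[Cherry]}
    }
}
```

Because branch returns the same object, the whole definition stays a single expression, and every branch is reachable from the returned Map.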
How the resulting Map is formed
The keys of the Map entries are defined by the following rules:
- If a Named parameter was provided for split, its value is used as a prefix for each key. By default, no prefix is used.
- If a name is provided for the branch, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, the key defaults to prefix + the position of the branch as a decimal number, starting from "1".
- If a name is not provided for the defaultBranch call, the key defaults to prefix + "0".
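The key rules can be checked with a small helper. This is a hypothetical sketch, not part of the proposal. `BranchKeys.keys` takes the split prefix (or null), the branch names in call order (null for an unnamed branch), whether the chain ends with defaultBranch, and the default branch's name (or null). It computes keys only. Whether an entry actually ends up in the Map is decided by the value rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper reproducing the Map-key rules; not Kafka code.
class BranchKeys {

    // prefix: the Named value passed to split(), or null when split() is unnamed.
    // branchNames: one entry per branch() call, in order; null means "no name given".
    // withDefault: true when the chain ends with defaultBranch() instead of noDefaultBranch().
    // defaultName: the name given to defaultBranch(), or null.
    static List<String> keys(String prefix, List<String> branchNames,
                             boolean withDefault, String defaultName) {
        String p = prefix == null ? "" : prefix; // no prefix by default
        List<String> result = new ArrayList<>();
        for (int i = 0; i < branchNames.size(); i++) {
            String name = branchNames.get(i);
            // An unnamed branch uses its 1-based position as a decimal number.
            result.add(p + (name != null ? name : Integer.toString(i + 1)));
        }
        if (withDefault) {
            // An unnamed default branch is keyed as prefix + "0".
            result.add(p + (defaultName != null ? defaultName : "0"));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(keys("foo-", Arrays.asList("bar", null, null), true, null));
        // [foo-bar, foo-2, foo-3, foo-0]
    }
}
```

The position counts every branch() call, named or not, so the second branch above gets "2" even though the first one has a name.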
The values of the Map entries are formed as follows:
- If no chain function or consumer is provided, the value is the branch itself (which is equivalent to the ks -> ks identity chain function).
- If a chain function is provided and returns a non-null value for a given branch, the value is the result returned by this function.
- If a chain function returns null for a given branch, the respective entry is not put into the map.
- If a consumer is provided for a given branch, the respective entry is not put into the map.
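The value rules can be sketched the same way. `BranchValues` and its nested `ToyBranched` are hypothetical. ToyBranched only mirrors the withFunction and withConsumer factories described earlier, with the naming variants omitted, and List<String> stands in for KStream<K, V>.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the Map-value rules; not Kafka code.
class BranchValues {

    // Mirrors the shape of the proposed Branched factories (names omitted).
    static final class ToyBranched {
        final Function<List<String>, List<String>> function; // null when a consumer is set
        final Consumer<List<String>> consumer;               // null when a function is set

        private ToyBranched(Function<List<String>, List<String>> function,
                            Consumer<List<String>> consumer) {
            this.function = function;
            this.consumer = consumer;
        }

        // Default: identity chain function, so the branch itself becomes the value.
        static ToyBranched identity() {
            return new ToyBranched(s -> s, null);
        }

        static ToyBranched withFunction(Function<List<String>, List<String>> chain) {
            return new ToyBranched(chain, null);
        }

        static ToyBranched withConsumer(Consumer<List<String>> chain) {
            return new ToyBranched(null, chain);
        }
    }

    // Adds the entry for one branch to the result map, following the value rules.
    static void putBranch(Map<String, List<String>> result, String key,
                          List<String> branch, ToyBranched branched) {
        if (branched.consumer != null) {
            branched.consumer.accept(branch); // a consumed branch gets no map entry
            return;
        }
        List<String> value = branched.function.apply(branch);
        if (value != null) {                  // a null result also means no entry
            result.put(key, value);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        List<String> sink = new ArrayList<>();
        putBranch(result, "1", List.of("a"), ToyBranched.identity());
        putBranch(result, "2", List.of("b"), ToyBranched.withFunction(s -> null));
        putBranch(result, "3", List.of("c"), ToyBranched.withConsumer(sink::addAll));
        putBranch(result, "4", List.of("d"), ToyBranched.withFunction(s -> List.of(s.get(0).toUpperCase())));
        System.out.println(result); // {1=[a], 4=[D]}
        System.out.println(sink);   // [c]
    }
}
```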
The following section demonstrates some standard use cases for the proposed API.
Simple Example: Direct Branch Consuming
In many cases we do not need a single scope for all the branches, because each branch is processed completely independently of the others. Then we can use 'consuming' lambdas or method references in the Branched parameter (Branched.withConsumer).
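Here is a minimal in-memory sketch of direct branch consuming. It assumes List<String> in place of KStream and a map of lists in place of output topics. `DirectConsuming`, `Chain` and `to` are hypothetical names, and `to` only imitates KStream#to. In Kafka the consumer receives the branch KStream while the topology is being defined. The sketch instead hands each consumer its already-routed records. Since every branch uses a consumer, the Map returned by defaultBranch would be empty under the rules above, so the sketch's defaultBranch returns nothing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of direct branch consuming; not Kafka code.
class DirectConsuming {

    // Stand-in for output topics: topic name -> records written to it.
    static final Map<String, List<String>> topics = new LinkedHashMap<>();

    // Stand-in for ks -> ks.to(topic): appends every record of the branch to the "topic".
    static Consumer<List<String>> to(String topic) {
        return branch -> topics.computeIfAbsent(topic, t -> new ArrayList<>()).addAll(branch);
    }

    static final class Chain {
        private final List<String> source;
        private final List<Predicate<String>> predicates = new ArrayList<>();
        private final List<Consumer<List<String>>> consumers = new ArrayList<>();

        Chain(List<String> source) {
            this.source = source;
        }

        Chain branch(Predicate<String> p, Consumer<List<String>> consumer) {
            predicates.add(p);
            consumers.add(consumer);
            return this;
        }

        // Routes each record to the first matching branch, else to the default consumer.
        void defaultBranch(Consumer<List<String>> defaultConsumer) {
            List<List<String>> parts = new ArrayList<>();
            for (int i = 0; i <= predicates.size(); i++) {
                parts.add(new ArrayList<>());
            }
            for (String r : source) {
                int target = predicates.size(); // index of the default branch
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(r)) {
                        target = i;
                        break;
                    }
                }
                parts.get(target).add(r);
            }
            for (int i = 0; i < predicates.size(); i++) {
                consumers.get(i).accept(parts.get(i));
            }
            defaultConsumer.accept(parts.get(predicates.size()));
        }
    }

    public static void main(String[] args) {
        new Chain(List.of("Apple", "Banana", "Cherry", "Avocado"))
                .branch(v -> v.startsWith("A"), to("A"))
                .branch(v -> v.startsWith("B"), to("B"))
                .defaultBranch(to("C"));
        System.out.println(topics); // {A=[Apple, Avocado], B=[Banana], C=[Cherry]}
    }
}
```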
More Complex Example: Merging Branches
In other cases we want to combine branches again after splitting. The Map returned by the defaultBranch and noDefaultBranch methods provides access to the branches in the same scope.
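The merging case can also be modelled in memory. In this hypothetical sketch, `MergingBranches.split` reproduces the key rules: prefix + branch name, and prefix + "0" for the unnamed default branch. `merge` simply concatenates two lists as a stand-in for KStream#merge. In Kafka, KStream#merge gives no ordering guarantee between its two inputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of "split, collect in one Map, merge again"; not Kafka code.
class MergingBranches {

    // Routes each record to the first matching predicate. Keys are prefix + branch name;
    // unmatched records go to prefix + "0" (the unnamed default branch).
    static Map<String, List<String>> split(List<String> source, String prefix,
                                           List<String> names, List<Predicate<String>> predicates) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String name : names) {
            result.put(prefix + name, new ArrayList<>());
        }
        result.put(prefix + "0", new ArrayList<>());
        for (String record : source) {
            String key = prefix + "0";
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    key = prefix + names.get(i);
                    break;
                }
            }
            result.get(key).add(record);
        }
        return result;
    }

    // Stand-in for KStream#merge: the records of both branches end up in one stream.
    static List<String> merge(List<String> a, List<String> b) {
        List<String> merged = new ArrayList<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        List<Predicate<String>> predicates = List.of(v -> v.startsWith("A"), v -> v.startsWith("B"));
        Map<String, List<String>> branches = split(
                List.of("A-1", "B-2", "C-3", "A-4"), "business-", List.of("A", "B"), predicates);
        // All branches are reachable from the same scope, so recombining them is one call.
        List<String> aOrB = merge(branches.get("business-A"), branches.get("business-B"));
        System.out.println(aOrB);                       // [A-1, A-4, B-2]
        System.out.println(branches.get("business-0")); // [C-3]
    }
}
```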
There is also a case when one might need to create branches dynamically, e.g. one per enum value. This can be implemented by calling branch in a loop on the object returned by split().
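The following hypothetical in-memory sketch shows the loop. It creates one branch per constant of an illustrative `RecordType` enum, and each branch is registered on the object returned by a 'starting' split(). Records are strings such as "ORDER:1", and each consumer stores its branch's records in an EnumMap. The sketch's noDefaultBranch returns nothing, whereas the proposed one returns the Map.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of dynamic branching; not Kafka code.
class DynamicBranching {

    enum RecordType { ORDER, PAYMENT, REFUND }

    static final class Chain {
        private final List<String> source;
        private final List<Predicate<String>> predicates = new ArrayList<>();
        private final List<Consumer<List<String>>> consumers = new ArrayList<>();

        private Chain(List<String> source) {
            this.source = source;
        }

        // Plays the role of the 'starting' split(): an empty chain to add branches to.
        static Chain split(List<String> source) {
            return new Chain(source);
        }

        Chain branch(Predicate<String> p, Consumer<List<String>> c) {
            predicates.add(p);
            consumers.add(c);
            return this;
        }

        // Routes each record to the first matching branch; unmatched records are dropped.
        void noDefaultBranch() {
            List<List<String>> parts = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                parts.add(new ArrayList<>());
            }
            for (String r : source) {
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(r)) {
                        parts.get(i).add(r);
                        break;
                    }
                }
            }
            for (int i = 0; i < predicates.size(); i++) {
                consumers.get(i).accept(parts.get(i));
            }
        }
    }

    static Map<RecordType, List<String>> route(List<String> source) {
        Map<RecordType, List<String>> processed = new EnumMap<>(RecordType.class);
        Chain branched = Chain.split(source);
        // No special case for the first iteration: every constant just adds a branch.
        for (RecordType type : RecordType.values()) {
            branched.branch(v -> v.startsWith(type.name() + ":"),
                            branch -> processed.put(type, branch));
        }
        branched.noDefaultBranch();
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of("ORDER:1", "REFUND:2", "ORDER:3")));
        // {ORDER=[ORDER:1, ORDER:3], PAYMENT=[], REFUND=[REFUND:2]}
    }
}
```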
This is why the 'starting' split() operation is necessary, and why it is better to have it rather than add a new branch method to KStream directly.
Otherwise we would have to treat the first iteration separately, and the code for dynamic branching would become cluttered.
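For contrast, here is a hypothetical sketch of the shape without split(). The stream itself exposes branch(), which returns the chain object, so the loop needs a null check to start the chain on the first iteration. `ToyStream`, `Chain` and `Color` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the rejected shape without a 'starting' split(); not Kafka code.
class WithoutSplit {

    static final class Chain {
        final List<Predicate<String>> predicates = new ArrayList<>();

        Chain branch(Predicate<String> p) {
            predicates.add(p);
            return this;
        }
    }

    static final class ToyStream {
        // Without split(), the first branch has to be created on the stream itself.
        Chain branch(Predicate<String> p) {
            return new Chain().branch(p);
        }
    }

    enum Color { RED, GREEN, BLUE }

    static Chain build(ToyStream stream) {
        Chain branched = null;
        for (Color color : Color.values()) {
            Predicate<String> p = v -> v.equals(color.name());
            if (branched == null) {
                branched = stream.branch(p); // first iteration: start the chain on the stream
            } else {
                branched.branch(p);          // later iterations: extend the chain
            }
        }
        return branched;                     // would still be null if the enum were empty
    }

    public static void main(String[] args) {
        Chain chain = build(new ToyStream());
        System.out.println(chain.predicates.size()); // 3
    }
}
```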
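1. Add the following methods to KStream: split() and split(Named named), both returning BranchedKStream<K, V>.
2. Deprecate the existing KStream#branch method.
3. Add and implement the Branched class with the static methods described above.
4. Add and implement the BranchedKStream interface with the branch, defaultBranch and noDefaultBranch methods described above.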
(See https://github.com/apache/kafka/pull/6512 for a very rough draft).
Compatibility, Deprecation, and Migration Plan
The proposed change is backwards compatible.
The old KStream#branch method should be deprecated.
- A KStreamsBrancher class that works the same way but does not require modifying the KStream interface.
Rejected because it violates method chaining (a new auxiliary object is needed).
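The following hypothetical sketch of the auxiliary-object shape shows where method chaining breaks. Branches are configured on a separate brancher object, and the source stream is passed in only at the end. The method name `applyTo` is illustrative and is not taken from the rejected design. List<String> again stands in for KStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of the auxiliary-brancher design; not Kafka code.
class AuxiliaryBrancher {

    private final List<Predicate<String>> predicates = new ArrayList<>();
    private final List<Consumer<List<String>>> consumers = new ArrayList<>();

    AuxiliaryBrancher branch(Predicate<String> p, Consumer<List<String>> c) {
        predicates.add(p);
        consumers.add(c);
        return this;
    }

    // The source appears only here, at the end, so this call cannot continue a chain that
    // starts from the stream. Records matching no predicate are dropped.
    void applyTo(List<String> source) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            parts.add(new ArrayList<>());
        }
        for (String r : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(r)) {
                    parts.get(i).add(r);
                    break;
                }
            }
        }
        for (int i = 0; i < predicates.size(); i++) {
            consumers.get(i).accept(parts.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> a = new ArrayList<>();
        new AuxiliaryBrancher()
                .branch(v -> v.startsWith("A"), a::addAll)
                .applyTo(List.of("Apple", "Banana", "Avocado"));
        System.out.println(a); // [Apple, Avocado]
    }
}
```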
- A variant in which the new KStream#branch() method returns a KBranchedStream<K, V> object, which, in turn, contains `branch` and `defaultBranch` methods. In this variant it is critical that the KStream consumers in the .branch methods are invoked immediately during the `branch` method invocation. This is necessary when we need to gather the streams that were defined in separate scopes back into one scope using an auxiliary object.
This was rejected because of the difficulty of having branches in the same scope.
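The timing requirement of that variant can be illustrated with a hypothetical sketch. Each branch is a lazy filter over the source, so, like a KStream, it exists before any data flows. `Chain.branch` hands the new branch to its consumer before returning, so an outer holder map already contains the branch by the time the next branch() call starts. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of consumers invoked during branch(); not Kafka code.
class ImmediateConsumers {

    // Stand-in for a branch KStream: defined now, evaluated later.
    static final class ToyBranch {
        private final Predicate<String> filter;

        ToyBranch(Predicate<String> filter) {
            this.filter = filter;
        }

        List<String> evaluate(List<String> source) {
            List<String> out = new ArrayList<>();
            for (String r : source) {
                if (filter.test(r)) {
                    out.add(r);
                }
            }
            return out;
        }
    }

    static final class Chain {
        // Matches records not claimed by any earlier branch.
        private Predicate<String> notYetMatched = r -> true;

        // Creates the branch and invokes the consumer immediately, before returning.
        Chain branch(Predicate<String> p, Consumer<ToyBranch> consumer) {
            Predicate<String> earlier = notYetMatched;
            consumer.accept(new ToyBranch(r -> earlier.test(r) && p.test(r))); // first match wins
            notYetMatched = r -> earlier.test(r) && !p.test(r);
            return this;
        }
    }

    public static void main(String[] args) {
        // Auxiliary holder gathering branches defined in separate lambdas into one scope.
        Map<String, ToyBranch> holder = new HashMap<>();
        new Chain()
                .branch(v -> v.startsWith("A"), ks -> holder.put("a", ks))
                .branch(v -> v.length() > 5, ks -> holder.put("long", ks));
        List<String> source = List.of("Apple", "Avocado", "Banana", "Fig");
        System.out.println(holder.get("a").evaluate(source));    // [Apple, Avocado]
        System.out.println(holder.get("long").evaluate(source)); // [Banana]
    }
}
```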