...
withChain(Function<KStream<K, V>, KStream<K, V>> chain)
-- a chain of operations applied to the given branch. By default, it is the s->s
identity function. It can be more complex, such as s->s.mapValues....
or some composition of functions.
withName(String name)
-- sets the name of the branch (auto-generated by default; when the split
operation is named, the names are index-suffixed).
withJavaConsumer(Consumer<? super KStream<K, V>> consumer)
-- directs the final result of the chain to a given java.util.function.Consumer. (The method is called withJavaConsumer so that it is not confused with the Kafka Consumer interface.) By default, a no-op consumer is provided.
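The defaults described above (identity chain, no-op consumer, auto-generated name) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `BranchedSketch`, its `defaults()` factory and its `apply` method are hypothetical names invented for illustration, and a plain `List<V>` stands in for `KStream<K, V>`. It is not the proposed Kafka class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the proposed parameter object; List<V> plays the role of KStream<K, V>.
final class BranchedSketch<V> {
    private final String name;                       // null models "auto-generated"
    private final Function<List<V>, List<V>> chain;  // identity by default
    private final Consumer<List<V>> consumer;        // no-op by default

    private BranchedSketch(String name, Function<List<V>, List<V>> chain, Consumer<List<V>> consumer) {
        this.name = name;
        this.chain = chain;
        this.consumer = consumer;
    }

    // Hypothetical factory: every parameter starts at its default value.
    public static <V> BranchedSketch<V> defaults() {
        return new BranchedSketch<>(null, s -> s, s -> { });
    }

    public BranchedSketch<V> withName(String newName) {
        return new BranchedSketch<>(newName, chain, consumer);
    }

    public BranchedSketch<V> withChain(Function<List<V>, List<V>> newChain) {
        return new BranchedSketch<>(name, newChain, consumer);
    }

    public BranchedSketch<V> withJavaConsumer(Consumer<List<V>> newConsumer) {
        return new BranchedSketch<>(name, chain, newConsumer);
    }

    public String name() {
        return name;
    }

    // Runs the chain on the branch, hands the result to the consumer, and returns it.
    public List<V> apply(List<V> branch) {
        List<V> result = chain.apply(branch);
        consumer.accept(result);
        return result;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        BranchedSketch<String> branched = BranchedSketch.<String>defaults()
            .withName("null")
            .withChain(s -> s.stream().map(v -> "NULL").collect(Collectors.toList()))
            .withJavaConsumer(sink::addAll);

        System.out.println(branched.apply(Arrays.asList("x", "y"))); // [NULL, NULL]
        System.out.println(sink);                                    // [NULL, NULL]
        // With all defaults the branch passes through unchanged.
        System.out.println(BranchedSketch.<String>defaults().apply(Arrays.asList("x"))); // [x]
    }
}
```

Keeping the three settings independent means a caller can supply only the one it cares about and leave the others at their defaults.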
...
Code Block |
---|
|
source.split()
.branch((key, value) -> value.contains("A"), Branched.withJavaConsumer(ks->ks.to("A")))
.branch((key, value) -> value.contains("B"), Branched.withJavaConsumer(ks->ks.to("B")))
.defaultBranch(Branched.withJavaConsumer(ks->ks.to("C"))); |
More Complex Example: Merging Branches
Code Block |
---|
|
Map<String, KStream<String, String>> branches = source.split()
.branch((key, value) -> value == null, Branched.withName("null").withChain(s->s.mapValues(v->"NULL")))
.defaultBranch(Branched.withName("non-null"));
branches.get("non-null").merge(branches.get("null")); |
...
Code Block |
---|
|
KBranchedStream branched = stream.split();
for (RecordType recordType : RecordType.values())
branched.branch((k, v) -> v.getRecType() == recordType,
Branched.withJavaConsumer(recordType::processRecords)); |
This is why the 'starting' `split()` operation is necessary, and why it is better to have it than to add a new `branch` method to `KStream` directly. Otherwise we would have to treat the first iteration separately, and the code for dynamic branching becomes cluttered:
Code Block |
---|
|
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length == 0) return;
KBranchedStream branched = stream.
branch((k, v) -> v.getRecType() == recordTypes[0],
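Branched.withJavaConsumer(recordTypes[0]::processRecords));
for (int i = 1; i < recordTypes.length; i++) {
// copy to a local so that the lambda captures an effectively final variable
RecordType recordType = recordTypes[i];
branched.branch((k, v) -> v.getRecType() == recordType,
Branched.withJavaConsumer(recordType::processRecords));
} |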
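To make the argument concrete, here is a toy model of the same idea, offered as a sketch rather than as the proposed implementation. It assumes first-match routing (a record goes to the first branch whose predicate accepts it) and uses a `List<String>` in place of a stream. `SplitSketch` and its `route` helper are hypothetical names. Because `split` carries no predicate, the loop needs no special case for the first branch and also works when there are no branches at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model: SplitSketch stands in for KBranchedStream, List<String> for KStream.
final class SplitSketch {
    private List<String> remaining;

    private SplitSketch(List<String> records) {
        this.remaining = records;
    }

    // Neutral starting point: no predicate is attached yet.
    public static SplitSketch split(List<String> records) {
        return new SplitSketch(new ArrayList<>(records));
    }

    // Hands matching records to the consumer; non-matching records stay for later branches.
    public SplitSketch branch(Predicate<String> predicate, Consumer<List<String>> consumer) {
        List<String> matched = new ArrayList<>();
        List<String> rest = new ArrayList<>();
        for (String record : remaining) {
            if (predicate.test(record)) {
                matched.add(record);
            } else {
                rest.add(record);
            }
        }
        consumer.accept(matched);
        remaining = rest;
        return this;
    }

    // Receives whatever no earlier branch has claimed.
    public void defaultBranch(Consumer<List<String>> consumer) {
        consumer.accept(remaining);
    }

    // Dynamic branching: one branch per marker, added in a uniform loop.
    public static Map<String, List<String>> route(List<String> records, List<String> markers) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        SplitSketch branched = split(records);
        for (String marker : markers) {
            branched.branch(v -> v.contains(marker), matched -> out.put(marker, matched));
        }
        branched.defaultBranch(rest -> out.put("default", rest));
        return out;
    }

    public static void main(String[] args) {
        // prints {A=[A1, AB], B=[B1], default=[C1]}
        System.out.println(route(Arrays.asList("A1", "B1", "AB", "C1"), Arrays.asList("A", "B")));
        // An empty marker list needs no special handling: prints {default=[A1]}
        System.out.println(route(Arrays.asList("A1"), Collections.<String>emptyList()));
    }
}
```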
Proposed Changes
Add the new KBranchedStream class and branch() method for KStream (see https://github.com/apache/kafka/pull/6512).
...