...
Code Block | ||
---|---|---|
| ||
// Dynamic branching: create one branch per RecordType constant and hand each
// branch's records to that constant's own processRecords method.
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
    // The first branch is opened on the stream itself; the remaining ones are
    // added to the BranchedKStream it returns.
    RecordType firstType = recordTypes[0];
    // NOTE(review): BranchedKStream is left as a raw type because the stream's
    // key/value types are not visible here -- add type arguments to match `stream`.
    BranchedKStream branched = stream.branch(
            (k, v) -> v.getRecType() == firstType,
            Branched.withJavaConsumer(firstType::processRecords));
    for (int i = 1; i < recordTypes.length; i++) {
        // Copy the element into a local: a lambda may only capture effectively
        // final variables, so the loop counter `i` cannot be used inside it.
        RecordType recordType = recordTypes[i];
        branched.branch(
                (k, v) -> v.getRecType() == recordType,
                Branched.withJavaConsumer(recordType::processRecords));
    }
}
...