Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Motivation
The KStream
API provides merge(KStream
) to merge another KStream
with this. Sometimes, it may be useful to merge more than 2 KStreams
together. Currently, the best way to do this is using Java's Stream.reduce
:
List<KStream<K, V>> streams ...; streams.stream().reduce((left, right) -> left.merge(right));
This creates a merge node in the process graph for every KStream
in the collection being merged.
Complex process graphs can make understanding an application and debugging more difficult.
Public Interfaces
Two new methods will be added to KStream
:
KStream<K, V> merge(Collection<KStream<K, V>> streams);
KStream<K, V> merge(Collection<K, V> streams, Named named);
Two static methods will also be added to KStream:
static <K, V> KStream<K, V> merged(Collection<KStream<K, V>> streams);
static <K, V> KStream<K, V> merged(Collection<KStream<K, V>> streams, Named named);
These are utility methods for merging a collection of KStreams when the user doesn't already have a KStream to invoke merge
upon.
Proposed Changes
KStream
will be updated with the above API, and KStreamImpl
will have its implementation updated to match.
As with the existing implementation, if any of the input KStreams
need to be repartitioned, the entire merged KStream
will be repartitioned. If users only wish to repartition the sub-pairs of KStreams
that need to be, they can fall-back to the previous strategy of iteratively merging pairs of KStream
down using Stream#reduce
The KStream
that merge is called on is treated no differently in the merge than the KStreams
in the Collection
being merged. This is the same behaviour we have today, where the order of the KStreams
being merged has no effect.
Compatibility, Deprecation, and Migration Plan
- Due to ambiguity in the type, passing the
null
literal as the first parameter tomerge
will no longer be possible without explicitly casting to eitherKStream<K, V>
orCollection<KStream<K, V>>
first. Since passingnull
for this argument will always produce a runtime error anyway, passing anull
literal was never intended in the original API. - No other modifications, deprecations or removals are being made to the existing API.