Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-5765
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Merging multiple KStreams is done via StreamsBuilder's merge()
(formally KStreamsBuilder's merge()
). This is quite unnatural and should be done via KStream's merge()
.
Public Interfaces
We will add a merge()
method to the KStream interface.
KStream<K,V> merge(KStream<K,V> stream);
Proposed Changes
Originally, the method merge()
was defined in StreamsBuilder. Through a series of calls, it called a merge()
method in KStreamImpl seperate from the one we are adding.
static <K,V> KStream<K,V> merge(InternalStreamsBuilder builder, KStream<K,V>[] streams)
In the new structure, we will add a method override with merge()
:
@Override
public KStream<K,V> merge(KStream<K,V> stream)
{
return this.builder.merge(stream); //builder (type:InternalStreamsBuilder) was defined in the AbstractStream superclass
}
We will no longer use several variable arguments in one call, rather there will only be one stream that is taken as an input argument.
Compatibility, Deprecation, and Migration Plan
We will remove the old merge()
method inside StreamsBuilder. Classes (or systems) calling the old, deprecated method will have to switch to calling the new one
in KStreamImpl.
Rejected Alternatives
There was plans to use the merge()
method of the following form:
<K,V> KStream<K,V> merge(StreamsBuilder builder, KStream<K,V> ... streams)
However, the builder parameter is extra and unnecesary, as the local InternalStreamsBuilder is sufficient.