Current state: Accepted, targeting 3.7
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Each key-changing operation in Kafka Streams (transform, etc.) currently triggers an automatic repartition before a stateful aggregation. However, a repartition is not always necessary: if the input stream is already correctly partitioned, the automatic repartition only adds overhead. For example, if an input stream arrives partitioned by key1, calling selectKey( ... => (key1, metric)).groupByKey will trigger a repartition today.
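To make the point concrete, here is a minimal sketch in plain Java (a toy model, not the Kafka Streams API). It assumes a hypothetical hash partitioner over key1 and a fixed partition count, and checks that every composite key (key1, metric) is already confined to a single partition, so a shuffle would move nothing that matters for the aggregation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: plain Java collections, not the Kafka Streams API.
class CoPartitioningSketch {
    static final int NUM_PARTITIONS = 4; // hypothetical partition count

    // Hypothetical partitioner: the input topic is partitioned by key1 alone.
    static int partitionFor(String key1) {
        return Math.floorMod(key1.hashCode(), NUM_PARTITIONS);
    }

    // For each composite key (key1, metric), collect the partitions its records
    // sit in when they stay where the key1-based partitioner placed them.
    static Map<List<String>, Set<Integer>> partitionsPerCompositeKey(List<String[]> records) {
        Map<List<String>, Set<Integer>> result = new HashMap<>();
        for (String[] r : records) {
            String key1 = r[0];
            String metric = r[1];
            // Mirrors selectKey( ... => (key1, metric)) from the text.
            List<String> compositeKey = List.of(key1, metric);
            result.computeIfAbsent(compositeKey, k -> new HashSet<>()).add(partitionFor(key1));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[] {"host-a", "cpu"},
            new String[] {"host-a", "mem"},
            new String[] {"host-b", "cpu"},
            new String[] {"host-a", "cpu"});
        // Every composite key maps to exactly one partition.
        partitionsPerCompositeKey(records)
            .forEach((key, partitions) -> System.out.println(key + " -> " + partitions));
    }
}
```

Because the composite key contains key1, two records with the same composite key always share key1 and therefore share a partition under this assumed partitioner.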
Tickets KAFKA-4835 and KAFKA-10844 request an option for canceling the unneeded repartition. Repartition canceling is also needed for efficient use of the distinct() operators proposed in KIP-655: groupBy(...).windowedBy(...).distinct() will always repartition by default, although this is not always needed in practice.
This KIP proposes a new interface for users to optimize the key-changing operations (transform, etc.) in Kafka Streams.
In accordance with KStreams DSL Grammar, we introduce a new parameterless DSLOperation markAsPartitioned() on the KStream interface.
Calling the new DSLOperation returns a new, mutated KStream. The new instance will not repartition as downstream operations are chained onto it, whereas the original stream keeps its own internal property and continues to operate in the default way.
Example: canceling repartition in a streams aggregation would look like:
Example: fan out from the same stream:
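The behaviour in both examples can be modelled with a small, self-contained sketch. ToyStream and its methods are hypothetical stand-ins invented for illustration, not the real KStream class; the boolean flag merely plays the role of the internal property described above.

```java
// Toy model only: ToyStream is a hypothetical stand-in for KStream.
final class ToyStream {
    private final boolean repartitionRequired;

    private ToyStream(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source() {
        return new ToyStream(false);
    }

    // Key-changing operation: the result is flagged for repartitioning.
    ToyStream selectKey() {
        return new ToyStream(true);
    }

    // Models the proposed operation: returns a NEW instance with the flag
    // cleared; the instance it was called on is left untouched.
    ToyStream markAsPartitioned() {
        return new ToyStream(false);
    }

    // Models a stateful aggregation: reports whether a repartition step
    // would be inserted in front of it.
    boolean groupByKeyWouldRepartition() {
        return repartitionRequired;
    }
}

class FanOutSketch {
    public static void main(String[] args) {
        ToyStream rekeyed = ToyStream.source().selectKey();
        ToyStream marked = rekeyed.markAsPartitioned();

        // Branch 1: aggregation on the marked stream skips the repartition.
        System.out.println(marked.groupByKeyWouldRepartition());  // prints "false"
        // Branch 2: the original stream still repartitions by default.
        System.out.println(rekeyed.groupByKeyWouldRepartition()); // prints "true"
    }
}
```

Returning a new instance instead of mutating the receiver is what lets two branches fanned out from the same stream make independent choices.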
- Using this operation complicates the use of IQ (Interactive Query) and joins. When a repartition happens, records are physically shuffled by the composite key defined in the stateful operation. When the repartition is canceled, records stay in their original partition, determined by their original key. IQ assumes and uses the composite key instead of the original key, so IQ can break downstream. The same applies to joins.
- In the documentation, we specifically advise against using the interface with IQ or joins.
- However, a potential solution to support IQ is to provide a 'reverse mapping' for the composite key that restores the original key, which can then be used for obtaining the metadata. We can follow up with a change when there is a request.
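The mismatch and the 'reverse mapping' idea can be sketched in plain Java. This is a toy model under stated assumptions: the hash partitioner, the partition count, and all method names are hypothetical, and nothing here is the Interactive Query API.

```java
import java.util.List;
import java.util.function.Function;

// Toy model only: plain Java, not the Kafka Streams or Interactive Query API.
class ReverseMappingSketch {
    static final int NUM_PARTITIONS = 8; // hypothetical partition count

    // Hypothetical hash partitioner.
    static int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // Where the state for a composite key actually lives when the repartition
    // was canceled: the record never moved, so it sits on the partition of
    // its ORIGINAL key, recovered here through the reverse mapping.
    static int actualPartition(List<String> compositeKey,
                               Function<List<String>, String> reverseMapping) {
        return partitionFor(reverseMapping.apply(compositeKey));
    }

    // Where a lookup that hashes the composite key itself would be routed.
    static int naiveLookupPartition(List<String> compositeKey) {
        return partitionFor(compositeKey);
    }

    public static void main(String[] args) {
        List<String> compositeKey = List.of("host-a", "cpu");
        // Assumed reverse mapping: the original key is the first component.
        Function<List<String>, String> reverseMapping = k -> k.get(0);

        System.out.println("actual partition: " + actualPartition(compositeKey, reverseMapping));
        // May differ from the actual partition, which is how the lookup breaks.
        System.out.println("naive lookup:     " + naiveLookupPartition(compositeKey));
    }
}
```

In this model the two partitions agree only by coincidence; routing the lookup through the reverse mapping makes them agree by construction.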
Compatibility, Deprecation, and Migration Plan
- No impact on existing users, no migration is needed.
Option 1: Composite Key
If we don't want to introduce an unsafe operation, we might discuss introducing composite keys as an alternative.
CompositeKey<H, P> consists of a head and a postfix, and the partition of a composite key is always defined by its 'head' only.
CompositeKey(k, v) must have the same partition for each
- We will need to introduce selectCompositeKey operations that will not lead to repartition.
With CompositeKey, the design will be safe from the point of view of both data locality and IQ, but it adds complexity to the usage.
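A minimal sketch of what such a key type might look like follows. The class shape, the partition(int) helper, and withPostfix are assumptions made for illustration; none of them exist in Kafka Streams, and the hash-based partitioning is a stand-in for whatever partitioner is actually in use.

```java
import java.util.Objects;

// Hypothetical sketch of the CompositeKey idea; not an existing Kafka Streams class.
final class CompositeKey<H, P> {
    private final H head;
    private final P postfix;

    CompositeKey(H head, P postfix) {
        this.head = Objects.requireNonNull(head, "head");
        this.postfix = postfix;
    }

    H head() { return head; }

    P postfix() { return postfix; }

    // The partition depends on the head only, so changing the postfix
    // never moves a record to another partition.
    int partition(int numPartitions) {
        return Math.floorMod(head.hashCode(), numPartitions);
    }

    // Models a selectCompositeKey-style operation: same head, new postfix.
    <Q> CompositeKey<H, Q> withPostfix(Q newPostfix) {
        return new CompositeKey<>(head, newPostfix);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CompositeKey)) return false;
        CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
        return head.equals(other.head) && Objects.equals(postfix, other.postfix);
    }

    @Override
    public int hashCode() {
        return Objects.hash(head, postfix);
    }
}

class CompositeKeyDemo {
    public static void main(String[] args) {
        CompositeKey<String, String> cpu = new CompositeKey<>("host-a", "cpu");
        CompositeKey<String, String> mem = cpu.withPostfix("mem");
        // Different keys, same partition, because the head is unchanged.
        System.out.println(cpu.partition(8) == mem.partition(8)); // prints "true"
        System.out.println(cpu.equals(mem));                      // prints "false"
    }
}
```

Since a lookup can derive the partition from the head alone, this shape would keep both data locality and IQ routing consistent, at the cost of exposing a new key type to users.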
Option 2: Optional configuration in Named Operations (Joined, Grouped, etc.)
- It would allow us to hit only the relevant parts of the DSL and be less prone to undesired behaviors when it comes to IQ or joins.
- More generic, can be applied to KTable as well. In comparison, the markAsPartitioned() approach targets the KStreams interface only, where it focuses on a specific set of overhead/pain points introduced by
- It touches on a larger surface area of the API.