Current state: Rejected
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
In StreamPartitionAssignor, assignment takes place without regard to which tasks are stateful and which are not, which can lead to an unbalanced assignment of tasks. For StreamPartitionAssignor to factor in stateful tasks, the current rebalance protocol's metadata should be upgraded so that it contains the number of state stores that exist for each task. That way, when the metadata is decoded in StreamPartitionAssignor#assign(Cluster metadata, Map<String, Subscription> subscription), the data on stateful tasks is available and can be used to distribute tasks more evenly across clients.
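This proposal does not prescribe a particular balancing algorithm, so the following is only an illustrative sketch of how a per-task state store count could be used once it is available. The class StatefulBalanceSketch, the string task and client ids, and the weight of 1 + (number of state stores) per task are all assumptions made for the example; none of them are part of StreamPartitionAssignor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Kafka Streams. */
final class StatefulBalanceSketch {

    /**
     * Greedily assigns tasks to clients, heaviest task first.
     *
     * @param taskStoreCounts task id -> number of state stores (assumed input shape)
     * @param clients         client ids (assumed input shape)
     */
    static Map<String, List<String>> assign(Map<String, Integer> taskStoreCounts,
                                            List<String> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Map<String, Integer> load = new LinkedHashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
            load.put(client, 0);
        }

        // Sort tasks by state store count, largest first (the sort is stable).
        List<Map.Entry<String, Integer>> tasks = new ArrayList<>(taskStoreCounts.entrySet());
        tasks.sort(Map.Entry.<String, Integer>comparingByValue().reversed());

        for (Map.Entry<String, Integer> task : tasks) {
            // Pick the client with the smallest accumulated weight so far.
            String target = clients.get(0);
            for (String client : clients) {
                if (load.get(client) < load.get(target)) {
                    target = client;
                }
            }
            assignment.get(target).add(task.getKey());
            // Assumed weight: every task costs 1, plus 1 per state store,
            // so stateless tasks are still spread across clients.
            load.put(target, load.get(target) + 1 + task.getValue());
        }
        return assignment;
    }
}
```

With two stateful tasks and two stateless tasks spread over two clients, this sketch places one stateful and one stateless task on each client, whereas an assignor that ignores state could place both stateful tasks on the same client.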
The rebalance protocol's metadata is contained in the class SubscriptionInfo, which encodes information into a ByteBuffer. This upgrade requires changing the encoding and decoding format of SubscriptionInfo, which in turn requires an upgrade strategy for older Kafka versions. A new encoding and decoding format will be introduced: mainly the addition of an extra integer for each task.
The following changes will be made to TaskId to facilitate the recording of the number of state stores for a specific task:
In this manner, when TaskId#decode() is called in SubscriptionInfo, the number of state stores will also be processed. Note that setNumberOfStateStores(...) will also be called during StreamTask instantiation to set the number of state stores to the size of the given ProcessorTopology's state store list.
Note also that the older encoding of TaskId included only the topic group id and the partition number. In the newer version, the number of state stores is included as well.
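As a rough illustration of the difference between the two encodings, the sketch below writes and reads a task id with and without the extra integer. TaskIdSketch is a hypothetical stand-in for TaskId; the field order, the method names writeOld/writeNew/readOld/readNew, and the use of 4-byte integers for every field are assumptions made for the example, not the actual Kafka Streams wire format.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for TaskId; not the actual Kafka Streams class. */
final class TaskIdSketch {
    final int topicGroupId;
    final int partition;
    private int numberOfStateStores; // only carried by the newer encoding

    TaskIdSketch(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    void setNumberOfStateStores(int numberOfStateStores) {
        this.numberOfStateStores = numberOfStateStores;
    }

    int numberOfStateStores() {
        return numberOfStateStores;
    }

    /** Older layout (assumed): topic group id, then partition. */
    void writeOld(ByteBuffer buf) {
        buf.putInt(topicGroupId);
        buf.putInt(partition);
    }

    /** Newer layout (assumed): the older fields plus one extra int. */
    void writeNew(ByteBuffer buf) {
        buf.putInt(topicGroupId);
        buf.putInt(partition);
        buf.putInt(numberOfStateStores);
    }

    /** Reads the older layout; the state store count stays at 0. */
    static TaskIdSketch readOld(ByteBuffer buf) {
        int topicGroupId = buf.getInt();
        int partition = buf.getInt();
        return new TaskIdSketch(topicGroupId, partition);
    }

    /** Reads the newer layout, including the state store count. */
    static TaskIdSketch readNew(ByteBuffer buf) {
        TaskIdSketch id = readOld(buf);
        id.setNumberOfStateStores(buf.getInt());
        return id;
    }
}
```

Because the two layouts differ in length, a decoder has to know which one it is reading before it parses the buffer, which is why the format change needs an upgrade path.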
Compatibility, Deprecation, and Migration Plan
Because this change alters the metadata format, it requires an upgrade path. That upgrade path is provided by KIP-268.