Current state: Under Discussion
JIRA: TBD (if the Kafka dev team decides to incorporate this feature)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Based on KIP-22, the new (Java) Kafka producer has the capability to plug in custom logic for assigning a partition to a given message per topic. However, implementing custom partitioning strategies depends on critical information such as changes in the number of partitions and which partitions are online vs. offline. This information reaches the producer internally only on each metadata refresh interval (metadata.max.age.ms), or when certain types of errors occur and the NetworkClient code (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java) attempts a metadata refresh.
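For reference, the KIP-22 pluggable contract looks roughly like the following. This is a simplified, self-contained sketch: SimplePartitioner and RoundRobinPartitioner are local stand-ins for illustration, and the real org.apache.kafka.clients.producer.Partitioner interface additionally receives the serialized key/value bytes and a Cluster snapshot.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the producer's pluggable partitioner contract (KIP-22).
// The real org.apache.kafka.clients.producer.Partitioner also receives the
// serialized key/value and a Cluster snapshot.
interface SimplePartitioner {
    int partition(String topic, Object key, int numPartitions);
}

// Example strategy: round-robin across the partitions known at call time.
class RoundRobinPartitioner implements SimplePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, int numPartitions) {
        return counter.getAndIncrement() % numPartitions;
    }
}

class PartitionerSketch {
    public static void main(String[] args) {
        SimplePartitioner p = new RoundRobinPartitioner();
        System.out.println(p.partition("orders", null, 3)); // prints 0
        System.out.println(p.partition("orders", null, 3)); // prints 1
        System.out.println(p.partition("orders", null, 3)); // prints 2
        System.out.println(p.partition("orders", null, 3)); // prints 0
    }
}
```

Note that a partitioner like this only ever sees the partition count it is handed; it has no way to learn that the count changed or that some partitions went offline, which is the gap this proposal addresses.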
Hence, in order to make this interface rich in capabilities and functionality, the Partitioner interface needs to be notified of the following events:
1) When the number of partitions for a given topic changes (a fundamental use case is adding partitions to an existing topic for horizontal scalability or to process the topic faster).
One limitation of the new producer compared to the Scala-based old producer is that the old producer imposed no restriction on growing the number of partitions (meaning no impact to a producer running in production), but the new producer implicitly imposes a maximum before the policy set by block.on.buffer.full activates: once the number of partitions times batch.size exceeds the configured buffer.memory, the producer will block or drop. For example, suppose a new producer is running in production, you have no inventory of its configuration, and you need to grow the partition count; if the partition count grows to the point where partitions times batch.size exceeds the running producer's configured buffer.memory, the producer will block or drop messages.
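To make the limit concrete, here is a back-of-the-envelope check. The values batch.size=16384 and buffer.memory=33554432 are assumed for illustration (they match common defaults): the buffer can hold at most buffer.memory / batch.size full batches, so a topic grown past that many partitions can exhaust the buffer with just one open batch per partition.

```java
// Back-of-the-envelope check: how many partitions can each hold one open
// batch before buffer.memory is exhausted? Values are illustrative.
class BufferLimitCheck {
    public static void main(String[] args) {
        long batchSize = 16_384L;        // batch.size (16 KB)
        long bufferMemory = 33_554_432L; // buffer.memory (32 MB)

        long maxFullBatches = bufferMemory / batchSize;
        System.out.println(maxFullBatches); // prints 2048

        // With more partitions than that, a producer writing to every
        // partition can exceed the buffer and block or drop, per the
        // block.on.buffer.full policy.
        long partitions = 4096;
        System.out.println(partitions * batchSize > bufferMemory); // prints true
    }
}
```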
Here is the historical context on this subject and the Kafka dev team's position on this issue:
Justification for this notification:
By using this notification, a custom Partitioner can refuse to grow beyond a maximum partition limit (or beyond the point where producer performance degrades) and thus avoid activating block.on.buffer.full.
Also, when a new producer is restarted (an application restart) with its old configuration and the number of partitions has grown beyond its limit (set by buffer.memory), it can elect to use a random subset of partitions, restricting the number used so as to avoid the situation described above.
2) When partitions are not online (for example, both the leader and replica brokers are down for some partitions), so that the implementer of the Partitioner has the capability to decide how to redirect messages to other online partitions.
Another motivation for this event listener is to give operational capability to monitor change requests from the producer's point of view:
- Capability to monitor events from the producer side.
- Ability to implement a feedback loop with a change controller (e.g., a Kafka NOC/DevOps team) by which the producer can acknowledge a change (e.g., a partition increase or decrease, or a partition automatically going online/offline) and the change can be validated, providing end-to-end visibility to the Kafka DevOps team.
- The consumer side can also benefit from this event notification, to report or acknowledge that a change has been propagated and accepted by the consumer group consuming the topic.
The proposal is to add a new on-change method to the existing Partitioner interface:
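The addition might look like the following sketch. The method name onPartitionsChange follows this proposal's terminology, but PartitionInfoSketch and OnlineAwarePartitioner are simplified local stand-ins (the real interface would use org.apache.kafka.common.PartitionInfo and Cluster), so treat this as a shape of the API rather than its exact signature. The OnlineAwarePartitioner example also illustrates use case 2) above: steering messages away from offline partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.kafka.common.PartitionInfo.
class PartitionInfoSketch {
    final String topic;
    final int partition;
    final boolean online; // true if a leader is currently available

    PartitionInfoSketch(String topic, int partition, boolean online) {
        this.topic = topic;
        this.partition = partition;
        this.online = online;
    }
}

// Sketch of the extended Partitioner contract proposed by this KIP:
// the existing partition() method plus a new change-notification callback.
interface PartitionerWithNotification {
    int partition(String topic, Object key, List<PartitionInfoSketch> partitions);

    // Called after a metadata refresh detects a change for a topic:
    // partitions added/removed, or partitions going online/offline.
    void onPartitionsChange(String topic, List<PartitionInfoSketch> current);
}

// Example: remember which partitions are online and only route to those.
class OnlineAwarePartitioner implements PartitionerWithNotification {
    private volatile List<Integer> onlinePartitions = new ArrayList<>();

    @Override
    public void onPartitionsChange(String topic, List<PartitionInfoSketch> current) {
        List<Integer> online = new ArrayList<>();
        for (PartitionInfoSketch p : current) {
            if (p.online) online.add(p.partition);
        }
        onlinePartitions = online;
    }

    @Override
    public int partition(String topic, Object key, List<PartitionInfoSketch> partitions) {
        List<Integer> online = onlinePartitions;
        if (online.isEmpty()) return 0; // fallback when no partition is known online
        return online.get(Math.floorMod(key.hashCode(), online.size()));
    }
}
```

With this shape, the producer runtime (not the user) invokes onPartitionsChange whenever a metadata refresh reveals a difference, and the partitioner is free to cap the partitions it uses, rebalance, or alert an operator.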
Kafka Internal Change:
Kafka already has an internal way of being notified when a metadata update request completes, via the following internal listener.
Leveraging the existing implementation of the metadata refresh notification, the MetadataChangeListener class will be notified upon each refresh; in turn, it will diff against the previous instance of Cluster and report changes via the onPartitionsChange() method, as shown below.
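A sketch of that listener follows. The names MetadataChangeListener and onPartitionsChange come from this proposal; everything else is an assumption for illustration. In particular, a real implementation would hook into the producer's internal metadata-update callback and diff org.apache.kafka.common.Cluster snapshots, whereas here the "cluster" is reduced to a topic-to-partition-count map.

```java
import java.util.HashMap;
import java.util.Map;

// Pseudocode-style sketch of the listener described in the text. The real
// version would diff Cluster snapshots on the producer's metadata-update
// callback; here a Map<topic, partitionCount> stands in for Cluster.
class MetadataChangeListener {
    private Map<String, Integer> previous = new HashMap<>();

    // Fields recording the most recent detected change (for demonstration).
    String lastTopic;
    int lastOldCount;
    int lastNewCount;

    // Invoked on each metadata refresh with the latest snapshot.
    void onMetadataUpdate(Map<String, Integer> current) {
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            Integer old = previous.get(e.getKey());
            if (old == null || !old.equals(e.getValue())) {
                onPartitionsChange(e.getKey(), old == null ? 0 : old, e.getValue());
            }
        }
        previous = new HashMap<>(current);
    }

    // Notification hook; the KIP proposes forwarding this to the Partitioner.
    void onPartitionsChange(String topic, int oldCount, int newCount) {
        lastTopic = topic;
        lastOldCount = oldCount;
        lastNewCount = newCount;
        System.out.println(topic + ": " + oldCount + " -> " + newCount);
    }
}

class MetadataDiffDemo {
    public static void main(String[] args) {
        MetadataChangeListener listener = new MetadataChangeListener();
        Map<String, Integer> refresh1 = new HashMap<>();
        refresh1.put("orders", 3);
        listener.onMetadataUpdate(refresh1); // prints orders: 0 -> 3
        Map<String, Integer> refresh2 = new HashMap<>();
        refresh2.put("orders", 6);
        listener.onMetadataUpdate(refresh2); // prints orders: 3 -> 6
    }
}
```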
Please note that the above is a pseudocode implementation.
Compatibility, Deprecation, and Migration Plan
1) Each Partitioner class could also implement the listener itself, with each metadata refresh calling the listener. This will not scale, since every Partitioner would then need to perform its own diff between the previous and new metadata.