...
This generation is not currently exposed to any of partition assignors. It will be exposed as part of the assignor.assignonAssignment(...)
call in ConsumerCoordinator.java
:
Code Block | ||||
---|---|---|---|---|
| ||||
// current code ... Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptionsonAssignment(assignment); ... // revised code ... Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions, latestGeneration().generationId); ... |
...
onAssignment(assignment, generation); ... |
As a result of above change to the assignor.assignonAssignment(...)
call, a new method is introduced in the interface PartitionAssignor
:
Code Block | ||||
---|---|---|---|---|
| ||||
default Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions, void onAssignment(Assignment assignment, int generation) { return assign(partitionsPerTopic, subscriptionsonAssignment(assignment); } |
The existing assignonAssignment(...)
method will remain to support classes that already implement this interface.
It is expected that classes that implement this interface implement at least one of the two assign(...)
methods. A default implementation (in the interface class) can be is provided for both methods to allow this, and to keep this methods to keep assignors that do not make use of group generation (e.g. RangeAssignor
and RoundRobinAssignor
classes) intact.
Note: During the implementation of this KIP there was a back and forth on whether this generation argument should be an optional argument (i.e. Optional<Integer>
). It turned out that the only edge case where the optional argument would be useful is when an old client makes use of the updated sticky assignor. It was decided that this supposedly rare case would not worth the complexity and the noise that comes with the optional type.
Compatibility, Deprecation, and Migration Plan
...