...
    final class Subscription {
        private final List<String> topics;
        private final ByteBuffer userData;
        private final List<TopicPartition> ownedPartitions;
        private Optional<String> groupInstanceId;
        private final int generationId; // newly added

        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId) {
            this.topics = topics;
            this.userData = userData;
            this.ownedPartitions = ownedPartitions;
            this.groupInstanceId = Optional.empty();
            this.generationId = generationId; // newly added
        }

        public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
            // No generation supplied: default to -1, and pass the caller's ownedPartitions through.
            this(topics, userData, ownedPartitions, -1);
        }

        public Subscription(List<String> topics, ByteBuffer userData) {
            this(topics, userData, Collections.emptyList(), -1);
        }

        public Subscription(List<String> topics) {
            this(topics, null, Collections.emptyList(), -1);
        }

        // newly added: the generationId getter
        public int generationId() {
            return generationId;
        }
    }

Proposed Changes
During the joinGroup request, every consumer includes its "generation" data in the protocol set that it sends to the group coordinator. Later, when the consumer leader receives the subscription info from all consumers, it performs the assignment based on the "ownedPartitions" and "generation" info. After the assignment, the same "ownedPartitions" and "generation" info can also be used to validate the assignments.
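
To illustrate how the leader could combine "ownedPartitions" and "generation", here is a minimal sketch, not the actual assignor code. It assumes that when two members claim the same partition, the claim carrying the higher generation is the one to trust. The names `OwnedPartitionResolver`, `MemberClaim`, and `resolveOwners` are hypothetical, and partitions are plain strings rather than `TopicPartition` so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OwnedPartitionResolver {

    /** Hypothetical stand-in for the Subscription fields used here. */
    public static final class MemberClaim {
        final String memberId;
        final List<String> ownedPartitions;
        final int generationId;

        public MemberClaim(String memberId, List<String> ownedPartitions, int generationId) {
            this.memberId = memberId;
            this.ownedPartitions = ownedPartitions;
            this.generationId = generationId;
        }
    }

    /**
     * Maps each claimed partition to the member whose claim has the highest
     * generationId. On a tie, the claim seen first is kept (an assumption made
     * for this sketch only).
     */
    public static Map<String, String> resolveOwners(List<MemberClaim> claims) {
        Map<String, String> owner = new HashMap<>();
        Map<String, Integer> ownerGeneration = new HashMap<>();
        for (MemberClaim claim : claims) {
            for (String partition : claim.ownedPartitions) {
                Integer seen = ownerGeneration.get(partition);
                // Replace the recorded owner only when this claim is strictly newer.
                if (seen == null || claim.generationId > seen) {
                    owner.put(partition, claim.memberId);
                    ownerGeneration.put(partition, claim.generationId);
                }
            }
        }
        return owner;
    }

    public static void main(String[] args) {
        // consumer-A still reports t-1 from an older generation; consumer-B owns it in a newer one.
        Map<String, String> owners = resolveOwners(List.of(
                new MemberClaim("consumer-A", List.of("t-0", "t-1"), 3),
                new MemberClaim("consumer-B", List.of("t-1", "t-2"), 5)));
        System.out.println("t-1 is owned by " + owners.get("t-1"));
    }
}
```

In this sketch a member that used the constructors without a generation would carry -1, so any member reporting a real generation would win a conflicting claim against it.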
...