Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/l8ko353v3nn1blgymsty895x6c98oxlx

Vote thread: https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p

JIRA: KAFKA-17747

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848 introduces a next-generation consumer rebalance protocol that supports rack-aware partition assignment. In its initial implementation, the group coordinator computes subscribed topic metadata, which includes the topic UUID, name, number of partitions, and the rack set for each partition. When this metadata expires, the group coordinator recalculates the subscribed topic metadata and compares it to the current version. If there are differences, the group coordinator increments the group epoch and generates a new target assignment, effectively triggering a rebalance. However, the rack set for each partition consumes significant memory. In KAFKA-17578, a real-world case shows that for a group with 500 members and 2,000 topic partitions, partition rack data accounts for 79% of the total memory used by the ConsumerGroup object. For this reason, rack-aware rebalancing was removed in 4.0. This KIP introduces another way to support the feature and targets 4.1.

This KIP proposes removing the number of partitions and rack set details from the metadata and replacing them with a subscribed topic hash. Each topic is assigned a unique hash value. The topic hash must account for two server-side conditions that can trigger a rebalance:

  • A topic gains a new partition.
  • A topic partition undergoes a rack change. Each topic partition has multiple replicas. The rack value is derived from the broker's broker.rack configuration, which is read-only and only changes when the broker restarts. If a broker stops, its associated replicas are removed from the topic partitions. Conversely, when a broker starts, its replicas are added to the topic partitions.

The following table compares the CPU / memory / disk usage of the different strategies:


Strategy                                                                      CPU    Memory  Disk
1. Subscribed topic metadata with topic UUID, name, number of partitions,
   and the rack set of each partition.                                        Low    High    High
2. Cache mechanism and subscribed topic metadata with topic UUID, name,
   and hash.                                                                  Mid    Mid     Mid
3. A single hash to represent all subscribed topics per group.                High   Low     Low

The second strategy still requires significant disk space to store redundant data: it persists a map from each subscribed topic to its hash, so even if only a single entry changes, a whole new map of topic hashes is written. On the other hand, the third strategy heavily consumes CPU resources, because the combined hash must be recalculated whenever any topic changes.

This KIP proposes combining the second and third strategies. The coordinator will maintain a cache of individual topic hashes. When a topic changes, only its hash will be recalculated. Different groups can retrieve a topic's hash from the cache, eliminating the need for the coordinator to recalculate the same hash multiple times. In the group record, the coordinator will aggregate all subscribed topic hashes into a single hash per group and store it on disk. This approach minimizes disk usage while efficiently handling changes.

Public Interfaces

ConsumerGroupPartitionMetadataKey / ConsumerGroupPartitionMetadataValue

These types will be deprecated but retained for backward compatibility.

StreamsGroupPartitionMetadataKey / StreamsGroupPartitionMetadataValue / ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue

These types will be deleted, because the Streams / Share group coordinators were not GA before 4.1.

ConsumerGroupMetadataValue

Add a new field "MetadataHash". It is a tagged field for backward compatibility.

ConsumerGroupMetadataValue
{
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "default": 0, "taggedVersions": "0+", "tag": 0,
      "about": "The hash of all topics in the group." } <-- new field
  ]
}

ShareGroupMetadataValue

Add a new field "MetadataHash".

ShareGroupMetadataValue
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "about": "The hash of all topics in the group." } <-- new field
  ]
}

StreamsGroupMetadataValue

Add a new field "MetadataHash".

StreamsGroupMetadataValue
{
  "type": "data",
  "name": "StreamsGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "about": "The hash of all topics in the group." } <-- new field
  ]
}


Proposed Changes

Topic Hash Map Cache in Coordinator

Different groups may subscribe to the same topics. With a topic hash map cache in the coordinator, the same topic hash does not need to be recalculated for each group. The topic hash value represents the topic id, name, number of partitions, and partition racks.

Add a new topic hash in the cache

When the coordinator initializes, the topic hash map cache is empty. After receiving the first consumer group heartbeat, the coordinator calculates the subscribed topic hashes, so it does not waste memory storing hashes of unsubscribed topics.

Renew a topic hash in the cache

When a new metadata image arrives, its metadata delta contains the changed topics and deleted topics. In both cases, the coordinator removes the topic hash from the cache. On the next consumer group heartbeat, the value is recomputed and stored back in the cache. This lazy evaluation is more efficient, because it guarantees that the coordinator only calculates hashes for topics that are in use.
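The add / renew flow above can be sketched as a lazily populated map. This is a hypothetical sketch: the class and method names are illustrative, and computeHash stands in for the real Murmur3-based per-topic hash.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the coordinator's topic hash cache; names are
// illustrative and computeHash stands in for the real Murmur3-based hash.
public class TopicHashCache {
    private final Map<String, Long> cache = new HashMap<>();

    // Called on a group heartbeat: compute the hash only if it is absent,
    // so topics no group subscribes to never occupy memory.
    public long topicHash(String topicName) {
        return cache.computeIfAbsent(topicName, TopicHashCache::computeHash);
    }

    // Called when a metadata delta reports the topic as changed or deleted;
    // the next heartbeat recomputes the value lazily.
    public void invalidate(String topicName) {
        cache.remove(topicName);
    }

    // Stand-in for the real per-topic hash over id, name, partitions, racks.
    private static long computeHash(String topicName) {
        return 31L * topicName.hashCode() + 7L;
    }
}
```

With this shape, a topic that changes while no group subscribes to it is simply never recomputed.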

Example: a new partition in a topic

When a new partition is added to a topic, the coordinator receives a new MetadataDelta that contains the topic in TopicsDelta#changedTopics. The coordinator removes the topic hash from the cache. In the next group heartbeat, the coordinator detects that the topic hash is missing and recalculates it.

Example: the racks of a partition change

There are two cases of rack change. One is a user altering a partition reassignment. The other is a "broker.rack" configuration change; the configuration is read-only, so a change only takes effect after the broker restarts. In both cases, if the replica is in the ISR on a reassigned / restarted broker, the change is reflected in PartitionRegistration#isr, which is included in TopicDelta#partitionChanges. The renewal process is similar to the above: a new MetadataDelta contains the topic in TopicsDelta#changedTopics, the coordinator removes the topic hash after receiving the change, and in the next group heartbeat the coordinator recomputes the value.

The PartitionRegistration also contains other changes, like leader / ELR. Every change triggers a topic hash recomputation. This is harmless, because the hash only reflects changes to the uuid / name / number of partitions / racks of partitions. If none of these fields change, the final hash stays the same, so it does not bump the group epoch or trigger a rebalance.

Remove a topic hash in the cache

The coordinator maintains, for each topic, the set of groups subscribed to it. When a group unsubscribes, the coordinator checks whether the topic is still subscribed by any group. If it is not, the topic hash is removed from the cache.
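The eviction rule above amounts to simple reference tracking; a hypothetical sketch (names are illustrative, with String standing in for the topic id):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the topic-to-groups bookkeeping used to decide
// when a cached topic hash can be evicted; names are illustrative.
public class TopicSubscriptionTracker {
    private final Map<String, Set<String>> groupsByTopic = new HashMap<>();

    public void subscribe(String groupId, String topic) {
        groupsByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(groupId);
    }

    // Returns true when the topic is no longer subscribed by any group,
    // i.e. its cached hash can be removed.
    public boolean unsubscribe(String groupId, String topic) {
        Set<String> groups = groupsByTopic.get(topic);
        if (groups == null) return true;
        groups.remove(groupId);
        if (groups.isEmpty()) {
            groupsByTopic.remove(topic);
            return true;
        }
        return false;
    }
}
```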

A Single Hash of All Subscribed Topics per Group

In this KIP, the group gets all subscribed topic hashes from the cache in the coordinator and combines them into a single hash. The final hash is stored with a new group epoch in ConsumerGroupMetadataValue / ShareGroupMetadataValue / StreamsGroupMetadataValue. A simple sum of the topic hashes can collide. For example, topicA gets hash value "a" and topicB gets "b". After some operations (add partition / rack change), topicA gets hash value "b" and topicB gets "a". In this case, a simple sum cannot detect the change and trigger a rebalance. To ensure an avalanche effect, the combining function sorts topics by name and multiplies each hash value by its position before hashing the sum.

hash(topicA)   hash(topicB)   Final Hash
a              b              hash(a + 2 * b)
b              a              hash(b + 2 * a)
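The scenario in the table above can be reproduced with a position-weighted combine. This is an illustrative sketch, not the coordinator's code: finalHash is a stand-in mixer for Murmur3, and the long values stand in for per-topic hashes.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the position-weighted combination; finalHash is a
// stand-in mixer for Murmur3 and the inputs stand in for per-topic hashes.
public class CombinedHash {
    // Topics are sorted by name, and each topic hash is weighted by its
    // 1-based position before the final hash, so swapping the hashes of two
    // topics changes the result.
    public static long combine(Map<String, Long> topicHashes) {
        long sum = 0;
        int position = 1;
        for (long h : new TreeMap<>(topicHashes).values()) {
            sum += position * h;
            position++;
        }
        return finalHash(sum);
    }

    private static long finalHash(long v) { // stand-in avalanche mixer
        v ^= v >>> 33;
        v *= 0xff51afd7ed558ccdL;
        v ^= v >>> 33;
        return v;
    }
}
```

With topicA = "a" and topicB = "b" swapped, the weighted sums differ (a + 2b vs. b + 2a), so the combined hashes differ and the epoch bump is triggered.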

Another case concerns regular expressions. For example, a consumer subscribes with a regular expression like "topic*". At T1, the matched topics are topicA with hash value "a" and topicB with hash value "b". At T2, topicA and topicB are removed, and topicC and topicD are added with the same hash values "a" and "b". This case is not covered by this KIP, because the group hash value cannot trigger the rebalance. In 4.0, there is a separate function that evaluates the regular expression and checks whether the set of matched topics has changed. If so, it bumps the group epoch and calculates a new assignment, so this KIP does not need to handle this case.

Topic Hash Function

A topic hash represents the topic id, name, number of partitions, and partition racks. To avoid useless rebalances, the hash function must return the same value for the same data, even when it runs on different JDKs or the partition racks appear in a different order. To be JDK-independent, this KIP uses Murmur3 to compute the hash value. To be independent of rack order, we compute a hash for each rack value and sum the results. We also set the first byte as a magic byte representing the hash version. Murmur3 uses bit operations to ensure an avalanche effect: a single bit change yields a different hash value. The hash function combines the fields in the following order:

  • magic byte (byte)
  • topic id (long)
  • topic name (string)
  • partition size (long)
  • partitions sorted by id
    • partition id (int)
    • sorted racks (string)
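The field order above can be sketched as follows. This is a hypothetical illustration: FNV-1a stands in for Murmur3, plain types stand in for the coordinator's topic image (the topic id is really a UUID), and sorting the racks achieves the same order-independence the KIP gets by summing per-rack hashes.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustrative sketch of the per-topic hash field order from the KIP.
// FNV-1a stands in for Murmur3; only the field ordering matches the KIP.
public class TopicHash {
    private static final byte MAGIC = 0; // hash version

    public static long hash(long topicId, String topicName,
                            Map<Integer, List<String>> racksByPartition) {
        StringBuilder sb = new StringBuilder();
        sb.append((char) MAGIC);                        // magic byte
        sb.append(topicId);                             // topic id (UUID in real code)
        sb.append(topicName);                           // topic name
        sb.append(racksByPartition.size());             // partition size
        // Sort partitions by id, and racks within each partition, so the
        // result does not depend on iteration order.
        for (Map.Entry<Integer, List<String>> e :
                new TreeMap<>(racksByPartition).entrySet()) {
            sb.append(e.getKey());                      // partition id
            sb.append(e.getValue().stream().sorted()
                .collect(Collectors.joining(",")));     // sorted racks
        }
        return fnv1a(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    private static long fnv1a(byte[] bytes) { // stand-in for Murmur3
        long h = 0xcbf29ce484222325L;
        for (byte b : bytes) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }
}
```

Two inputs that differ only in rack order hash to the same value, while a genuine rack change produces a different value.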

SubscribedTopicDescriberImpl

The group no longer stores TopicMetadata, so SubscribedTopicDescriberImpl can't rely on it to return numPartitions and racksForPartition. This KIP changes the constructor from Map<Uuid, TopicMetadata> to a Set<Uuid> and a MetadataImage, so it can use the Set<Uuid> to check which topics are subscribed and the MetadataImage to return numPartitions and racksForPartition.
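The new constructor shape can be sketched like this. The sketch is hypothetical: String stands in for Uuid and a plain map for MetadataImage, and the -1 / empty-set returns for unknown topics are an assumption, not the confirmed behavior.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the describer backed by the subscribed topic ids
// and the metadata image instead of a stored Map<Uuid, TopicMetadata>.
public class SubscribedTopicDescriberSketch {
    private final Set<String> subscribedTopicIds;
    private final Map<String, List<Set<String>>> image; // topicId -> racks per partition

    public SubscribedTopicDescriberSketch(Set<String> subscribedTopicIds,
                                          Map<String, List<Set<String>>> image) {
        this.subscribedTopicIds = subscribedTopicIds;
        this.image = image;
    }

    // Answer from the image, but only for subscribed topics.
    public int numPartitions(String topicId) {
        if (!subscribedTopicIds.contains(topicId)) return -1;
        List<Set<String>> partitions = image.get(topicId);
        return partitions == null ? -1 : partitions.size();
    }

    public Set<String> racksForPartition(String topicId, int partition) {
        if (!subscribedTopicIds.contains(topicId)) return Collections.emptySet();
        List<Set<String>> partitions = image.get(topicId);
        if (partitions == null || partition >= partitions.size()) {
            return Collections.emptySet();
        }
        return partitions.get(partition);
    }
}
```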

Compatibility, Deprecation, and Migration Plan

ConsumerGroupPartitionMetadataValue

This record will be deprecated. For compatibility, we keep the replay function so the coordinator can still read old data.

Upgrade

When a coordinator upgrades, it initializes from old records like the following. It is harmless for the coordinator to read ConsumerGroupPartitionMetadataValue, because the coordinator still keeps the replay function. When it reads the record, it creates a group if the value is non-null, or does nothing if the value is null. The new logic tries to read the MetadataHash field. Although the old ConsumerGroupMetadataValue record has no MetadataHash field, it is a tagged field with default value 0, so the coordinator initializes the group with 0. When the coordinator receives the first heartbeat, it calculates a new MetadataHash and triggers a group epoch bump without a rebalance, because the assignors are sticky and the topology of the group does not change. At that point, the coordinator also writes a tombstone ConsumerGroupPartitionMetadataValue, so the old data can be removed after log compaction.

ConsumerGroupMemberMetadataValue
ConsumerGroupPartitionMetadataValue
ConsumerGroupMetadataValue (no MetadataHash field)
ConsumerGroupTargetAssignmentMemberValue
ConsumerGroupTargetAssignmentMetadataValue
ConsumerGroupCurrentMemberAssignmentValue

Downgrade

When a coordinator downgrades, it initializes from new records like the following. The coordinator has no logic to handle MetadataHash, so the field is ignored. It initializes the group without subscription metadata. When the coordinator receives the first heartbeat, it calculates new subscription metadata and finds that the value is different. This triggers a group epoch bump without a rebalance, because the assignors are sticky and the topology of the group does not change.

ConsumerGroupMemberMetadataValue
ConsumerGroupMetadataValue (with MetadataHash tagged field)
ConsumerGroupTargetAssignmentMemberValue
ConsumerGroupTargetAssignmentMetadataValue
ConsumerGroupCurrentMemberAssignmentValue

Test Plan

  • Unit test for the topic hash function: the hash function should ignore partition rack order and return the same value when the rack set is unchanged.
  • Unit test for GroupMetadataManager: verify that only changes to the topic id, name, number of partitions, or partition racks trigger a rebalance; other data changes should keep the same assignment.
  • Integration test between coordinator and consumer: use the admin client to add topic partitions and check that the consumer group gets a new assignment; restart a broker to change racks and check that the consumer group gets a new assignment; subscribe a consumer group with a regex pattern, use the admin client to add a new topic matching the pattern, and check that the consumer group gets a new assignment.
  • Extend ConsumerGroupHeartbeatRequestTest to cover the new feature: adding a new partition or changing a rack can trigger a rebalance.

Rejected Alternatives

Check Topic Delta to Trigger a Rebalance

Check the changed parts, like TopicDelta#newPartitions, when receiving a new metadata image. If there is changed data, set a value in the group and trigger a rebalance on the next group heartbeat. This approach can reduce hash calculation and storage. However, the group coordinator is not always online. If the topic change happens while the coordinator is offline, the change is missed and the coordinator can't trigger a rebalance.

Add Epoch to TopicImage

A new metadata image is computed by the controller. An epoch can represent the version of a TopicImage. If the number of partitions or the partition racks change, the controller bumps the epoch. When the coordinator receives a new metadata image containing a new topic epoch, it triggers a rebalance for the group. The group can also store the topic epoch map in records, so it can detect differences if the coordinator restarts. This approach saves CPU and storage resources and avoids the downside of "Check Topic Delta to Trigger a Rebalance". However, it mixes group coordinator logic into the controller.

Bump Group Version

Increase the group.version feature level to 2 to enable rack-aware rebalancing. However, there are two drawbacks to implementing this:

  1. It requires maintaining many if-else blocks to determine whether to use the subscription metadata or the group hash.
  2. When changing the group.version to 2, it triggers an epoch bump for all groups.


