Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/l8ko353v3nn1blgymsty895x6c98oxlx

JIRA: KAFKA-17747

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848 introduces the next generation of the consumer rebalance protocol, which supports rack-aware partition assignment. In the first implementation, the group coordinator computes subscribed topic metadata that contains the topic UUID, name, number of partitions, and the rack set of each partition. When the metadata expires, the group coordinator computes new subscribed topic metadata and compares it with the current one. If they differ, the group coordinator bumps the group epoch and calculates a new target assignment, which means it triggers a rebalance. However, the rack set of each partition takes too much space. In KAFKA-17578, a real case of memory usage shows that in a group with 500 members and 2K topic partitions, partition racks account for 79% of the whole ConsumerGroup object. This KIP gets rid of the number of partitions and the rack set of each partition, replacing them with a subscribed topic hash. Each topic has a hash value. There are two server-side conditions that can trigger a new rebalance, so the topic hash needs to reflect them:

  • A new partition is added to a topic.
  • The racks of a topic partition change. Each topic partition has multiple replicas, and each replica is stored on a broker. The rack value comes from the broker.rack config. It is a read-only config and can only be changed when the broker restarts. If a broker stops, the related replicas are removed from the topic partitions. If a broker starts, they are added back to the topic partitions.

The following table compares the CPU / memory / disk usage of the different strategies:


Strategy | CPU | Memory | Disk
1. Subscription topic metadata with topic UUID, name, number of partitions, and rack set of each partition. | Low | High | High
2. Cache mechanism and subscription topic metadata with topic UUID, name, and hash. | Mid | Mid | Mid
3. A single hash to represent all subscribed topics per group. | High | Low | Low

The second strategy still uses a lot of disk space to store redundant data. For example, when a group subscribes to some topics, every time a subscribed topic changes, the group needs to store a new subscription topic metadata map with only a single entry changed. The third strategy wastes too much CPU, because it recalculates the hash whenever the metadata image expires.

In this KIP, we combine the second and third strategies. In the coordinator, there is a cache that stores the hash of each topic. When a topic changes, only that topic's hash is recalculated. Different groups can get topic hashes from the cache, so the coordinator doesn't need to calculate the same topic hash multiple times. In __consumer_offsets, the coordinator sums all subscribed topic hashes into a single hash per group and stores it on disk, so it doesn't use much disk space to represent the change.

Public Interfaces

ConsumerGroupPartitionMetadataKey / ConsumerGroupPartitionMetadataValue / ShareGroupPartitionMetadataKey / ShareGroupPartitionMetadataValue

These record types will be deleted, because the coordinator no longer stores topic metadata.

ConsumerGroupMetadataValue

Add a new field "MetadataHash". It is a tagged field for backward compatibility.

ConsumerGroupMetadataValue
{
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "default": 0, "taggedVersions": "0+", "tag": 0,
      "about": "The hash of all topics in the group." } <-- new field
  ]
}

ShareGroupMetadataValue

Add a new field "MetadataHash".

ShareGroupMetadataValue
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "about": "The hash of all topics in the group." } <-- new field
  ]
}

StreamsGroupMetadataValue

Add a new field "MetadataHash".

StreamsGroupMetadataValue
{
  "type": "data",
  "name": "StreamsGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32",
      "about": "The group epoch." },
    { "name": "MetadataHash", "versions": "0+", "type": "int64",
      "about": "The hash of all topics in the group." } <-- new field
  ]
}


Proposed Changes

Topic Hash Map Cache in Coordinator

Different groups may subscribe to the same topics. With a topic hash map cache in the coordinator, recalculating the same topic hash for different groups can be avoided. The topic hash value represents the topic id, name, number of partitions, and partition racks.

Add a new topic hash in the cache

When the coordinator initializes, the topic hash map cache is empty. After receiving the first consumer group heartbeat, the coordinator calculates the hashes of the subscribed topics, so it doesn't waste memory storing hashes of unsubscribed topics.

Renew a topic hash in the cache

When there is a new metadata image, its metadata delta contains the changed topics and the deleted topics. In both cases, the coordinator removes the topic hashes from the cache. In the next consumer group heartbeat, the values are recomputed and stored in the cache. This lazy evaluation is more efficient, because it guarantees that the coordinator only calculates topic hashes that are in use.
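
For illustration, a minimal sketch of this invalidate-then-lazily-recompute flow, assuming a simple map keyed by topic id (the class and method names are hypothetical, not the final implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataDelta;

// Hypothetical cache sketch; names and structure are assumptions, not the final implementation.
public class TopicHashCache {
    private final Map<Uuid, Long> topicHashes = new ConcurrentHashMap<>();

    // Called when the coordinator receives a new metadata image: drop the hashes of
    // changed and deleted topics so they are lazily recomputed on the next heartbeat.
    public void onNewMetadataImage(MetadataDelta delta) {
        if (delta.topicsDelta() != null) {
            delta.topicsDelta().changedTopics().keySet().forEach(topicHashes::remove);
            delta.topicsDelta().deletedTopicIds().forEach(topicHashes::remove);
        }
    }

    // Called on a group heartbeat: compute the hash only for a subscribed topic
    // that is not yet cached (lazy evaluation).
    public long getOrCompute(Uuid topicId, Function<Uuid, Long> topicHashFunction) {
        return topicHashes.computeIfAbsent(topicId, topicHashFunction);
    }
}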

Example: a new partition in a topic

When a new partition is added to a topic, the coordinator receives a new MetadataDelta that contains the topic in TopicsDelta#changedTopics. The coordinator removes the cached topic hash. In the next group heartbeat, the coordinator detects that the topic hash is missing and recalculates it.

Example: the racks of a partition change

There are two cases of rack change. One is that users alter a partition reassignment. The other is a "broker.rack" configuration change; the configuration is read-only and only takes effect after the broker reboots. In both cases, if the replica is in the ISR on a reassigned/rebooted broker, the change is reflected in PartitionRegistration#isr, which is also included in TopicDelta#partitionChanges. The renewal process is similar to the one above: a new MetadataDelta contains the topic in TopicsDelta#changedTopics, and the coordinator removes the topic hash after it receives the change. In the next group heartbeat, the coordinator recomputes the value.

The PartitionRegistration also contains other changes, like leader / ELR. Every such change triggers a topic hash recomputation. This is harmless, because the hash only reflects changes to the uuid / name / number of partitions / partition racks. If none of these fields change, the resulting hash is the same, so the group epoch is not bumped and no rebalance is triggered.

All subscribed topic hash in Group

The group doesn't need to store subscribed topic metadata, because the coordinator doesn't need that value to detect a rebalance. After this KIP, the group gets the hashes of all subscribed topics from the cache in the coordinator and sums them into a single hash. The final hash is stored with a new group epoch in ConsumerGroupMetadataValue / ShareGroupMetadataValue.
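
For illustration, a minimal sketch of how the per-group hash could be derived from the cached per-topic hashes, assuming the simple summation described above (the class and method names are hypothetical):

import java.util.Collection;

// Hypothetical sketch: combines the cached hashes of a group's subscribed topics into
// the single MetadataHash persisted in ConsumerGroupMetadataValue / ShareGroupMetadataValue.
public final class GroupMetadataHash {
    public static long combine(Collection<Long> subscribedTopicHashes) {
        long metadataHash = 0;
        for (long topicHash : subscribedTopicHashes) {
            // Summation keeps the result independent of the subscription iteration order.
            metadataHash += topicHash;
        }
        return metadataHash;
    }
}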

Topic Hash Function

A topic hash represents the topic id, name, number of partitions, and partition racks. To avoid useless rebalances, the hash function must return the same value for the same data, even if it runs on different JDKs or the partition racks are in a different order. For JDK independence, the KIP uses Murmur3 to compute the hash value. For partition racks in a different order, we compute a hash for each value and sum the results. We also set the first byte as a magic byte representing the hash version.

public static long topicHash(String topicName, int numPartitions, Map<Integer, List<String>> partitionIdToSortedRacks)
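
A minimal sketch of such a function, using Guava's Murmur3 implementation as a stand-in (the exact Murmur3 library, the way the magic byte and topic fields are fed into the hasher, and the rack summation are assumptions based on the description above):

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public final class TopicHashExample {
    // First byte fed into the hash encodes the hash version (assumed encoding).
    private static final byte HASH_VERSION_MAGIC = 0;
    private static final HashFunction MURMUR3 = Hashing.murmur3_128();

    public static long topicHash(String topicName, int numPartitions,
                                 Map<Integer, List<String>> partitionIdToSortedRacks) {
        Hasher hasher = MURMUR3.newHasher()
            .putByte(HASH_VERSION_MAGIC)
            .putString(topicName, StandardCharsets.UTF_8)   // the topic id could be fed in here as well
            .putInt(numPartitions);

        // Hash each partition's (sorted) rack list separately and sum the results,
        // so the overall hash does not depend on map iteration order.
        long rackHashSum = 0;
        for (Map.Entry<Integer, List<String>> entry : partitionIdToSortedRacks.entrySet()) {
            Hasher rackHasher = MURMUR3.newHasher().putInt(entry.getKey());
            for (String rack : entry.getValue()) {
                rackHasher.putString(rack, StandardCharsets.UTF_8);
            }
            rackHashSum += rackHasher.hash().asLong();
        }
        return hasher.putLong(rackHashSum).hash().asLong();
    }
}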


SubscribedTopicDescriberImpl

The group no longer stores TopicMetadata, so SubscribedTopicDescriberImpl can't rely on it to return numPartitions and racksForPartition. This KIP changes the constructor arguments from Map<Uuid, TopicMetadata> to a Set<Uuid> and a MetadataImage, so it can use the Set<Uuid> to check which topics are subscribed and the MetadataImage to return numPartitions and racksForPartition.
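
For illustration, the reworked describer could look roughly like the sketch below. It is a non-authoritative outline that would implement the SubscribedTopicDescriber interface (its import is omitted); the exact MetadataImage accessors and fallback values are assumptions.

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;

// Hypothetical sketch of SubscribedTopicDescriberImpl after this KIP: it resolves partition
// counts and racks from the MetadataImage instead of a per-group Map<Uuid, TopicMetadata>.
public class SubscribedTopicDescriberImpl {
    private final Set<Uuid> subscribedTopicIds;
    private final MetadataImage metadataImage;

    public SubscribedTopicDescriberImpl(Set<Uuid> subscribedTopicIds, MetadataImage metadataImage) {
        this.subscribedTopicIds = subscribedTopicIds;
        this.metadataImage = metadataImage;
    }

    public int numPartitions(Uuid topicId) {
        if (!subscribedTopicIds.contains(topicId)) return -1;
        TopicImage topic = metadataImage.topics().getTopic(topicId);
        return topic == null ? -1 : topic.partitions().size();
    }

    public Set<String> racksForPartition(Uuid topicId, int partition) {
        if (!subscribedTopicIds.contains(topicId)) return Collections.emptySet();
        TopicImage topic = metadataImage.topics().getTopic(topicId);
        if (topic == null) return Collections.emptySet();
        PartitionRegistration registration = topic.partitions().get(partition);
        if (registration == null) return Collections.emptySet();
        // Map each replica's broker id to its rack, skipping brokers without a rack.
        return Arrays.stream(registration.replicas)
            .mapToObj(replicaId -> metadataImage.cluster().broker(replicaId))
            .filter(broker -> broker != null && broker.rack().isPresent())
            .map(broker -> broker.rack().get())
            .collect(Collectors.toSet());
    }
}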

Compatibility, Deprecation, and Migration Plan

ConsumerGroupPartitionMetadataValue / ShareGroupPartitionMetadataValue

These two records will be deprecated. For compatibility, we should keep the GroupMetadataManager#replay functions, so the coordinator can read old data.

Upgrade

When a coordinator upgrades, it initializes with old records like the following. It's harmless for the coordinator to read ConsumerGroupPartitionMetadataValue, because the coordinator still keeps the replay function: when it reads the record, it creates a group if the value is not null or does nothing if the value is null. The new logic tries to read the MetadataHash field. Although the old ConsumerGroupMetadataValue record has no MetadataHash field, it is a tagged field with default value 0, so the coordinator initializes the group with 0. When the coordinator receives the first heartbeat, it calculates a new MetadataHash and finds that the value is different. This triggers a group epoch bump without a rebalance, because assignors are sticky. The topology of the group will not change.

ConsumerGroupMemberMetadataValue
ConsumerGroupPartitionMetadataValue
ConsumerGroupMetadataValue (no MetadataHash field)
ConsumerGroupTargetAssignmentMemberValue
ConsumerGroupTargetAssignmentMetadataValue
ConsumerGroupCurrentMemberAssignmentValue

Downgrade

When a coordinator downgrades, it initializes with new records like the following. The coordinator doesn't have logic to handle MetadataHash, so the field is ignored, and it initializes the group without subscription metadata. When the coordinator receives the first heartbeat, it calculates new subscription metadata and finds that the value is different. This triggers a group epoch bump without a rebalance, because assignors are sticky. The topology of the group will not change.

ConsumerGroupMemberMetadataValue
ConsumerGroupMetadataValue (with MetadataHash tagged field)
ConsumerGroupTargetAssignmentMemberValue
ConsumerGroupTargetAssignmentMetadataValue
ConsumerGroupCurrentMemberAssignmentValue

Test Plan

  • Unit test for the topic hash function. The hash function should ignore the order of partition racks and return the same value if the sets are identical (see the sketch after this list).
  • Unit test for GroupMetadataManager. Verify that only changes to the topic id, name, number of partitions, or partition racks trigger a rebalance. Other data changes should keep the same assignment.
  • Integration test between the coordinator and the consumer. Use the admin client to update topic partitions and check that the consumer group gets a new assignment. Restart a broker to change a rack and check that the consumer group gets a new assignment. Use a consumer group to subscribe with a regex pattern, use the admin client to add a new topic matching the pattern, and check that the consumer group gets a new assignment.
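
A minimal sketch of the first unit test, written against the hypothetical TopicHashExample#topicHash sketch above (test class and names are illustrative):

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

// Illustrative test: the same topic data must hash to the same value regardless
// of the iteration order of the partition-to-racks map.
public class TopicHashExampleTest {

    @Test
    public void testHashIgnoresPartitionIterationOrder() {
        Map<Integer, List<String>> ordered = new LinkedHashMap<>();
        ordered.put(0, List.of("rack-a", "rack-b"));
        ordered.put(1, List.of("rack-b", "rack-c"));

        Map<Integer, List<String>> reversed = new LinkedHashMap<>();
        reversed.put(1, List.of("rack-b", "rack-c"));
        reversed.put(0, List.of("rack-a", "rack-b"));

        // Same content, different insertion (and thus iteration) order: hashes must match.
        assertEquals(
            TopicHashExample.topicHash("foo", 2, ordered),
            TopicHashExample.topicHash("foo", 2, reversed));
    }
}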

Rejected Alternatives

Check Topic Delta to Trigger a Rebalance

Check the changed parts, like TopicDelta#newPartitions, when receiving a new metadata image. If there is changed data, set a value in the group and trigger a rebalance in the next group heartbeat. This approach can reduce hash calculation and storage. However, the group coordinator is not always online. If the topic change does not happen while the coordinator is online, the change is missed and the coordinator can't trigger a rebalance.

Add Epoch to TopicImage

A new metadata image is computed by the controller. An epoch can represent the version of a TopicImage. If the number of partitions or the partition racks change, the controller bumps the epoch. When the coordinator receives a new metadata image containing a new topic epoch, it triggers a rebalance for the group. The group can also store the topic epoch map in records, so it can detect the difference if the coordinator restarts. This approach saves CPU and storage resources and avoids the downside of "Check Topic Delta to Trigger a Rebalance". However, it mixes group coordinator logic into the controller.

