...
- A member joins or leaves the group.
- A member updates its subscriptions.
- A member is removed from the group by the group coordinator.
- The partition metadata is updated, such as when a new partition is added or a topic is created or deleted, or when the rack ID changes.
AdminClient.alterShareGroupOffsets
is used to set the SPSO.
In all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. The new version of the group metadata signals that a new assignment is required for the group.
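The rule above can be pictured with a very small model. This is only a sketch under stated assumptions, not the group coordinator's implementation: the `GroupEvent` enum and the `GroupEpochSketch` class are hypothetical names introduced for illustration. The only behaviour taken from the text is that each listed event yields new group metadata with an incremented group epoch.

```java
// Hypothetical names for the events listed above; illustrative only.
enum GroupEvent {
    MEMBER_JOINED,
    MEMBER_LEFT,
    SUBSCRIPTIONS_UPDATED,
    MEMBER_REMOVED_BY_COORDINATOR,
    PARTITION_METADATA_UPDATED
}

final class GroupEpochSketch {
    private int groupEpoch;

    GroupEpochSketch(int initialEpoch) {
        this.groupEpoch = initialEpoch;
    }

    // Each listed event produces a new version of the group metadata,
    // which this model represents by bumping the group epoch by one.
    int onEvent(GroupEvent event) {
        groupEpoch++;
        return groupEpoch;
    }

    // An assignment computed for an older group epoch is stale,
    // so a new assignment is required for the group.
    boolean needsNewAssignment(int assignmentEpoch) {
        return assignmentEpoch < groupEpoch;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```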
...
There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.
A table containing the ACLs for the new RPCs can be found in the section on changes to the Kafka protocol below.
Managing durable share-partition state
...
Type | Key | Value |
---|---|---|
ShareSnapshot | | |
ShareUpdate | | |
...
Operation | State changes | Cumulative state | WriteShareGroupState request |
---|---|---|---|
Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 | |
In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | | | |
Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) | |
Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 | |
With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records | | | |
Fetch records 110-119 (consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119) | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) | |
Release 110 (consumer 1) | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) | |
Acknowledge 119 (consumer 3) | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged | |
Fetch records 110, 120 (consumer 1) | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
Lock timeout elapsed 111, 112 (consumer 1's records) | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 113-118 (consumer 2) | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Fetch records 111, 112 (consumer 3) | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 110 (consumer 1) | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 111, 112 (consumer 3) | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) | |
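The SPSO and SPEO movements in the table can be checked with a small in-memory model. The sketch below is illustrative only and makes several assumptions: `SharePartitionSketch` and its methods are hypothetical names, records are tracked one offset at a time rather than in batches, fetches are assumed to be contiguous with the SPEO, and a lock timeout is modelled as a release. It is not how the broker stores this state.

```java
import java.util.TreeMap;

// Hypothetical in-memory model of a single share-partition, for illustration only.
final class SharePartitionSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    private static final class InFlight {
        State state;
        int deliveryCount;

        InFlight(State state, int deliveryCount) {
            this.state = state;
            this.deliveryCount = deliveryCount;
        }
    }

    // Records from the SPSO (inclusive) to the SPEO (exclusive), keyed by offset.
    private final TreeMap<Long, InFlight> inFlight = new TreeMap<>();
    private long spso;
    private long speo;

    SharePartitionSketch(long startOffset) {
        this.spso = startOffset;
        this.speo = startOffset;
    }

    // Fetch: new records are acquired with delivery count 1 and extend the SPEO;
    // available records are re-acquired with an incremented delivery count.
    void fetch(long first, long last) {
        for (long offset = Math.max(first, spso); offset <= last; offset++) {
            InFlight rec = inFlight.get(offset);
            if (rec == null) {
                inFlight.put(offset, new InFlight(State.ACQUIRED, 1));
                speo = Math.max(speo, offset + 1);
            } else if (rec.state == State.AVAILABLE) {
                rec.state = State.ACQUIRED;
                rec.deliveryCount++;
            }
        }
    }

    // Release (or lock timeout): acquired records become available again.
    void release(long first, long last) {
        for (long offset = first; offset <= last; offset++) {
            InFlight rec = inFlight.get(offset);
            if (rec != null && rec.state == State.ACQUIRED) {
                rec.state = State.AVAILABLE;
            }
        }
    }

    // Acknowledge: mark records acknowledged, then move the SPSO past the
    // leading run of acknowledged records.
    void acknowledge(long first, long last) {
        for (long offset = first; offset <= last; offset++) {
            InFlight rec = inFlight.get(offset);
            if (rec != null && rec.state == State.ACQUIRED) {
                rec.state = State.ACKNOWLEDGED;
            }
        }
        while (!inFlight.isEmpty()
                && inFlight.firstKey() == spso
                && inFlight.firstEntry().getValue().state == State.ACKNOWLEDGED) {
            inFlight.pollFirstEntry();
            spso++;
        }
    }

    long spso() { return spso; }

    long speo() { return speo; }

    // Returns the delivery count of an in-flight record, or -1 if it is not tracked.
    int deliveryCount(long offset) {
        InFlight rec = inFlight.get(offset);
        return rec == null ? -1 : rec.deliveryCount;
    }
}
```

Replaying the operations from the table in order (fetch 100-109, acknowledge 100-109, fetch 110-119, and so on) against this model ends with SPSO=120 and SPEO=121, matching the final row.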
...
Configuration | Description | Values |
---|---|---|
group.share.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. Will become true in a future release. This is an internal configuration and will be removed once this feature is complete. |
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. (Existing configuration) | "share" will be added to the default value of this configuration once this feature is complete. |
group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
group.share.record.lock.duration.max.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour) |
group.share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
group.share.session.timeout.ms | The timeout to detect client failures when using the group protocol. | Default 45000 (45 seconds) |
group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100 |
group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000 |
group.share.assignors | The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used. | A list of class names. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor" |
group.share.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50 |
group.share.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
group.share.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 |
group.share.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1 |
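As a reading aid for the table, the following sketch checks a proposed value against the documented bounds. It is a minimal illustration under stated assumptions: `ShareGroupConfigRanges` and `isValid` are hypothetical names, not part of the broker, and only rows that state both a minimum and a maximum are included.

```java
import java.util.Map;

// Hypothetical helper; the bounds are copied from the configuration table above.
final class ShareGroupConfigRanges {
    static final class Range {
        final long min;
        final long max;
        final long defaultValue;

        Range(long min, long max, long defaultValue) {
            this.min = min;
            this.max = max;
            this.defaultValue = defaultValue;
        }

        boolean contains(long value) {
            return value >= min && value <= max;
        }
    }

    // Only the rows that give both a minimum and a maximum.
    static final Map<String, Range> RANGES = Map.of(
        "group.share.delivery.count.limit", new Range(2, 10, 5),
        "group.share.record.lock.duration.ms", new Range(1000, 60000, 30000),
        "group.share.record.lock.duration.max.ms", new Range(1000, 3600000, 60000),
        "group.share.record.lock.partition.limit", new Range(100, 10000, 200),
        "group.share.max.groups", new Range(1, 100, 10),
        "group.share.max.size", new Range(10, 1000, 200));

    // Returns true if the value lies within the documented bounds (inclusive).
    static boolean isValid(String name, long value) {
        Range range = RANGES.get(name);
        if (range == null) {
            throw new IllegalArgumentException("No documented range for " + name);
        }
        return range.contains(value);
    }
}
```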
...
- auto.offset.reset: this is handled by a dynamic group configuration group.share.auto.offset.reset
- enable.auto.commit and auto.commit.interval.ms: share groups do not support auto-commit
- group.instance.id: this concept is not supported by share groups
- isolation.level: this is handled by a dynamic group configuration group.share.isolation.level
- partition.assignment.strategy: share groups do not support client-side partition assignors
- interceptor.classes: interceptors are not supported
- protocol.type: this configuration is used to select the group protocol used for KafkaConsumer
- session.timeout.ms: this is deprecated in KIP-848 and is not supported for share groups
- heartbeat.interval.ms: this is deprecated in KIP-848 and is not supported for share groups
Kafka protocol changes
This KIP introduces the following new APIs:
...
- FindCoordinator - for finding coordinators, to support share coordinators
- ListGroups - for listing groups, to support listing share groups
...
Access control
The following table gives the ACLs required for the new APIs.
RPC | Operation | Resource |
---|---|---|
ShareGroupHeartbeat | Read | Group |
ShareGroupDescribe | Describe | Group |
ShareFetch | Read, Read | Group, Topic |
ShareAcknowledge | Read, Read | Group, Topic |
AlterShareGroupOffsets | Read | Group |
DeleteShareGroupOffsets | Read | Group |
DescribeShareGroupOffsets | Describe | Group |
InitializeShareGroupState | ClusterAction | Cluster |
ReadShareGroupState | ClusterAction | Cluster |
WriteShareGroupState | ClusterAction | Cluster |
DeleteShareGroupState | ClusterAction | Cluster |
ReadShareGroupStateOffsets | ClusterAction | Cluster |
Error codes
This KIP adds the following error codes to the Kafka protocol.
- INVALID_RECORD_STATE - The record state is invalid. The acknowledgement of delivery could not be completed.
- SHARE_SESSION_NOT_FOUND - The share session is not found.
- INVALID_SHARE_SESSION_EPOCH - The share session epoch is invalid.
- FENCED_STATE_EPOCH - The share coordinator rejected the request because the share-group state epoch did not match.
...
For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.
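The ordering rule above can be expressed as a short check. This is a sketch, not the broker's validation code: `AckBatchValidator` and `Batch` are hypothetical names, and rejecting a batch whose last offset is below its first offset is an added assumption. Because the last offset is inclusive, each batch must start strictly after the previous batch ends.

```java
import java.util.List;

// Hypothetical validator for the acknowledgement batches of one topic-partition.
final class AckBatchValidator {
    static final class Batch {
        final long firstOffset;
        final long lastOffset; // inclusive

        Batch(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns true if the batches are in ascending order of first offset
    // and no two offset ranges overlap.
    static boolean isValid(List<Batch> batches) {
        boolean seenAny = false;
        long previousLast = 0;
        for (Batch batch : batches) {
            // Assumption: an inverted range is treated as invalid.
            if (batch.lastOffset < batch.firstOffset) {
                return false;
            }
            // The next range must begin after the previous inclusive end.
            if (seenAny && batch.firstOffset <= previousLast) {
                return false;
            }
            previousLast = batch.lastOffset;
            seenAny = true;
        }
        return true;
    }
}
```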
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ShareFetchRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The member ID." }, { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+", "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "MaxWaitMs", "type": "int32", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, { "name": "MinBytes", "type": "int32", "versions": "0+", "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." 
}, { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "First offset of batch of records to acknowledge."}, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "Last offset (inclusive) of batch of records to acknowledge."}, { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+", "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]}, { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false, "about": "The partitions to remove from this share session.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partition indexes to forget." } ]} ] } |
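The ShareSessionEpoch field in the request above is described as 0 to open a share session, -1 to close it, and otherwise incrementing for consecutive requests. A client-side sketch of that progression might look like the following. The class and method names are hypothetical, and the wrap-around from `Integer.MAX_VALUE` back to 1 (so that the reserved values 0 and -1 are never reused) is an assumption that this section does not state.

```java
// Hypothetical helper illustrating how a share session epoch could advance.
final class ShareSessionEpochSketch {
    static final int INITIAL_EPOCH = 0; // opens a share session
    static final int FINAL_EPOCH = -1;  // closes a share session

    // Returns the epoch to send on the next request of the same session.
    static int nextEpoch(int currentEpoch) {
        if (currentEpoch < 0) {
            // A closed session stays closed.
            return FINAL_EPOCH;
        }
        if (currentEpoch == Integer.MAX_VALUE) {
            // Assumption: wrap to 1 rather than overflowing or reusing 0.
            return 1;
        }
        return currentEpoch + 1;
    }
}
```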
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ShareFetchResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors for ErrorCode and AcknowledgeErrorCode: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - SHARE_SESSION_NOT_FOUND (version 0+) // - INVALID_SHARE_SESSION_EPOCH (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - NOT_LEADER_OR_FOLLOWER (version 0+) // - UNKNOWN_TOPIC_ID (version 0+) // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode // - KAFKA_STORAGE_ERROR (version 0+) // - CORRUPT_MESSAGE (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true, "about": "The top-level response error code." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The topic partitions.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The fetch error code, or 0 if there was no fetch error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The fetch error message, or null if there was no fetch error." }, { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+", "about": "The acknowledge error code, or 0 if there was no acknowledge error." }, { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The acknowledge error message, or null if there was no acknowledge error." }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch." } ]}, { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}, { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [ {"name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."}, {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."}, {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."} ]} ]} ]}, { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [ { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname." 
}, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port." }, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The rack of the node, or null if it has not been assigned to a rack." } ]} ] } |
...
For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ShareAcknowledgeRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The member ID." }, { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+", "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+", "about": "The topics containing records to acknowledge.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+", "about": "The partitions containing records to acknowledge.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "First offset of batch of records to acknowledge."}, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "Last offset (inclusive) of batch of records to acknowledge."}, { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+", "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]} ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]ReadStateResult", "versions": "0+", "about": "The read results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, which can be -1 if it is not yet initialized." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archived." }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ]} ]} ] } |
...
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "WriteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about":"The group identifier." }, { "name": "Topics", "type": "[]WriteStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch for this share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being written." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archived" }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ]} ]} ] } |
...
Code Block |
---|
{ "type": "data", "name": "ShareSnapshotValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "SnapshotEpoch", "type": "uint16", "versions": "0", "about": "The snapshot epoch." }, { "name": "StartOffset", "type": "int64", "versions": "0", "about": "The share-partition start offset." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0", "about": "The delivery state - 0:Available,2:Acked,4:Archived" }, { "name": "DeliveryCount", "type": "int16", "versions": "0", "about": "The delivery count." } ]} ] } |
...
Code Block |
---|
{ "type": "data", "name": "ShareUpdateValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "SnapshotEpoch", "type": "uint16", "versions": "0", "about": "The snapshot epoch." }, { "name": "StartOffset", "type": "int64", "versions": "0", "about": "The share-partition start offset, or -1 if the start offset is not being updated." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0", "about": "The delivery state - 0:Available,2:Acked,4:Archived" }, { "name": "DeliveryCount", "type": "int16", "versions": "0", "about": "The delivery count." } ]} ] } |
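To show how a snapshot record and the update records that follow it could combine into the current share-partition state, here is a minimal replay sketch. It rests on assumptions that this section does not spell out: updates are overlaid on the snapshot one offset at a time, a StartOffset of -1 in an update leaves the start offset unchanged (as the field description says), and state below the start offset is discarded. `ShareStateReplaySketch` and its members are hypothetical names, and snapshot epoch checking is omitted.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical replay of share-partition state records; illustrative only.
final class ShareStateReplaySketch {
    static final class StateBatch {
        final long firstOffset;
        final long lastOffset;
        final byte deliveryState;   // 0:Available, 2:Acked, 4:Archived
        final short deliveryCount;

        StateBatch(long firstOffset, long lastOffset, int deliveryState, int deliveryCount) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.deliveryState = (byte) deliveryState;
            this.deliveryCount = (short) deliveryCount;
        }
    }

    private long startOffset = -1;
    // For each offset, the most recent batch that covered it.
    private final TreeMap<Long, StateBatch> byOffset = new TreeMap<>();

    // A snapshot replaces all previously accumulated state.
    void applySnapshot(long snapshotStartOffset, List<StateBatch> batches) {
        startOffset = snapshotStartOffset;
        byOffset.clear();
        overlay(batches);
    }

    // An update is applied on top of the current state.
    void applyUpdate(long updateStartOffset, List<StateBatch> batches) {
        if (updateStartOffset != -1) {
            startOffset = updateStartOffset;
        }
        overlay(batches);
    }

    private void overlay(List<StateBatch> batches) {
        for (StateBatch batch : batches) {
            for (long offset = batch.firstOffset; offset <= batch.lastOffset; offset++) {
                byOffset.put(offset, batch);
            }
        }
        // Assumption: state below the start offset is no longer needed.
        byOffset.headMap(startOffset, false).clear();
    }

    long startOffset() { return startOffset; }

    // Returns the delivery state for an offset, or -1 if no state is held for it.
    int deliveryState(long offset) {
        StateBatch batch = byOffset.get(offset);
        return batch == null ? -1 : batch.deliveryState;
    }
}
```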
...
Compatibility, Deprecation, and Migration Plan
Kafka Broker Migration
As is customary for large KIPs, this KIP will be delivered into Apache Kafka progressively, starting with an Early Access release, and then moving through Preview, and finally General Availability.
Early access and Preview
At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.
A temporary configuration group.share.enable is used to turn on the feature. There is no support for upgrade or downgrade.
General availability
This KIP builds upon KIP-848 and the new group coordinator.
To upgrade a cluster, it is first necessary to perform a rolling upgrade of the cluster to a software version which supports share groups. Then, the new protocol is enabled by setting a group.version which supports it using the kafka-features.sh tool. Finally, the group.coordinator.rebalance.protocols configuration is changed to add "share" to the list of enabled rebalance protocols.
This KIP builds upon KIP-848 which introduced the new group coordinator and the new records for the __consumer_offsets topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.
Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. This KIP adds new records to the __consumer_offsets topic which will not be understood by the group coordinator. The group coordinator will ignore these records. The __share_group_state topic will be unused and can be manually deleted.
Test Plan
The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing both to understand the performance of share groups, and also to understand the impact on brokers with this new feature.
...