...
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of the initial ShareGroupHeartbeat request for the group. a) The group coordinator serves the ShareGroupHeartbeat request, writing a ShareGroupMetadata record to the __consumer_offsets topic. b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. c) The share coordinator serves the InitializeShareGroupState request, writing a ShareSnapshot record to the __share_group_state topic. d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets topic. |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group. |
List share groups | Group coordinator | |
Describe share group | Group coordinator | |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required. |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. |
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the group's state. It then sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Finally, the group coordinator writes tombstone records to the __consumer_offsets topic to complete the deletion of the group. |
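The assignment flow in the table above can be sketched as a small simulation. The class and method names below are illustrative, not Kafka code; only the ordering of writes between the two coordinators follows the table (metadata record, InitializeShareGroupState, ShareSnapshot, updated metadata record).

```python
# Illustrative simulation of the two-stage initialization handshake between
# the group coordinator and the share coordinator. All names are hypothetical.

class ShareCoordinator:
    def __init__(self):
        self.share_group_state = []  # stands in for the __share_group_state topic

    def initialize_share_group_state(self, group_id, topic_partition):
        # Serve InitializeShareGroupState by writing a ShareSnapshot record.
        self.share_group_state.append(("ShareSnapshot", group_id, topic_partition))
        return "OK"

class GroupCoordinator:
    def __init__(self, share_coordinator):
        self.consumer_offsets = []   # stands in for the __consumer_offsets topic
        self.share_coordinator = share_coordinator

    def assign_first_time(self, group_id, topic_partition):
        # 1. Record that initialization of this share-partition is pending.
        self.consumer_offsets.append(
            ("ShareGroupPartitionMetadata", group_id, "initializing"))
        # 2. Ask the share coordinator to persist the initial state.
        resp = self.share_coordinator.initialize_share_group_state(
            group_id, topic_partition)
        # 3. Only after a successful response is the partition assignable.
        if resp == "OK":
            self.consumer_offsets.append(
                ("ShareGroupPartitionMetadata", group_id, "initialized"))
            return True
        return False

sc = ShareCoordinator()
gc = GroupCoordinator(sc)
assert gc.assign_first_time("G1", ("T", 0))
# Two metadata records on __consumer_offsets, one snapshot in between.
assert len(gc.consumer_offsets) == 2
assert len(sc.share_group_state) == 1
```

The point of the two metadata writes is crash safety: if the group coordinator fails between steps 1 and 3, the pending entry tells its successor to retry the initialization.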
...
Configuration | Description | Values |
---|---|---|
group.share.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. This is an internal configuration and will be removed once this feature is complete. |
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. (Existing configuration) | "share" will be added to the default value of this configuration once this feature is complete. |
group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
group.share.max.record.lock.duration.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)
group.share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
group.share.session.timeout.ms | The timeout to detect client failures when using the share group protocol. | Default 45000 (45 seconds)
group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100 |
group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000 |
group.share.assignors | The server-side assignors as a list of full class names. In the initial delivery, the list must contain only a single entry, which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors. | A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor" |
group.share.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50 |
group.share.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
group.share.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 |
group.share.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1 |
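To illustrate one of the configurations above, group.share.delivery.count.limit caps redelivery: once a record has been delivered that many times without being acknowledged, it is archived rather than redelivered. The following sketch is illustrative only (the loop and names are hypothetical, not broker code), assuming the default limit of 5.

```python
# Illustrative redelivery loop honoring group.share.delivery.count.limit
# (default 5, minimum 2, maximum 10). Delivery state names follow the
# ShareSnapshot record schema: a record that exhausts its delivery
# attempts moves from Available to Archived.

DELIVERY_COUNT_LIMIT = 5  # group.share.delivery.count.limit default

def redeliver_until_limit(limit=DELIVERY_COUNT_LIMIT):
    delivery_count = 0
    state = "Available"
    while state == "Available":
        delivery_count += 1       # record acquired and delivered once more
        # Simulate the consumer failing to acknowledge every delivery:
        if delivery_count >= limit:
            state = "Archived"    # give up: archive the record
    return state, delivery_count

assert redeliver_until_limit() == ("Archived", 5)
```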
...
Configuration | Description | Values |
---|---|---|
group.share.isolation.level | Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", all records will be returned, even transactional records which have been aborted. | Valid values "read_committed" and "read_uncommitted" (default) |
group.share.auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server: "earliest" automatically resets the offset to the earliest offset, and "latest" automatically resets it to the latest offset. | Valid values "latest" (default) and "earliest" |
group.share.record.lock.duration.ms | Record acquisition lock duration in milliseconds. | If not specified, uses the cluster's broker configuration. If specified, minimum 1000, maximum limited by the cluster's broker configuration |
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | If not specified, uses the broker configuration. If specified, minimum limited by the broker configuration |
group.share.session.timeout.ms | The timeout to detect client failures when using the share group protocol. | If not specified, uses the broker configuration. If specified, minimum limited by the broker configuration |
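The group-level configurations above all resolve the same way: absent a group value, the broker value applies; a group value is accepted only within broker-enforced limits. A sketch of that resolution, using group.share.record.lock.duration.ms as the example (the helper and constant names are illustrative, and the defaults are taken from the broker configuration table):

```python
# Illustrative resolution of a group-level override against broker limits.
# Defaults mirror the broker configuration table:
#   group.share.record.lock.duration.ms      default 30000
#   group.share.max.record.lock.duration.ms  default 60000

BROKER_DEFAULT_LOCK_MS = 30000
BROKER_MAX_LOCK_MS = 60000

def effective_lock_duration(group_value=None):
    if group_value is None:
        return BROKER_DEFAULT_LOCK_MS           # broker configuration applies
    if not 1000 <= group_value <= BROKER_MAX_LOCK_MS:
        # group override outside the broker-enforced [min, max] range
        raise ValueError("group.share.record.lock.duration.ms out of range")
    return group_value

assert effective_lock_duration() == 30000       # unset: broker default
assert effective_lock_duration(20000) == 20000  # valid override
```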
Consumer configuration
The existing consumer configurations apply for share groups with the following exceptions:
...
For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.
There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
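Because membership is materialized by replaying these records from a compacted topic, a tombstone (a key with a null value) removes a member on replay. A minimal replay sketch with illustrative record shapes (the function and field names are hypothetical):

```python
# Illustrative replay of ShareGroupMemberMetadata records from a compacted
# topic. A record value of None stands in for a tombstone, written when a
# member leaves the group.

def replay_members(records):
    members = {}
    for member_id, value in records:
        if value is None:
            members.pop(member_id, None)   # tombstone: member left
        else:
            members[member_id] = value     # upsert member metadata
    return members

log = [
    ("m1", {"rack": "a"}),
    ("m2", {"rack": "b"}),
    ("m1", None),                          # m1 left the group
]
assert replay_members(log) == {"m2": {"rack": "b"}}
```

This is why membership survives a group coordinator change: the new coordinator rebuilds the same map from the log.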
...
```json
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}
```
...
```json
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata" },
    { "name": "InitializingTopics", "versions": "0+", "type": "[]InitializingTopicMetadata" },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicMetadata" }
  ],
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]},
    { "name": "InitializingTopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "StartPartitionIndex", "type": "int32", "versions": "0+",
        "about": "The start partition index of the partitions being initialized." },
      { "name": "EndPartitionIndex", "type": "int32", "versions": "0+",
        "about": "The end partition index of the partitions being initialized." }
    ]}
  ]
}
```
The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When partitions are being initialized, StartPartitionIndex is the index of the first partition being initialized and EndPartitionIndex is the index of the last partition being initialized, such as when the number of partitions for a topic is increased. Once the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the InitializedTopics field. A topic can of course be in both the InitializedTopics and the InitializingTopics fields when its number of partitions has been increased.

In a similar way, the DeletingTopics field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.
...
```json
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]}
  ]
}
```
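The StateBatches in a snapshot describe ranges of offsets at or above StartOffset whose delivery state differs from the default. A sketch that derives per-offset state from a snapshot, assuming offsets not covered by any batch default to Available (the dict layout and helper name are illustrative; delivery state codes are from the schema above):

```python
# Illustrative interpretation of a ShareSnapshot: offsets at or above
# StartOffset default to Available (0) unless a StateBatch overrides them.
# Delivery state codes from the schema: 0=Available, 2=Acked, 4=Archived.

snapshot = {
    "StartOffset": 100,
    "StateBatches": [
        {"FirstOffset": 100, "LastOffset": 101, "DeliveryState": 2, "DeliveryCount": 1},
        {"FirstOffset": 104, "LastOffset": 104, "DeliveryState": 4, "DeliveryCount": 5},
    ],
}

def delivery_state(snapshot, offset):
    if offset < snapshot["StartOffset"]:
        raise ValueError("offset below the share-partition start offset")
    for batch in snapshot["StateBatches"]:
        if batch["FirstOffset"] <= offset <= batch["LastOffset"]:
            return batch["DeliveryState"]
    return 0  # Available by default

assert delivery_state(snapshot, 100) == 2  # acknowledged
assert delivery_state(snapshot, 102) == 0  # available
assert delivery_state(snapshot, 104) == 4  # archived
```

Encoding only the exceptional ranges keeps snapshots compact when most in-flight offsets share the same state.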
...