
...

Operation | Involves | Notes

Operation: Create share group
Involves: Group coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat  RPC. It writes a ShareGroupMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto the __consumer_offsets  topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat  RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareSnapshot record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partition is now able to be included in an assignment in the share group. (An illustrative sketch of this flow appears after this table.)

Operation: Assign a share-partition
Involves: Group coordinator and optionally share coordinator

When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.

Operation: List share groups
Involves: Group coordinator

Operation: Describe share group
Involves: Group coordinator

Operation: List share group offsets
Involves: Group coordinator and share coordinator

The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Operation: Alter share group offsets
Involves: Group coordinator and share coordinator

Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic.

Operation: Delete share group offsets
Involves: Group coordinator and share coordinator

This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Operation: Delete share group
Involves: Group coordinator and share coordinator

Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets  topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState  request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state  topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata and finally a tombstone ShareGroupMetadata record to the __consumer_offsets  topic.
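
The create-share-group flow described above (steps a to d) can be summarized in code form. The following Java sketch is purely illustrative: the class, interface and method names (CreateShareGroupSketch, CoordinatorLog, initializeShareGroupState and so on) are hypothetical stand-ins for the group coordinator's internal logic, not actual Kafka classes; only the record and RPC names are taken from this KIP.

Code Block
import java.util.List;

// Illustrative sketch of the create-share-group flow (steps a-d above).
// All types and methods here are hypothetical stand-ins for group coordinator
// internals; only the record and RPC names come from the KIP.
public class CreateShareGroupSketch {

    // Hypothetical persistence and RPC hooks.
    interface CoordinatorLog { void append(Object record); }      // writes to __consumer_offsets
    interface ShareCoordinatorClient {
        // InitializeShareGroupState RPC; SPSO starts at -1, state epoch = group epoch
        void initializeShareGroupState(String groupId, String topicId, int partition,
                                       long startOffset, int stateEpoch);
    }
    record ShareGroupMetadata(String groupId, int groupEpoch) {}
    record ShareGroupMemberMetadata(String groupId, String memberId) {}
    record ShareGroupPartitionMetadata(String groupId, List<String> initializingTopics,
                                       List<String> initializedTopics) {}

    private final CoordinatorLog log;
    private final ShareCoordinatorClient shareCoordinator;

    CreateShareGroupSketch(CoordinatorLog log, ShareCoordinatorClient shareCoordinator) {
        this.log = log;
        this.shareCoordinator = shareCoordinator;
    }

    void handleFirstShareGroupHeartbeat(String groupId, String memberId,
                                        int groupEpoch, String topicId, int partitions) {
        // (a) Write the group, member and partition metadata; assignments are still empty.
        log.append(new ShareGroupMetadata(groupId, groupEpoch));
        log.append(new ShareGroupMemberMetadata(groupId, memberId));
        log.append(new ShareGroupPartitionMetadata(groupId, List.of(topicId), List.of()));

        // (b) + (c) Ask the share coordinator to initialize each share-partition.
        for (int p = 0; p < partitions; p++) {
            shareCoordinator.initializeShareGroupState(groupId, topicId, p, -1L, groupEpoch);
        }

        // (d) Record that the share-partitions are now initialized and assignable.
        log.append(new ShareGroupPartitionMetadata(groupId, List.of(), List.of(topicId)));
    }
}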

...

Configuration | Description | Values

group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. This is an internal configuration and will be removed once this feature is complete.

group.coordinator.rebalance.protocols

The list of enabled rebalance protocols. (Existing configuration)

"share" will be added to the default value of this configuration once this feature is complete.

group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.max.record.lock.duration.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
group.share.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
group.share.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
group.share.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
group.share.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
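
For reference, broker configurations such as these can be inspected and altered with the regular Admin API. The snippet below is a minimal sketch, assuming a local broker with id 0 and that the chosen configuration is dynamically alterable (whether a given group.share.* setting can be changed at runtime is an assumption here); adjust the bootstrap address and broker id for your cluster.

Code Block
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ShareBrokerConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker

        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

            // Print the current share-group related broker settings.
            admin.describeConfigs(List.of(broker)).all().get()
                 .get(broker).entries().stream()
                 .filter(e -> e.name().startsWith("group.share."))
                 .forEach(e -> System.out.println(e.name() + " = " + e.value()));

            // Raise the default record lock duration to 45 seconds (within the
            // min/max bounds listed above), assuming it is dynamically alterable.
            AlterConfigOp op = new AlterConfigOp(
                new ConfigEntry("group.share.record.lock.duration.ms", "45000"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker, List.of(op))).all().get();
        }
    }
}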

...

Configuration | Description | Values
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

If not specified, uses the cluster broker configuration group.share.record.lock.duration.ms.

If specified, minimum 1000, maximum limited by the cluster broker configuration group.share.max.record.lock.duration.ms

group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

If not specified, uses the broker configuration group.share.heartbeat.interval.ms .

If specified, minimum limited by the broker configuration group.share.min.heartbeat.interval.ms , maximum limited by the broker configuration group.share.max.heartbeat.interval.ms

group.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

If not specified, uses the broker configuration group.share.session.timeout.ms .

If specified, minimum limited by the broker configuration group.share.min.session.timeout.ms , maximum limited by the broker configuration group.share.max.session.timeout.ms .
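
Group-level configurations such as these are set per share group. The sketch below is illustrative only: it assumes a group named "my-share-group" already exists and that ConfigResource.Type.GROUP is available for group configurations in the broker version in use.

Code Block
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ShareGroupConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker

        try (Admin admin = Admin.create(props)) {
            // assumption: the group name; group configs use the GROUP resource type
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");

            List<AlterConfigOp> ops = List.of(
                // Only deliver committed transactional records to this group.
                new AlterConfigOp(new ConfigEntry("group.share.isolation.level", "read_committed"),
                                  AlterConfigOp.OpType.SET),
                // Start from the earliest offset when there is no initial offset.
                new AlterConfigOp(new ConfigEntry("group.share.auto.offset.reset", "earliest"),
                                  AlterConfigOp.OpType.SET));

            admin.incrementalAlterConfigs(Map.of(group, ops)).all().get();
        }
    }
}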

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

...

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
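
To make the role of these records concrete, the sketch below shows how a group coordinator could rebuild its in-memory view of share-group membership by replaying them from the __consumer_offsets topic after a coordinator change. The replay hook and class shapes are hypothetical; only the record name and the tombstone convention (a null value logically deletes the keyed entry) follow the KIP.

Code Block
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of share-group membership records from __consumer_offsets.
// Only the record name and tombstone semantics come from the KIP.
public class ShareGroupReplaySketch {

    record MemberKey(String groupId, String memberId) {}
    record MemberMetadata(String rackId, List<String> subscribedTopics) {}

    private final Map<MemberKey, MemberMetadata> members = new HashMap<>();

    // Called for every ShareGroupMemberMetadata key/value pair read during replay.
    void onShareGroupMemberMetadata(MemberKey key, MemberMetadata value) {
        if (value == null) {
            members.remove(key);          // tombstone: the member has left the group
        } else {
            members.put(key, value);      // upsert: membership survives a coordinator change
        }
    }

    int memberCount() {
        return members.size();
    }
}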

...

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKeyShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
}
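
As with other coordinator record schemas, this definition is compiled into a generated Java class. A minimal sketch of how such a key might be constructed follows; the package name and the fluent setter pattern are assumptions based on the other group coordinator records, not verified against the final code.

Code Block
// Assumed package and generated accessors, following the pattern of other
// group coordinator record schemas; not verified against the final code.
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;

public class ShareGroupMetadataKeyExample {
    public static void main(String[] args) {
        ShareGroupMetadataKey key = new ShareGroupMetadataKey()
            .setGroupId("my-share-group");     // the only field in the schema above
        System.out.println(key);
    }
}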

...

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata" },
    { "name": "InitializingTopics", "versions": "0+", "type": "[]InitializingTopicMetadata" },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicMetadata" }
  ],
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]},
    { "name": "InitializingTopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "StartPartitionIndex", "type": "int32", "versions": "0+",
        "about": "The start partition index of the partitions being initialized." },
      { "name": "EndPartitionIndex", "type": "int32", "versions": "0+",
        "about": "The end partition index of the partitions being initialized." }
    ]}
  ]
}

The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When partitions are being initialized, for example when the number of partitions for a topic is increased, StartPartitionIndex is the index of the first partition being initialized and EndPartitionIndex is the index of the last. Once the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the InitializedTopics field. A topic can of course appear in both the InitializedTopics and InitializingTopics fields when its number of partitions has been increased.

In a similar way, the DeletingTopics  field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.
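
A compact sketch of the two-stage initialization bookkeeping described above follows. The types here are hypothetical stand-ins for the group coordinator's internal representation of ShareGroupPartitionMetadata; only the field names and the ordering of the two stages come from the KIP.

Code Block
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two-stage initialization tracked by
// ShareGroupPartitionMetadata. Field names mirror the schema above.
public class SharePartitionInitSketch {

    record InitializingTopic(String topicId, String topicName,
                             int startPartitionIndex, int endPartitionIndex) {}
    record InitializedTopic(String topicId, String topicName, int numPartitions) {}

    final List<InitializingTopic> initializingTopics = new ArrayList<>();
    final List<InitializedTopic> initializedTopics = new ArrayList<>();

    // Stage 1: record the range of partitions whose state is being created,
    // e.g. partitions [oldCount, newCount) after a partition-count increase.
    void startInitializing(String topicId, String topicName, int oldCount, int newCount) {
        initializingTopics.add(new InitializingTopic(topicId, topicName, oldCount, newCount - 1));
        // ... the group coordinator now sends InitializeShareGroupState to the share coordinator
    }

    // Stage 2: once InitializeShareGroupState succeeds, move the topic into
    // InitializedTopics with its full partition count.
    void finishInitializing(String topicId, String topicName, int newCount) {
        initializingTopics.removeIf(t -> t.topicId().equals(topicId));
        initializedTopics.removeIf(t -> t.topicId().equals(topicId));
        initializedTopics.add(new InitializedTopic(topicId, topicName, newCount));
    }
}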

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}
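
The numeric DeliveryState codes in each StateBatch are the ones listed in the about string above. A tiny decoding helper is sketched below; the enum and method names are made up for this sketch, and codes not listed in the schema are deliberately left out.

Code Block
// Illustrative decoder for the DeliveryState codes listed in the schema above.
// The enum and method names are made up for this sketch.
public enum PersistedDeliveryState {
    AVAILABLE((byte) 0),
    ACKED((byte) 2),
    ARCHIVED((byte) 4);

    private final byte code;

    PersistedDeliveryState(byte code) {
        this.code = code;
    }

    public byte code() {
        return code;
    }

    public static PersistedDeliveryState fromCode(byte code) {
        for (PersistedDeliveryState state : values()) {
            if (state.code == code) {
                return state;
            }
        }
        throw new IllegalArgumentException("Unknown persisted delivery state: " + code);
    }
}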

...