...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
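The rule above can be sketched as follows. This is a minimal illustration that uses a hypothetical in-memory `ShareGroupView` and a toy assignor (`toy_assignor`) in place of the real server-side assignor. None of these names come from the Kafka code base.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

# Hypothetical types: member id -> subscribed topic names, and member id -> assigned topics.
Members = Dict[str, List[str]]
Assignment = Dict[str, List[str]]


@dataclass
class ShareGroupView:
    """Hypothetical in-memory view of a share group held by the group coordinator."""
    group_epoch: int
    assignment_epoch: int
    members: Members
    target_assignment: Assignment = field(default_factory=dict)


def maybe_compute_target_assignment(group: ShareGroupView,
                                    assignor: Callable[[Members], Assignment]) -> bool:
    """Recompute the target assignment only when the group epoch has moved ahead of it.

    Returns True if a new target assignment was computed.
    """
    if group.group_epoch <= group.assignment_epoch:
        return False  # the target assignment already reflects the latest group metadata
    group.target_assignment = assignor(group.members)
    # The assignment epoch becomes the group epoch of the metadata used for the computation.
    group.assignment_epoch = group.group_epoch
    return True


def toy_assignor(members: Members) -> Assignment:
    """Illustrative stand-in for a server-side assignor: each member gets its subscriptions."""
    return {member: sorted(topics) for member, topics in members.items()}


group = ShareGroupView(group_epoch=3, assignment_epoch=2, members={"m1": ["orders"]})
print(maybe_compute_target_assignment(group, toy_assignor))  # True: epoch 3 > 2
print(maybe_compute_target_assignment(group, toy_assignor))  # False: already at epoch 3
```

Because the comparison is on epochs rather than on the metadata itself, repeated triggers with unchanged metadata are cheap no-ops.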

...

For a share group, the group coordinator persists seven kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members and their assignments.
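The following sketch illustrates the replay. It keeps only the latest value per record type and key, and a tombstone deletes the entry, which mirrors how a compacted topic behaves. The record shape `(type, key, value)` and the helpers `replay` and `members_of` are hypothetical simplifications. Real records are serialized with the key and value schemas described in this document.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical simplified record: (record type, key fields, value fields or None for a tombstone).
Record = Tuple[str, Tuple[str, ...], Optional[dict]]
State = Dict[Tuple[str, Tuple[str, ...]], dict]


def replay(records: Iterable[Record]) -> State:
    """Rebuild the latest value per (record type, key) by replaying records in log order."""
    latest: State = {}
    for record_type, key, value in records:
        if value is None:
            latest.pop((record_type, key), None)  # a tombstone removes the entry
        else:
            latest[(record_type, key)] = value    # a newer value replaces the older one
    return latest


def members_of(latest: State, group_id: str) -> List[str]:
    """Member ids that still have a live ShareGroupMemberMetadata record in the group."""
    return sorted(key[1] for (record_type, key) in latest
                  if record_type == "ShareGroupMemberMetadata" and key[0] == group_id)


log: List[Record] = [
    ("ShareGroupMetadata", ("g1",), {"Epoch": 1}),
    ("ShareGroupMemberMetadata", ("g1", "m1"), {"ClientId": "c1"}),
    ("ShareGroupMemberMetadata", ("g1", "m2"), {"ClientId": "c2"}),
    ("ShareGroupCurrentMemberAssignment", ("g1", "m1"), {"MemberEpoch": 1}),
    ("ShareGroupMetadata", ("g1",), {"Epoch": 2}),
    ("ShareGroupMemberMetadata", ("g1", "m2"), None),  # m2 left the group
]
state = replay(log)
print(members_of(state, "g1"))                          # ['m1']
print(state[("ShareGroupMetadata", ("g1",))]["Epoch"])  # 2
```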

...

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

...

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata record with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata record contains the set of topic-partitions which are known to be initialized.
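The following is a minimal sketch of the reconciliation. It assumes topics are identified by name (the real records also carry topic IDs) and that the inputs are plain dictionaries. The function `reconcile` is hypothetical. It only shows how comparing the initialized topic-partitions with the cluster metadata reveals what must be initialized and what must be deleted.

```python
from typing import Dict, List, Set, Tuple


def reconcile(initialized: Dict[str, List[int]],
              subscribed: Set[str],
              cluster: Dict[str, int]) -> Tuple[Dict[str, List[int]], Set[str]]:
    """Compare "hard" initialized state with "soft" cluster metadata.

    initialized: topic -> partitions recorded as initialized (ShareGroupStatePartitionMetadata)
    subscribed:  topics subscribed to by the share group's members
    cluster:     topic -> current partition count from the cluster metadata
    Returns (partitions that need InitializeShareGroupState, topics whose state must be deleted).
    """
    to_initialize: Dict[str, List[int]] = {}
    for topic in sorted(subscribed):
        if topic not in cluster:
            continue  # the topic does not exist (yet), so there is nothing to initialize
        known = set(initialized.get(topic, []))
        missing = [p for p in range(cluster[topic]) if p not in known]
        if missing:
            to_initialize[topic] = missing  # a new topic, or partitions added to a known topic
    # Initialized topics that have vanished from the cluster metadata were deleted.
    to_delete = {topic for topic in initialized if topic not in cluster}
    return to_initialize, to_delete


init, delete = reconcile(initialized={"orders": [0, 1], "old": [0]},
                         subscribed={"orders", "payments"},
                         cluster={"orders": 3, "payments": 2})
print(init)    # {'orders': [2], 'payments': [0, 1]}
print(delete)  # {'old'}
```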

...

  • Sends the InitializeShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.
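The initialization steps above can be sketched as a retry loop. `send_initialize` is a hypothetical stand-in for one InitializeShareGroupState call, and the dictionary `initialized` stands in for the ShareGroupStatePartitionMetadata record. The record is only updated once every partition has succeeded, so repeating the RPCs after a failure is harmless.

```python
from typing import Callable, Dict, List, Set


def initialize_topic(topic: str,
                     num_partitions: int,
                     send_initialize: Callable[[str, int], bool],
                     initialized: Dict[str, List[int]],
                     max_attempts: int = 3) -> bool:
    """Initialize share-partition state for every partition of a topic, retrying failures.

    send_initialize stands in for one InitializeShareGroupState call and returns True on
    success. `initialized` stands in for the ShareGroupStatePartitionMetadata record and
    is only updated once every partition has succeeded.
    """
    pending: Set[int] = set(range(num_partitions))
    for _ in range(max_attempts):
        # Repeating the RPC is safe, so only the partitions that failed are re-sent.
        pending = {p for p in pending if not send_initialize(topic, p)}
        if not pending:
            initialized[topic] = list(range(num_partitions))
            return True
    return False  # record left unchanged; a later pass (or a new coordinator) retries


failed_once = {("orders", 1)}


def flaky_send(topic: str, partition: int) -> bool:
    """Hypothetical transport that fails the first attempt for orders-1."""
    if (topic, partition) in failed_once:
        failed_once.discard((topic, partition))
        return False
    return True


record: Dict[str, List[int]] = {}
print(initialize_topic("orders", 3, flaky_send, record))  # True
print(record)                                              # {'orders': [0, 1, 2]}
```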

...

  • Sends the InitializeShareGroupState RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.

...

  • If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.
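The deletion steps can be sketched in the same way. `StatePartitionMetadata` and `send_delete` are hypothetical simplifications of the ShareGroupStatePartitionMetadata value and the DeleteShareGroupState RPC. Adding the topic to DeletingTopics before sending any RPC is what allows an interrupted deletion to be resumed.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set


@dataclass
class StatePartitionMetadata:
    """Simplified stand-in for ShareGroupStatePartitionMetadataValue, keyed by topic name."""
    initialized_topics: Dict[str, List[int]] = field(default_factory=dict)
    deleting_topics: Set[str] = field(default_factory=set)


def delete_topic_state(meta: StatePartitionMetadata,
                       topic: str,
                       topic_exists: bool,
                       send_delete: Callable[[str, int], bool]) -> bool:
    """Delete the share-group state of one topic; safe to call again after a failure."""
    partitions = meta.initialized_topics.get(topic, [])
    if topic_exists:
        # Record the intent first so that a new coordinator can finish the deletion.
        meta.deleting_topics.add(topic)
    # Send DeleteShareGroupState for every partition (no short-circuit on failure).
    results = [send_delete(topic, p) for p in partitions]
    if not all(results):
        return False
    meta.initialized_topics.pop(topic, None)  # final record: topic removed
    meta.deleting_topics.discard(topic)
    return True


meta = StatePartitionMetadata(initialized_topics={"orders": [0, 1]})
print(delete_topic_state(meta, "orders", True, lambda t, p: p != 1))  # False: partition 1 failed
print(meta.deleting_topics)                                          # {'orders'}
print(delete_topic_state(meta, "orders", True, lambda t, p: True))   # True
print(meta)  # StatePartitionMetadata(initialized_topics={}, deleting_topics=set())
```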

...

Operation (components involved) and notes:

Create share group (group coordinator)

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record and a ShareGroupPartitionMetadata record for the share group, and a ShareGroupMemberMetadata record for the member, onto the __consumer_offsets topic. The group has now been created, but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes a ShareGroupStatePartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partitions can now be included in an assignment in the share group.

Assign a share-partition (group coordinator and, optionally, share coordinator)

When a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupStatePartitionMetadata record, and the share-partition can now be included in an assignment in the share group.

List share groups (group coordinator)

Describe share group (group coordinator)

List share group offsets (group coordinator and share coordinator)

The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because, when a share-partition is not used for a while, the share-partition leader frees up the memory and reloads the state from the share coordinator when it is next required.

Alter share group offsets (group coordinator and share coordinator)

Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record.

Delete share group offsets (group coordinator and share coordinator)

This is the administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic, adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group.

Delete share group (group coordinator and share coordinator)

Only empty share groups can be deleted. If the share group has any topics with initialized state, the group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic, moving all initialized topics into the DeletingTopics array. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupStatePartitionMetadata record and, finally, a tombstone ShareGroupMetadata record to the __consumer_offsets topic.
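The ordering of writes when deleting a share group can be summarized as a plan of steps. This is only a sketch. `delete_share_group_plan` is hypothetical, and for brevity it issues one DeleteShareGroupState step per topic, whereas the coordinator sends one request to each share coordinator involved. It also assumes the two tombstones are written even when no topic was initialized.

```python
from typing import List, Tuple


def delete_share_group_plan(initialized_topics: List[str], is_empty: bool) -> List[Tuple[str, str]]:
    """Return the ordered (action, target) steps for deleting a share group."""
    if not is_empty:
        raise ValueError("only empty share groups can be deleted")
    steps: List[Tuple[str, str]] = []
    if initialized_topics:
        topics = sorted(initialized_topics)
        # 1. Persist the intent: move every initialized topic into DeletingTopics.
        steps.append(("write ShareGroupStatePartitionMetadata", "DeletingTopics=" + ",".join(topics)))
        # 2. Ask the share coordinators to write tombstones for the durable state
        #    (simplified to one step per topic rather than one per share coordinator).
        steps.extend(("send DeleteShareGroupState", topic) for topic in topics)
    # 3. Tombstone the group's own records, with ShareGroupMetadata last.
    steps.append(("write tombstone", "ShareGroupStatePartitionMetadata"))
    steps.append(("write tombstone", "ShareGroupMetadata"))
    return steps


for step in delete_share_group_plan(["payments", "orders"], is_empty=True):
    print(step)
```

Writing the DeletingTopics intent before any RPC, and the ShareGroupMetadata tombstone last, means a coordinator that fails over mid-deletion still finds enough state to finish the job.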

...

Configuration, description and values:

group.coordinator.rebalance.protocols

The list of enabled rebalance protocols. (Existing configuration)

"share" is included in the list of protocols to enable share groups.

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.min.record.lock.duration.ms

Share-group record acquisition lock minimum duration in milliseconds.

Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)

group.share.max.record.lock.duration.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)

group.share.partition.max.record.locks

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000

group.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
share.coordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
share.coordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
share.coordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)

.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory.

Default 1000
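One plausible reading of share.coordinator.snapshot.update.records.per.snapshot is sketched below. In this reading, the share coordinator writes ShareUpdate records for a share-partition until the configured number has been written, then writes a ShareSnapshot and starts counting again. The class `SnapshotPolicy` and the counter reset are assumptions made for illustration.

```python
class SnapshotPolicy:
    """Hypothetical per-share-partition helper choosing the next record type to write.

    updates_per_snapshot mirrors share.coordinator.snapshot.update.records.per.snapshot.
    """

    def __init__(self, updates_per_snapshot: int = 500) -> None:
        if updates_per_snapshot < 0:
            raise ValueError("updates_per_snapshot must be >= 0")  # documented minimum is 0
        self.updates_per_snapshot = updates_per_snapshot
        self.updates_since_snapshot = 0

    def next_record_type(self) -> str:
        if self.updates_since_snapshot >= self.updates_per_snapshot:
            self.updates_since_snapshot = 0  # assumption: the count restarts after a snapshot
            return "ShareSnapshot"
        self.updates_since_snapshot += 1
        return "ShareUpdate"


policy = SnapshotPolicy(updates_per_snapshot=2)
print([policy.next_record_type() for _ in range(6)])
# ['ShareUpdate', 'ShareUpdate', 'ShareSnapshot', 'ShareUpdate', 'ShareUpdate', 'ShareSnapshot']
```

With the minimum value of 0, every write in this sketch becomes a ShareSnapshot.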

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

Configuration, description and values:

group.share.isolation.level

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will deliver all records, even transactional records which have been aborted. Non-transactional records are delivered unconditionally in either mode.

Valid values "read_committed" and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest" (default) and "earliest"

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

If not specified, uses the broker configuration group.share.record.lock.duration.ms.

If specified, the minimum is limited by the broker configuration group.share.min.record.lock.duration.ms and the maximum is limited by the broker configuration group.share.max.record.lock.duration.ms.

group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

If not specified, uses the broker configuration group.share.heartbeat.interval.ms.

If specified, the minimum is limited by the broker configuration group.share.min.heartbeat.interval.ms and the maximum is limited by the broker configuration group.share.max.heartbeat.interval.ms.

group.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

If not specified, uses the broker configuration group.share.session.timeout.ms.

If specified, the minimum is limited by the broker configuration group.share.min.session.timeout.ms and the maximum is limited by the broker configuration group.share.max.session.timeout.ms.
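The precedence rules for the dynamic group configurations above can be sketched as a small resolver. `effective_group_value` is hypothetical. It assumes an out-of-range group value is rejected rather than clamped to the broker bounds, which this document does not specify.

```python
from typing import Optional


def effective_group_value(name: str, group_value: Optional[int],
                          broker_default: int, broker_min: int, broker_max: int) -> int:
    """Resolve a dynamic share-group setting against the broker configuration.

    Unset: the broker default applies. Set: it must lie within the broker's
    min/max bounds (this sketch rejects out-of-range values rather than clamping).
    """
    if group_value is None:
        return broker_default
    if not broker_min <= group_value <= broker_max:
        raise ValueError(f"{name}={group_value} is outside [{broker_min}, {broker_max}]")
    return group_value


# Broker defaults from the broker table: lock duration 30000, bounded by 15000..60000.
print(effective_group_value("group.share.record.lock.duration.ms", None, 30000, 15000, 60000))   # 30000
print(effective_group_value("group.share.record.lock.duration.ms", 20000, 30000, 15000, 60000))  # 20000
```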

...

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, tombstone records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

...

{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

...

ShareGroupPartitionMetadataKey

{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

ShareGroupPartitionMetadataValue

{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
        { "name": "Partition", "versions": "0+", "type": "int32",
          "about": "The partition number." },
        { "name": "Racks", "versions": "0+", "type": "[]string",
          "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}

ShareGroupMemberMetadataKey

{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." }
  ]
}

ShareGroupTargetAssignmentMetadataKey

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey",
  "validVersions": "12",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "12",
      "about": "The group id." }
  ]
}

ShareGroupTargetAssignmentMetadataValue

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

ShareGroupTargetAssignmentMemberKey

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberKey",
  "validVersions": "13",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "13",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "13",
      "about": "The member id." }
  ]
}

ShareGroupTargetAssignmentMemberValue

{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition",
      "about": "The assigned partitions.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "Partitions", "versions": "0+", "type": "[]int32" }
    ]}
  ]
}

ShareGroupCurrentMemberAssignmentKey

{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentKey",
  "validVersions": "14",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "14",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "14",
      "about": "The member id." }
  ]
}

ShareGroupCurrentMemberAssignmentValue

{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See MemberState for the possible values." },
    { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
      "about": "The partitions assigned to (or owned by) this member." }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic Id." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition Ids." }
    ]}
  ]
}
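The sketch below shows why ShareGroupCurrentMemberAssignmentValue stores both MemberEpoch and PreviousMemberEpoch by classifying the epoch sent in a heartbeat. It is a simplification with hypothetical names (`CurrentMemberAssignment`, `check_heartbeat_epoch`). A real group coordinator applies further checks before accepting a heartbeat with the previous epoch.

```python
from dataclasses import dataclass


@dataclass
class CurrentMemberAssignment:
    """Subset of ShareGroupCurrentMemberAssignmentValue used for epoch checks."""
    member_epoch: int
    previous_member_epoch: int


def check_heartbeat_epoch(stored: CurrentMemberAssignment, request_epoch: int) -> str:
    """Classify the member epoch sent in a heartbeat (simplified sketch)."""
    if request_epoch == stored.member_epoch:
        return "current"
    if request_epoch == stored.previous_member_epoch:
        # The last epoch bump was lost before reaching the member, so it retries with the old one.
        return "previous"
    return "fenced"


stored = CurrentMemberAssignment(member_epoch=5, previous_member_epoch=4)
print([check_heartbeat_epoch(stored, e) for e in (5, 4, 3)])  # ['current', 'previous', 'fenced']
```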

...

ShareGroupStatePartitionMetadataKey

{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataKey",
  "validVersions": "15",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "15",
      "about": "The group id." }
  ]
}

...

ShareGroupStatePartitionMetadataValue

{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}

...

{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "1",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "1",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "1",
      "about": "The partition index." }
  ]
}

...