
...

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions in a topic

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

...

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. This KIP defines just one assignor, org.apache.kafka.coordinator.group.share.SimpleAssignor, as described below.

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

...

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group ID was first used. As a result, it is helpful to be able to ensure that a group of a particular name can only be created with a particular type. This can be achieved by defining a group configuration property group.type, using the kafka-configs.sh tool or the AdminClient.incrementalAlterConfigs method. For example, you could use the following command to ensure that the group ID "G1" is to be used for a share group only.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type group --entity-name G1 --alter --add-config group.type=share

If a regular Kafka consumer then attempts to use "G1" as a consumer group, and the group G1 does not exist or is not a consumer group, the exception InconsistentGroupProtocolException will be thrown.

Note that the group.type configuration is applied when a new group is created. It is best used to ensure that a group ID is reserved for use with a particular type of group. Changing the configuration when a group already exists will not have an effect until the existing group has been deleted. For production use, choosing a naming convention for groups in advance and using this configuration to enforce the group type is recommended.
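For illustration, the same configuration can be set programmatically. The following is a minimal sketch, assuming the ConfigResource.Type.GROUP resource type for dynamic group configurations; it is not prescribed by this KIP.

Code Block
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ReserveShareGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // GROUP is assumed to be the config resource type for dynamic group configurations.
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "G1");
            AlterConfigOp setType = new AlterConfigOp(
                new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(setType))).all().get();
        }
    }
}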

...

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implement the new internal org.apache.kafka.coordinator.group.share.SharePartitionAssignor interface. There is just one implementation of this interface so far, org.apache.kafka.coordinator.group.share.SimpleAssignor.

For a share group, a rebalance is a much less significant event than for a consumer group because there’s no fencing. When a partition is assigned to a member of a share group, it’s telling the member that it should fetch records from that partition, which it may well be sharing with the other members of the share group. The members are not aware of each other, and there’s no synchronization barrier or fencing involved. The group coordinator, using the server-side assignor, is responsible for telling the members which partitions they are assigned and revoked. But the aim is to give every member useful work, rather than to keep the members' assignments safely separated.

...

For a share group, the group coordinator persists three kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over, along with the list of members. However, the assignments are not persisted and will be recalculated by the new group coordinator.

The

...

SimpleAssignor

This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records.

...

| State | Members subscribed to T1 | Topic:partitions | Assignment change | Assignments |
| --- | --- | --- | --- | --- |
| M1 subscribed to topic T1 with 1 partition | M1 (hash 1) | T1:0 | Assign T1:0 to M1 | M1 → T1:0 |
| M2 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2) | T1:0 | Assign T1:0 to M2 | M1 → T1:0; M2 → T1:0 |
| Add 3 partitions to T1 | M1 (hash 1), M2 (hash 2) | T1:0, T1:1, T1:2, T1:3 | Assign T1:1 and T1:3 to M2; revoke T1:0 from M2; assign T1:2 to M1 | M1 → T1:0, T1:2; M2 → T1:1, T1:3 |
| M3 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3) | T1:0, T1:1, T1:2, T1:3 | Assign T1:2 to M3; revoke T1:2 from M1 | M1 → T1:0; M2 → T1:1, T1:3; M3 → T1:2 |
| M4 joins the group and subscribes to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4) | T1:0, T1:1, T1:2, T1:3 | Assign T1:3 to M4; revoke T1:3 from M2 | M1 → T1:0; M2 → T1:1; M3 → T1:2; M4 → T1:3 |
| M5, M6, M7, M8 join the group and subscribe to T1 | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8) | T1:0, T1:1, T1:2, T1:3 | Assign T1:0 to M5; assign T1:1 to M6; assign T1:2 to M7; assign T1:3 to M8 | M1 → T1:0; M2 → T1:1; M3 → T1:2; M4 → T1:3; M5 → T1:0; M6 → T1:1; M7 → T1:2; M8 → T1:3 |
| M1 subscribes to T2 with 2 partitions | M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8) | T1:0, T1:1, T1:2, T1:3, T2:0, T2:1 | Assign T2:0 and T2:1 to M1 | M1 → T1:0, T2:0, T2:1; M2 → T1:1; M3 → T1:2; M4 → T1:3; M5 → T1:0; M6 → T1:1; M7 → T1:2; M8 → T1:3 |
| All members leave except M2 | M2 (hash 2) | T1:0, T1:1, T1:2, T1:3 | Assign T1:0, T1:2, T1:3 to M2 | M2 → T1:0, T1:1, T1:2, T1:3 |

SimpleAssignor does not make assignments based on rack IDs, nor does it take account of lag or consumer throughput.
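The behaviour in the table can be reproduced with very little code. The following is a toy sketch of the hashing scheme the example suggests, not the KIP's actual implementation (which handles multiple topics and membership transitions): each member is hashed to a starting partition, and any partitions left unassigned are then handed out round-robin so that every subscribed partition has at least one consumer.

Code Block
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class SimpleAssignorSketch {
    /**
     * Assigns the partitions of one topic to members; returns member -> partitions.
     * memberHashes maps each member ID to its hash (1, 2, 3, ... in the example above).
     */
    static Map<String, List<Integer>> assign(Map<String, Integer> memberHashes,
                                             int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) unassigned.add(p);

        // Step 1: hash each member to a starting partition.
        for (Map.Entry<String, Integer> e : memberHashes.entrySet()) {
            int partition = (e.getValue() - 1) % numPartitions;
            assignment.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(partition);
            unassigned.remove(partition);
        }

        // Step 2: distribute the partitions nobody was hashed to, round-robin,
        // so that every partition has at least one consumer.
        List<String> members = new ArrayList<>(memberHashes.keySet());
        int i = 0;
        for (int partition : unassigned) {
            assignment.get(members.get(i++ % members.size())).add(partition);
        }
        return assignment;
    }
}

For instance, with members M1 (hash 1) and M2 (hash 2) and four partitions, step 1 gives M1 → T1:0 and M2 → T1:1, and step 2 adds T1:2 to M1 and T1:3 to M2, matching the third row of the table.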

...

For the read_uncommitted isolation level, which is the default, the share group consumes all non-transactional and transactional records. The SPEO is bounded by the high-water mark.

For the read_committed isolation level, the share group only consumes non-transactional records and committed transactional records; only these are eligible to become in-flight records. The SPEO cannot move past the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level. The share-partition leader itself is responsible for filtering out transactional records which have been aborted. This can be done by looking at the headers of the record batches, without having to iterate over the individual records.

The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED . This means that the replica manager only returns records up to the last stable offset, meaning that any transactional records fetched were part of transactions which have already been completed. The replica manager also returns a list of aborted transactions relevant to the records which are fetched, and the share-partition leader uses this to filter out records for transactions which aborted.
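As an illustration of that filtering step, here is a much-simplified sketch using hypothetical Batch and AbortedTxn types standing in for the real fetch-response classes. It relies only on batch headers: batches arrive in offset order, an aborted transaction is announced by its producer ID and first offset, and a control batch from that producer marks where the transaction ends.

Code Block
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

interface Batch {
    long baseOffset();
    long producerId();
    boolean isTransactional();
    boolean isControlBatch();   // transaction marker batch
}

record AbortedTxn(long producerId, long firstOffset) {}

class AbortedTxnFilter {
    // Drops batches belonging to aborted transactions; everything else passes.
    static List<Batch> filterCommitted(List<Batch> batches, List<AbortedTxn> abortedTxns) {
        PriorityQueue<AbortedTxn> pending =
            new PriorityQueue<>(Comparator.comparingLong(AbortedTxn::firstOffset));
        pending.addAll(abortedTxns);
        Set<Long> abortedProducers = new HashSet<>();
        List<Batch> committed = new ArrayList<>();
        for (Batch b : batches) {
            // Activate aborted transactions starting at or before this batch.
            while (!pending.isEmpty() && pending.peek().firstOffset() <= b.baseOffset())
                abortedProducers.add(pending.poll().producerId());
            if (b.isControlBatch()) {
                // The marker ends this producer's current transaction and carries
                // no consumable records itself.
                abortedProducers.remove(b.producerId());
            } else if (!(b.isTransactional() && abortedProducers.contains(b.producerId()))) {
                committed.add(b);
            }
        }
        return committed;
    }
}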

...

Share sessions use memory on the share-partition leader. Each broker that is a share-partition leader has a cache of share sessions. Because a share session is an integral part of how share groups work, as opposed to a performance optimisation in the manner of fetch sessions, the limit is calculated by multiplying the share-group configurations group.share.max.groups and group.share.max.size. Sessions are evicted if they have been inactive for more than 120,000 milliseconds (2 minutes). The key of the cache is the pair (GroupId, MemberId).
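With the default values of those configurations given later in this document (group.share.max.groups = 10 and group.share.max.size = 200), the cache on each broker would therefore hold at most 10 × 200 = 2,000 share sessions.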

...

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

ShareSnapshot

Key:

Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32

Value:

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

ShareUpdate

Key:

Code Block
Version: 1
GroupId: string
TopicId: uuid
Partition: int32

Value:

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareSnapshot and any subsequent ShareUpdate records whose snapshot epoch matches. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.
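To make the replay rule concrete, the following sketch (with hypothetical record types; the real coordinator code differs) keeps, per share-partition, the latest ShareSnapshot and only those ShareUpdates whose SnapshotEpoch matches it.

Code Block
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the ShareSnapshot/ShareUpdate record values.
record SharePartitionKey(String groupId, String topicId, int partition) {}
record Snapshot(int snapshotEpoch /* plus offsets and states */) {}
record Update(int snapshotEpoch /* plus offsets and states */) {}

class ShareStateReplay {
    final Map<SharePartitionKey, Snapshot> latestSnapshot = new HashMap<>();
    final Map<SharePartitionKey, List<Update>> updates = new HashMap<>();

    // Called for each record read from the share-group state topic, in offset order.
    void onSnapshot(SharePartitionKey key, Snapshot s) {
        latestSnapshot.put(key, s);
        updates.put(key, new ArrayList<>());  // a newer snapshot supersedes earlier updates
    }

    void onUpdate(SharePartitionKey key, Update u) {
        Snapshot s = latestSnapshot.get(key);
        // Only updates matching the current snapshot epoch still apply.
        if (s != null && u.snapshotEpoch() == s.snapshotEpoch())
            updates.get(key).add(u);
    }
}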

...

Operation: Create share group
Involves: Group coordinator
Notes:

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partition is now able to be included in an assignment in the share group.

Operation: Assign a share-partition
Involves: Group coordinator and optionally share coordinator
Notes: When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.

Operation: List share groups
Involves: Group coordinator

Operation: Describe share group
Involves: Group coordinator

Operation: List share group offsets
Involves: Group coordinator and share coordinator
Notes: The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Operation: Alter share group offsets
Involves: Group coordinator and share coordinator
Notes: Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic.

Operation: Delete share group offsets
Involves: Group coordinator and share coordinator
Notes: This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Operation: Delete share group
Involves: Group coordinator and share coordinator
Notes: Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata record and finally a tombstone ShareGroupMetadata record to the __consumer_offsets topic.

...

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.

They are all subclasses of RetriableException .

Broker API

SharePartitionAssignor

The new org.apache.kafka.coordinator.group.share.SharePartitionAssignor  interface is implemented by server-side assignors for share groups.

...

Code Block
package org.apache.kafka.coordinator.group.share;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

/**
 * Server side partition assignor used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface SharePartitionAssignor {
    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Assigns partitions to group members based on the given assignment specification and topic metadata.
     *
     * @param assignmentSpec           The assignment spec which includes member metadata.
     * @param subscribedTopicDescriber The topic and partition metadata describer.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(
        AssignmentSpec assignmentSpec,
        SubscribedTopicDescriber subscribedTopicDescriber
    ) throws PartitionAssignorException;
}
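To illustrate how the interface slots together, here is a deliberately naive assignor that gives every member every subscribed partition, which is legal in a share group because partitions may be assigned to multiple consumers at once. This is a hypothetical sketch, not part of the KIP; the AssignmentSpec, GroupAssignment and MemberAssignment types are assumed to follow the existing server-side assignor classes in the same module.

Code Block
package org.apache.kafka.coordinator.group.share;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;

/**
 * Hypothetical example assignor: every member is assigned every partition
 * of every topic it subscribes to.
 */
public class EveryoneGetsEverythingAssignor implements SharePartitionAssignor {

    @Override
    public String name() {
        return "everyone-gets-everything";
    }

    @Override
    public GroupAssignment assign(
        AssignmentSpec assignmentSpec,
        SubscribedTopicDescriber subscribedTopicDescriber
    ) {
        Map<String, MemberAssignment> members = new HashMap<>();
        assignmentSpec.members().forEach((memberId, memberSpec) -> {
            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
            for (Uuid topicId : memberSpec.subscribedTopicIds()) {
                // numPartitions returns -1 for an unknown topic, skipping the loop.
                int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
                Set<Integer> partitions = new HashSet<>();
                for (int p = 0; p < numPartitions; p++) partitions.add(p);
                targetPartitions.put(topicId, partitions);
            }
            members.put(memberId, new MemberAssignment(targetPartitions));
        });
        return new GroupAssignment(members);
    }
}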

Command-line tools

kafka-share-groups.sh

...

| Configuration | Description | Values |
| --- | --- | --- |
| group.share.enable | Whether to enable share groups on the broker. | Default false while the feature is being developed. Will become true in a future release. |
| group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
| group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
| group.share.record.lock.duration.max.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour) |
| group.share.record.lock.partition.limit | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
| group.share.session.timeout.ms | The timeout to detect client failures when using the group protocol. | Default 45000 (45 seconds) |
| group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
| group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
| group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
| group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
| group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
| group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100 |
| group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000 |
| group.share.assignors | The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used. | A list of class names. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor" |
| group.share.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50 |
| group.share.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
| group.share.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 |
| group.share.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
| share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1 |
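As an illustration only, a single-broker development configuration using these properties might look like the following; the values are chosen for local testing, not as recommendations.

Code Block
# Enable share groups while the feature is off by default.
group.share.enable=true

# Match the single-broker settings used by the AK example files for internal topics.
group.share.state.topic.replication.factor=1
group.share.state.topic.min.isr=1

# Allow more delivery attempts before unprocessable records are handled automatically.
group.share.delivery.count.limit=7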

Group configuration

The following dynamic group configuration properties are added. These are properties where it would be problematic for consumers in the same share group to behave differently, as could happen if the properties were specified in the consumer clients themselves.

| Configuration | Description | Values |
| --- | --- | --- |
| group.share.isolation.level | Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode. | Valid values "read_committed" and "read_uncommitted" (default) |
| group.share.auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server: "earliest" automatically resets the offset to the earliest offset; "latest" automatically resets the offset to the latest offset. | Valid values "latest" (default) and "earliest" |
| group.share.record.lock.duration.ms | Record acquisition lock duration in milliseconds. | Default null, which uses the cluster configuration group.share.record.lock.duration.ms; minimum 1000; maximum limited by the cluster configuration group.share.record.lock.duration.max.ms |
| group.type | Ensures that a newly created group has the specified group type. | Valid values "consumer" or "share"; there is no default |
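For example, these dynamic group configurations could be set with the same tooling shown earlier for group.type; the group name G1 is just an illustration.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type group --entity-name G1 --alter --add-config group.share.isolation.level=read_committed,group.share.auto.offset.reset=earliest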

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

...

For each share group, a single ShareGroupMetadata record is written. When the group is deleted, a tombstone record is written.

...

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ConsumerGroupMetadataKey

This is included for completeness. There is no change to this record.

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
}

ConsumerGroupMetadataValue

...

the share group. It is used to keep track of which share-partitions have persistent state.

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

ShareGroupMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

ShareGroupMemberMetadataKey

...

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "The rebalance timeout" }
  ]
}

ShareGroupPartitionMetadataKey

...

The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When partitions are being initialized, such as when the number of partitions for a topic is increased, StartPartitionIndex is the index of the first partition being initialized and EndPartitionIndex is the index of the last. Once the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the InitializedTopics field.

...