
...

If a regular Kafka consumer then attempts to use "G1" as a consumer group, an InconsistentGroupProtocolException will be thrown.

Share group membership

...

For a share group, the group coordinator persists two kinds of records:

  • ConsumerGroupMetadata: this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupPartitionMetadata: this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over. However, the members of the group and their assignments are not persisted. This means that existing members will have to rejoin the share group following a group coordinator failover.

In-flight records

...

The sessions are handled as follows:

  • ShareFetch (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch = 0

    This is a full fetch request. It contains the complete set of topic-partitions to fetch. It cannot contain any acknowledgements.

    If the request contains acknowledgements, it fails with error code INVALID_REQUEST.

    If the session is in the cache, the existing session is discarded, releasing all acquired records.

    A new session is created in the cache with epoch 1.

  • ShareFetch (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch from 1 to Integer.MAX_VALUE inclusive

    This is an incremental fetch request. It contains a partial set of topic-partitions to be applied to the set already in the cache. It can contain a set of acknowledgements to perform before returning the fetched data.

    If the session is not in the cache, it fails with error code SHARE_SESSION_NOT_FOUND.

    If the session is in the cache and the request epoch is incorrect, it fails with error code INVALID_SHARE_SESSION_EPOCH.

    Otherwise, the set of topic-partitions in the cache is updated, the epoch in the cache is incremented, the acknowledgements are processed, and records are fetched from the replica manager.

  • ShareFetch (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch = -1

    This is a final fetch request. It can contain a final set of acknowledgements, but its primary purpose is to close the share session.

    If the request contains a list of topics to add or forget, it fails with error code INVALID_REQUEST.

    If the session is not in the cache, it fails with error code SHARE_SESSION_NOT_FOUND.

    If the session is in the cache and the request epoch is incorrect, it fails with error code INVALID_SHARE_SESSION_EPOCH.

    Otherwise, the acknowledgements are processed and the share session is removed from the cache.

  • ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch = 0

    Fails with error code INVALID_SHARE_SESSION_EPOCH. It is not permitted to create a share session with this request.

  • ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch from 1 to Integer.MAX_VALUE inclusive

    If the session is not in the cache, it fails with error code SHARE_SESSION_NOT_FOUND.

    If the session is in the cache and the request epoch is incorrect, it fails with error code INVALID_SHARE_SESSION_EPOCH.

    Otherwise, the acknowledgements are processed and the epoch in the cache is incremented.

  • ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) with ShareSessionEpoch = -1

    If the session is not in the cache, it fails with error code SHARE_SESSION_NOT_FOUND.

    If the session is in the cache, the acknowledgements are processed and the share session is removed from the cache.
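To make these rules concrete, here is a minimal sketch of the epoch handling for ShareFetch, under some stated assumptions: the class and method names are invented for illustration, the session cache is reduced to a map from session key to next expected epoch, and wrapping the epoch back to 1 after Integer.MAX_VALUE is an assumption mirroring regular fetch sessions rather than anything specified here.

Code Block
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the actual broker implementation.
public class ShareSessionEpochSketch {
    private static final int FINAL_EPOCH = -1;

    // Session cache keyed by (GroupId, MemberId); the value is the next expected epoch.
    private final Map<String, Integer> cache = new HashMap<>();

    // Applies the ShareFetch epoch rules; returns an error name, or null if accepted.
    public synchronized String shareFetch(String sessionKey, int epoch,
                                          boolean hasAcknowledgements, boolean hasTopicChanges) {
        if (epoch == 0) {
            if (hasAcknowledgements) return "INVALID_REQUEST"; // a full fetch cannot carry acknowledgements
            cache.put(sessionKey, 1);   // discard any existing session (releasing acquired records, elided)
            return null;
        }
        if (epoch == FINAL_EPOCH) {
            if (hasTopicChanges) return "INVALID_REQUEST";     // a final fetch cannot add or forget topics
            if (!cache.containsKey(sessionKey)) return "SHARE_SESSION_NOT_FOUND";
            cache.remove(sessionKey);   // process final acknowledgements (elided), then close
            return null;
        }
        Integer expected = cache.get(sessionKey);
        if (expected == null) return "SHARE_SESSION_NOT_FOUND";
        if (epoch != expected) return "INVALID_SHARE_SESSION_EPOCH";
        // Update topic-partitions, process acknowledgements, fetch from the replica manager (all elided)
        cache.put(sessionKey, expected == Integer.MAX_VALUE ? 1 : expected + 1);
        return null;
    }
}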

Client programming interface

...

Each call to KafkaShareConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it is subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, using either explicit or implicit acknowledgement. KafkaShareConsumer works out which style of acknowledgement is being used from the order of the calls the application makes. It is not permissible to mix the two styles of acknowledgement.

If the application calls the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition (a sketch follows the list below). In this case:

  • The application calls KafkaShareConsumer.commitSync/Async() which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.

  • The application calls KafkaShareConsumer.close() which attempts to commit any pending acknowledgements and releases any remaining acquired records.
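Here is a sketch of the explicit style. KafkaShareConsumer, AcknowledgeType and the acknowledgement methods are those proposed in this KIP; the constructor, configuration keys, topic name "T1" and the process() helper are assumptions for the purpose of the example.

Code Block
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// Imports for KafkaShareConsumer and AcknowledgeType (proposed in this KIP) omitted.

public class ExplicitAcknowledgementExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "G1");                      // the share group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("T1"));
            while (true) {
                // Returns in-flight records acquired for this consumer
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        process(record);
                        // Acknowledge in the order the records appear, i.e. increasing offset
                        consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                    } catch (Exception e) {
                        // Make the record available for another delivery attempt
                        consumer.acknowledge(record, AcknowledgeType.RELEASE);
                    }
                }
                consumer.commitSync(); // commit the acknowledgements to Kafka
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Application-specific processing (assumed)
    }
}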

...

  • The application calls KafkaShareConsumer.commitSync/Async() which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.

  • The application calls KafkaShareConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.

  • The application calls KafkaShareConsumer.close() which releases any acquired records without acknowledgement.
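For comparison, a sketch of the implicit style, continuing with the props and process() helper from the explicit sketch above: because the application never calls acknowledge(), committing treats every delivered record as processed successfully.

Code Block
// Implicit acknowledgement: no acknowledge() calls are made.
try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
    consumer.subscribe(List.of("T1"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // failures should be handled before committing
        }
        consumer.commitSync(); // implicitly acknowledges every record just delivered
    }
}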

The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.

When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.TimeoutException, but the record may well be re-acquired for the same consumer and returned to it again.
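Continuing the explicit-acknowledgement sketch above, one way an application might handle this case; where exactly the exception surfaces (from commitSync() or per record) is an assumption here, and the do-nothing policy is a suggestion rather than a prescription.

Code Block
import org.apache.kafka.common.errors.TimeoutException;

// ... inside the poll loop of the explicit-acknowledgement sketch ...
try {
    consumer.commitSync();
} catch (TimeoutException e) {
    // The acquisition lock elapsed before the acknowledgement was committed.
    // The affected records have reverted to Available and may be delivered
    // again, possibly to this same consumer, so no corrective action is taken.
}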

...

The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState, and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type and the key of "group:topicId:partition".

When a share coordinator becomes leader of a partition of the share-group state topic, it scans the partition to replay the records it finds and build its in-memory state. Once complete, it is ready to serve the share-group state RPCs.

The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.
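A hypothetical illustration of this keying: the share-group state topic partition is derived from (group, topicId, partition) only, so a ShareDelta's DeltaIndex plays no part in partition selection. The hash shown is an arbitrary stand-in, not the actual mapping.

Code Block
import java.util.Objects;
import org.apache.kafka.common.Uuid;

public final class ShareStateKeyingSketch {
    // Maps a share-partition to a partition of the share-group state topic.
    // Objects.hash is a stand-in; the real hashing is not specified here.
    public static int stateTopicPartition(String groupId, Uuid topicId, int partition,
                                          int numStateTopicPartitions) {
        int hash = Objects.hash(groupId, topicId, partition); // DeltaIndex deliberately excluded
        return Math.floorMod(hash, numStateTopicPartitions);
    }
}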

...

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

Type: ShareCheckpoint

Key:

Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32

Value:

Code Block
StateEpoch: uint32
CheckpointEpoch: uint32
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16

Type: ShareDelta

Key:

Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32
DeltaIndex: uint8 (not included when choosing partition)

The keys for delta records with different delta indices need to be different so that log compaction retains multiple records.

Value:

Code Block
StateEpoch: uint32
CheckpointEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16
When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareCheckpoint and any subsequent ShareDelta records whose checkpoint epoch matches. It keeps track of the maximum delta index for each share-partition so that it can continue the sequence. Until replay is complete for a share-partition, the RPCs that mutate its share-group state return the COORDINATOR_LOAD_IN_PROGRESS error code.
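The replay rule can be sketched as follows. The record and key types here are simplified stand-ins for the schemas above, and applying a delta's state batches to the in-memory state is elided; the point is simply that a new checkpoint supersedes earlier deltas, and only deltas whose CheckpointEpoch matches the latest checkpoint are applied.

Code Block
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of share coordinator replay; types are illustrative stand-ins.
public class ShareStateReplaySketch {
    record Key(String groupId, String topicId, int partition) {}
    record Checkpoint(int checkpointEpoch) {}
    record Delta(int checkpointEpoch, int deltaIndex) {}

    private final Map<Key, Checkpoint> latestCheckpoint = new HashMap<>();
    private final Map<Key, Integer> maxDeltaIndex = new HashMap<>();

    void onShareCheckpoint(Key key, Checkpoint cp) {
        latestCheckpoint.put(key, cp); // supersedes any earlier checkpoint for this share-partition
        maxDeltaIndex.remove(key);     // deltas from earlier checkpoint epochs no longer apply
    }

    void onShareDelta(Key key, Delta delta) {
        Checkpoint cp = latestCheckpoint.get(key);
        if (cp == null || delta.checkpointEpoch() != cp.checkpointEpoch()) {
            return;                    // stale delta from a superseded checkpoint epoch
        }
        // Apply the delta's state batches to the in-memory state (elided), and
        // remember the maximum delta index so the sequence can be continued.
        maxDeltaIndex.merge(key, delta.deltaIndex(), Math::max);
    }
}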

...

Here are some examples showing the writing of share-group state.

Each step below shows the operation, the state changes it causes, the cumulative state, and the WriteShareGroupState request (if any) sent to the share coordinator.

Operation: Starting state of topic-partition with latest offset 100
State changes: SPSO=100, SPEO=100
Cumulative state: SPSO=100, SPEO=100
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}

In the batched case with successful processing, there's a state change per batch to move the SPSO forwards.

Operation: Fetch records 100-109
State changes: SPEO=110, records 100-109 (acquired, delivery count 1)
Cumulative state: SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)
WriteShareGroupState request: none

Operation: Acknowledge 100-109
State changes: SPSO=110
Cumulative state: SPSO=110, SPEO=110
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}

With a messier sequence of release and acknowledge, there's a state change for each operation, which can act on multiple records.

Operation: Fetch records 110-119. Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119.
State changes: SPEO=120, records 110-119 (acquired, delivery count 1)
Cumulative state: SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)
WriteShareGroupState request: none

Operation: Release 110 (consumer 1)
State changes: record 110 (available, delivery count 1)
Cumulative state: SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.

Operation: Acknowledge 119 (consumer 3)
State changes: record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged
Cumulative state: SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),
      "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Operation: Fetch records 110, 120 (consumer 1)
State changes: SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)
WriteShareGroupState request: none

Operation: Lock timeout elapsed for 111, 112 (consumer 1's records)
State changes: records 111-112 (available, delivery count 1)
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}

Operation: Acknowledge 113-118 (consumer 2)
State changes: records 113-118 acknowledged
Cumulative state: SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}

Operation: Fetch records 111, 112 (consumer 3)
State changes: records 111-112 (acquired, delivery count 2)
Cumulative state: SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)
WriteShareGroupState request: none

Operation: Acknowledge 110 (consumer 1)
State changes: SPSO=111
Cumulative state: SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}

Operation: Acknowledge 111, 112 (consumer 3)
State changes: SPSO=120
Cumulative state: SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)
WriteShareGroupState request:

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}

Administration

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.

...

The following table explains how the administration operations on share groups work.

Operation: Create share group
Involves: Group coordinator
Notes: This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the __consumer_offsets topic to persist the group's existence.

Operation: Assign a share-partition
Involves: Group coordinator and optionally share coordinator
Notes: When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record to the __share_group_state topic.

Operation: List share groups
Involves: Group coordinator

Operation: List share group offsets
Involves: Group coordinator and share-partition leaders
Notes: The admin client gets a list of share-partitions from the group coordinator, and then asks the share-partition leaders for the offset information.

Operation: Describe share group
Involves: Group coordinator

Operation: Alter share group offsets
Involves: Group coordinator and share coordinator
Notes: Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic.

Operation: Delete share group offsets
Involves: Group coordinator and share coordinator
Notes: This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Operation: Delete share group
Involves: Group coordinator and share coordinator
Notes: Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata record and finally a tombstone ConsumerGroupMetadata record to the __consumer_offsets topic.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

...

Its definition follows the pattern of ConsumerGroupState with fewer states.
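For illustration, here is a sketch of what such an enum might look like. The state set {EMPTY, STABLE, DEAD} matches the states used by the group-count metric later in this document; treat the exact members and their names as assumptions.

Code Block
// Sketch of the proposed enum, mirroring ConsumerGroupState with fewer states.
public enum ShareGroupState {
    UNKNOWN("Unknown"),
    EMPTY("Empty"),
    STABLE("Stable"),
    DEAD("Dead");

    private final String name;

    ShareGroupState(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}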

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors package, corresponding to the new error codes in the Kafka protocol.

  • InvalidRecordStateException - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException - The share session is not found.
  • InvalidShareSessionEpochException - The share session epoch is invalid.

They are all subclasses of RetriableException.

Command-line tools

kafka-share-groups.sh

...

  • INVALID_RECORD_STATE - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH - The share session epoch is invalid.
  • FENCED_STATE_EPOCH - The share coordinator rejected the request because the share-group state epoch did not match.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
   "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields":[
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
   ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

...

Metric Name: group-count
Type: Gauge
Group: group-coordinator-metrics
Tags: protocol: share
Description: The total number of share groups managed by the group coordinator.
JMX Bean: kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share

Metric Name: rebalance (rebalance-rate and rebalance-count)
Type: Meter
Group: group-coordinator-metrics
Tags: protocol: share
Description: The rate and total number of share group rebalances.
JMX Beans: kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share
kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share

Metric Name: num-partitions
Type: Gauge
Group: group-coordinator-metrics
Tags: protocol: share
Description: The number of share partitions managed by the group coordinator.
JMX Bean: kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share

Metric Name: group-count
Type: Gauge
Group: group-coordinator-metrics
Tags: protocol: share, state: {empty|stable|dead}
Description: The number of share groups in each state.
JMX Bean: kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

Metric Name: share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)
Type: Meter
Group: group-coordinator-metrics
Tags: protocol: share
Description: The rate and total number of offsets acknowledged for share groups.
JMX Beans: kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share
kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share

Metric Name: record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)
Type: Meter
Group: group-coordinator-metrics
Tags: protocol: share, ack-type: {accept,release,reject}
Description: The rate and number of records acknowledged per acknowledgement type.
JMX Beans: kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject}
kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject}

Metric Name: partition-load-time (partition-load-time-avg and partition-load-time-max)
Type: Meter
Group: group-coordinator-metrics
Tags: protocol: share
Description: The time taken to load the share partitions.
JMX Beans: kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share
kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share

Metric Name: partition-load-time (partition-load-time-avg and partition-load-time-max)
Type: Meter
Group: share-coordinator-metrics
Tags: none
Description: The time taken in milliseconds to load the share-group state from the share-group state partitions loaded in the last 30 seconds.
JMX Beans: kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg
kafka.server:type=share-coordinator-metrics,name=partition-load-time-max

Metric Name: thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)
Type: Meter
Group: share-coordinator-metrics
Tags: none
Description: The fraction of time the share coordinator thread is idle.
JMX Beans: kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min
kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg

Metric Name: write (write-rate and write-total)
Type: Meter
Group: share-coordinator-metrics
Tags: none
Description: The rate (per second) and total number of share-group state write calls.
JMX Beans: kafka.server:type=share-coordinator-metrics,name=write-rate
kafka.server:type=share-coordinator-metrics,name=write-total

Metric Name: write-latency (write-latency-avg and write-latency-max)
Type: Meter
Group: share-coordinator-metrics
Tags: none
Description: The time taken for a share-group state write call.
JMX Beans: kafka.server:type=share-coordinator-metrics,name=write-latency-avg
kafka.server:type=share-coordinator-metrics,name=write-latency-max

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...