...
The share coordinator is responsible for persistence of share-group state on a new internal topic. The share-partition leader uses the share-group state persister to communicate with the share coordinator.
The following diagram illustrates how these pieces are wired together.
Relationship with consumer groups
...
Share Group | ||
---|---|---|
Name | Type | Description |
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group. |
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group. |
Server Assignor | string | The server-side assignor used by the group. |
Members | []Member | The set of members in the group. |
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes. |
Member | ||
Name | Type | Description |
Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used throughout the lifetime of the member. |
Rack ID | string | The rack ID configured by the consumer. |
Client ID | string | The client ID configured by the consumer. |
Client Host | string | The client host of the consumer. |
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer. |
...
In all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. For a share group, the group coordinator does not persist the group metadata. The new version of the group metadata signals that a new assignment is required for the group.
...
- EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
- STABLE - When a share group has active members, the share group is STABLE.
- DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it. This only happens if the group does not have any persistent share-group state. Share groups are intentionally more durable than consumer groups.
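The three states above can be read as a small decision rule. The following is a minimal sketch, not the group coordinator's implementation; the function name and its three inputs are hypothetical and only restate the conditions in the list.

```python
from enum import Enum


class ShareGroupState(Enum):
    EMPTY = "EMPTY"
    STABLE = "STABLE"
    DEAD = "DEAD"


def next_state(member_count: int, empty_expired: bool,
               has_persistent_state: bool) -> ShareGroupState:
    """Return the state implied by the rules above (hypothetical helper)."""
    if member_count > 0:
        return ShareGroupState.STABLE
    # No members: the group only becomes DEAD once it has stayed EMPTY for
    # the configured period AND it holds no persistent share-group state.
    if empty_expired and not has_persistent_state:
        return ShareGroupState.DEAD
    return ShareGroupState.EMPTY
```

Note that a group with persistent share-group state stays EMPTY however long it has had no members, which is what makes share groups more durable than consumer groups.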
Persistence and fail-over
For a share group, the group coordinator persists two kinds of records:
- ConsumerGroupMetadata: this is written to reserve the group's ID as a share group in the namespace of groups.
- ShareGroupPartitionMetadata: this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.
When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over. However, the members of the group and their assignments are not persisted. This means that existing members will have to rejoin the share group following a coordinator fail-over.
...
Reading transactional records
Each consumer in a consumer group has its own isolation level which controls how it handles records which were produced in transactions. In a consumer group, a consumer using read_committed isolation level is only able to fetch records up to the last stable offset (LSO). It is also responsible for filtering out transactional records which were aborted. This filtering happens in the client.
For a share group, the concept of isolation level applies to the entire group, not each consumer. The isolation level of a share group is controlled by the group configuration group.share.isolation.level.
For the read_uncommitted isolation level, which is the default, the share group consumes all non-transactional and transactional records.
For the read_committed isolation level, the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records is non-transactional records and committed transactional records only. The SPSO cannot move past the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level. The share-partition leader itself is responsible for keeping track of the commit and abort markers, and filtering out transactional records which have been aborted.
Let’s have a look at the structure of the log for transactional records and see how the share-partition leader processes the log to build up the in-flight records in the presence of transactions.
A log segment consists of a sequence of RecordBatch structures. They are interpreted as follows:
- isTransactional=false is a batch of non-transactional records - these are potentially in-flight records
- isTransactional=true, isControl=false is a batch of transactional records - these are potentially in-flight records if the transaction commits
- isTransactional=true, isControl=true is a batch of control records - within these batches are the EndTxnMarker records which indicate the commit or abort of a transaction
So, only the last of these needs to be deeply inspected to read the records, but these batches are not compressed and they’re also very small.
The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED. This means that the replica manager only returns records up to the last stable offset, so any transactional records fetched were part of transactions which have already completed and therefore have a transaction marker in the log before the LSO.
When a RecordBatch of non-transactional records is fetched, the records are immediately added to the set of in-flight records in Available state. When a RecordBatch of transactional records is fetched, the records are added to the set of in-flight records in a transient Uncommitted state, tagged with the producer ID and producer epoch which act as the transaction identifier. When a RecordBatch of transactional control records is fetched, the EndTxnMarker within is inspected to see whether it is committing or aborting the transaction. If the transaction is committed, the Uncommitted records for that transaction become Available. If the transaction is aborted, the Uncommitted records for that transaction are discarded.
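The handling of the three kinds of batch described above can be sketched as follows. This is an illustrative model only: the Batch and InFlight classes and their fields are hypothetical stand-ins, and the real share-partition leader tracks considerably more state per record.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

TxnId = Tuple[int, int]  # (producer ID, producer epoch)


@dataclass
class Batch:
    base_offset: int
    last_offset: int
    is_transactional: bool = False
    is_control: bool = False
    txn_id: Optional[TxnId] = None
    committed: Optional[bool] = None  # only meaningful for control batches


@dataclass
class InFlight:
    states: Dict[int, str] = field(default_factory=dict)           # offset -> state
    pending: Dict[TxnId, List[int]] = field(default_factory=dict)  # txn -> offsets

    def add(self, batch: Batch) -> None:
        offsets = range(batch.base_offset, batch.last_offset + 1)
        if not batch.is_transactional:
            # Non-transactional records are Available immediately.
            for o in offsets:
                self.states[o] = "Available"
        elif not batch.is_control:
            # Transactional data waits in Uncommitted, tagged by its transaction.
            for o in offsets:
                self.states[o] = "Uncommitted"
            self.pending.setdefault(batch.txn_id, []).extend(offsets)
        else:
            # Control batch: resolve every Uncommitted record of that transaction.
            for o in self.pending.pop(batch.txn_id, []):
                if batch.committed:
                    self.states[o] = "Available"
                else:
                    del self.states[o]
```

In this sketch the control batches themselves never become in-flight records; they only resolve the records of the transaction they close.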
If a transaction was long-running, it may be necessary to read a lot of records in order to discover the control record. If the number of in-flight records has reached its limit and there are any Uncommitted records, the log is scanned looking for transaction control records beyond the SPEO, building up a summary of the (producer ID, producer epoch, outcome) information that is found. This can then subsequently be used to process transactional records as they are being added to the in-flight records.
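A rough sketch of the scan-ahead summary follows. The ControlMarker type and the function name are hypothetical, and keeping only the earliest marker per (producer ID, producer epoch) is an assumption made here for simplicity; the text does not specify how repeated transactions from the same producer are summarised.

```python
from typing import Dict, Iterable, NamedTuple, Tuple


class ControlMarker(NamedTuple):
    offset: int
    producer_id: int
    producer_epoch: int
    committed: bool  # True for a commit marker, False for an abort marker


def summarise_outcomes(markers: Iterable[ControlMarker],
                       speo: int) -> Dict[Tuple[int, int], bool]:
    """Collect transaction outcomes found at or beyond the SPEO."""
    summary: Dict[Tuple[int, int], bool] = {}
    for marker in markers:
        if marker.offset < speo:
            continue  # already covered by normal in-flight processing
        # Assumption: the earliest marker beyond the SPEO is the one that
        # resolves the currently Uncommitted records of that producer.
        summary.setdefault((marker.producer_id, marker.producer_epoch),
                           marker.committed)
    return summary
```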
Share sessions
The ShareFetch API works very much like incremental fetch using a concept called a share session. Each share session contains a set of topic-partitions which are managed in the share-partition leaders. The share-partition leader manages the fetching of records and the in-flight record state for its share-partitions. The consumer adds and removes topic-partitions from its share session using the ShareFetch API just like the Fetch API is used for incremental fetch. With the Fetch API, the consumer specifies the fetch offset. With the ShareFetch API, the consumer just fetches records and the share-partition leader decides which records to return.
...
The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState, and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type and the key of "group:topicId:partition".
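As an illustration of the "group:topicId:partition" key format, the helpers below build and parse such a key. Both function names are hypothetical; splitting from the right is a design choice made here on the assumption that a group ID may itself contain colons while the topic ID and partition do not.

```python
from typing import Tuple


def share_coordinator_key(group_id: str, topic_id: str, partition: int) -> str:
    """Build the FindCoordinator key for one share-partition."""
    return f"{group_id}:{topic_id}:{partition}"


def parse_share_coordinator_key(key: str) -> Tuple[str, str, int]:
    """Split a key back into (group ID, topic ID, partition)."""
    # rsplit keeps any colons inside the group ID intact.
    group_id, topic_id, partition = key.rsplit(":", 2)
    return group_id, topic_id, int(partition)
```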
...
The share coordinator will prefer to write a checkpoint over a delta (for example, when the SPSO moves and there are no in-flight records, the checkpoint will be small and there’s no need to write a delta instead). The share coordinator will take a checkpoint periodically, frequently enough to minimise the number of ShareDelta records to replay but rarely enough to minimise the performance cost of taking checkpoints.
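To make the checkpoint-and-delta idea concrete, here is a minimal replay sketch. The record layout is an assumption: it borrows the start offset (with -1 meaning "not written") and the state batches from the WriteShareGroupState request, and the Checkpoint, Delta and replay names are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

StateBatch = Tuple[int, int, int]  # (base offset, last offset, state code)


@dataclass
class Checkpoint:
    start_offset: int
    batches: List[StateBatch]


@dataclass
class Delta:
    start_offset: int  # -1 means the start offset is unchanged (assumption)
    batches: List[StateBatch]


def replay(checkpoint: Checkpoint, deltas: List[Delta]) -> Tuple[int, Dict[int, int]]:
    """Rebuild (start offset, per-offset state) from a checkpoint plus deltas."""
    start = checkpoint.start_offset
    states: Dict[int, int] = {}

    def apply(batches: List[StateBatch]) -> None:
        for base, last, state in batches:
            for offset in range(base, last + 1):
                states[offset] = state  # later records override earlier ones

    apply(checkpoint.batches)
    for delta in deltas:
        if delta.start_offset != -1:
            start = delta.start_offset
        apply(delta.batches)
    # Offsets below the start offset are no longer in-flight.
    return start, {o: s for o, s in states.items() if o >= start}
```

The more deltas there are since the last checkpoint, the more work this replay does, which is the cost that periodic checkpoints are balancing.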
The records also include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.
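The fencing role of the state epoch can be sketched like this. The class, method and exception names are hypothetical, and rejecting every mismatching epoch (rather than only older ones) is an assumption made for this sketch.

```python
class FencedStateEpochError(Exception):
    """Raised when a write carries a state epoch that is not current."""


class SharePartitionState:
    def __init__(self) -> None:
        self.state_epoch = -1
        self.start_offset = -1

    def initialize(self, group_epoch: int, start_offset: int) -> None:
        # Initialization stamps the state with the group's current epoch.
        self.state_epoch = group_epoch
        self.start_offset = start_offset

    def write(self, state_epoch: int, start_offset: int) -> None:
        # Assumption: only writes made against the current epoch are accepted.
        if state_epoch != self.state_epoch:
            raise FencedStateEpochError(
                f"write epoch {state_epoch} != current epoch {self.state_epoch}")
        self.start_offset = start_offset
```

A writer that still holds the epoch from before a re-initialization is rejected, so it cannot overwrite the newer version of the state.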
The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):
Type | Key | Value | ||||
---|---|---|---|---|---|---|
ShareCheckpoint |
|
| ||||
ShareDelta |
The keys for delta records with different delta indices need to be different so that log compaction retains multiple records. |
|
...
Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.
The following table explains how the administration operations on share groups work.
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the consumer offsets topic to persist the group's existence. |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record to the __share_group_state topic. |
List share groups | Group coordinator | |
List share group offsets | Group coordinator and share-partition leaders | The admin client gets a list of share-partitions from the group coordinator, and then asks the share-partition leaders for the offset information. |
Describe share group | Group coordinator | |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic. |
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the |
Public Interfaces
This KIP introduces extensive additions to the public interfaces.
...
- ShareGroupHeartbeat - for consumers to form and maintain share groups
- ShareGroupDescribe - for describing share groups
- ShareFetch - for fetching records from share-partition leaders
- ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
- AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
- DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
- DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
- InitializeShareGroupState - for initializing share-partition state on a share coordinator
- ReadShareGroupState - for reading share-partition state from a share coordinator
- WriteShareGroupState - for writing share-partition state to a share coordinator
- DeleteShareGroupState - for deleting share-partition state from a share coordinator
...
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
} |
InitializeShareGroupState API
The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{
"apiKey": NN,
"type": "request",
"listeners": ["broker"],
"name": "InitializeShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0",
"about": "The group identifier." },
{ "name": "TopicId", "type": "uuid", "versions": "0",
"about": "The topic identifier." },
{ "name": "Partition", "type": "int32", "versions": "0",
"about": "The partition index." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0",
"about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
]
} |
Response schema
Code Block |
---|
{
"apiKey": NN,
"type": "response",
"name": "InitializeShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]
} |
ReadShareGroupState API
The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0+",
      "about": "The partition index." }
  ]
} |
Response schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0+",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0+",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0+",
        "about": "The delivery count." }
    ]}
  ]
} |
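As an illustration of how a reader of this response might interpret the StateBatches field, the sketch below expands the batches into per-offset state. The function name is hypothetical, and the batches are modelled as plain dictionaries keyed by the schema's field names rather than as generated message classes.

```python
from enum import IntEnum
from typing import Dict, List, Tuple


class RecordState(IntEnum):
    # State codes as listed in the "State" field description.
    AVAILABLE = 0
    ACKED = 2
    ARCHIVED = 4


def expand_state_batches(batches: List[dict]) -> Dict[int, Tuple[RecordState, int]]:
    """Map each offset to (state, delivery count)."""
    per_offset: Dict[int, Tuple[RecordState, int]] = {}
    for batch in batches:
        state = RecordState(batch["State"])
        # BaseOffset and LastOffset are both inclusive.
        for offset in range(batch["BaseOffset"], batch["LastOffset"] + 1):
            per_offset[offset] = (state, batch["DeliveryCount"])
    return per_offset
```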
...
WriteShareGroupState API
The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being written." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]}
  ]
} |
...
Response schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
} |
DeleteShareGroupState API
The DeleteShareGroupState API is used by share-partition leaders to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
} |
Response schema
Code Block |
---|
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
} |
...
Records
This section describes the new record types.
Group metadata
In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets topic.
For each share group, a single ConsumerGroupMetadata record is written. When the group is deleted, a tombstone record is written.
There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
ConsumerGroupMetadataKey
This is included for completeness. There is no change to this record.
Code Block |
---|
{
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
} |
...
ConsumerGroupMetadataValue
A new version of the record value is introduced which contains the Type field. For a share group, the type will be "share". For a consumer group, the type can be omitted (null) or "consumer".
Code Block |
---|
{
  "type": "data",
  "name": "ConsumerGroupMetadataValue",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    // Version 1 adds Type field
    { "name": "Type", "type": "string", "versions": "1+", "nullableVersions": "1+",
      "about": "The group type - null indicates consumer group." }
  ]
} |
ShareGroupPartitionMetadataKey
Code Block |
---|
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
} |
...
ShareGroupPartitionMetadataValue
Code Block |
---|
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Epoch", "versions": "0+", "type": "int32" },
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "NumPartitions", "versions": "0+", "type": "int32" }
    ]},
    { "name": "InitializingTopics", "versions": "0+", "type": "[]TopicMetadata", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "StartPartitionIndex", "versions": "0+", "type": "int32" },
      { "name": "EndPartitionIndex", "versions": "0+", "type": "int32" }
    ]},
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicMetadata", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "NumPartitions", "versions": "0+", "type": "int32" }
    ]}
  ]
} |
The InitializingTopics field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When the share coordinator successfully responds to InitializeShareGroupState, the topic-partitions are moved into the Topics field.
In a similar way, the DeletingTopics field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.
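The two-stage initialization can be sketched as follows. This is a simplified model under stated assumptions: topics are tracked by ID only (the record also carries partition information), and PartitionMetadata, initialize_topic and the send_initialize callback are hypothetical names standing in for the record and the InitializeShareGroupState round trip.

```python
from dataclasses import dataclass, field
from typing import Callable, Set


@dataclass
class PartitionMetadata:
    topics: Set[str] = field(default_factory=set)
    initializing_topics: Set[str] = field(default_factory=set)


def initialize_topic(meta: PartitionMetadata, topic_id: str,
                     send_initialize: Callable[[str], bool]) -> bool:
    """Run both stages for one topic; return True once it is fully initialized."""
    # Stage 1: record the intent before contacting the share coordinator.
    meta.initializing_topics.add(topic_id)
    if not send_initialize(topic_id):
        # The topic stays in the initializing set so the step can be retried.
        return False
    # Stage 2: the share coordinator responded successfully, so promote the topic.
    meta.initializing_topics.discard(topic_id)
    meta.topics.add(topic_id)
    return True
```

Recording the first stage before the request is sent means a coordinator that fails over mid-way can see which topics still need their state initialized.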
Share-group state
These records are written by the share coordinator on the __share_group_state topic.
...