...
If a regular Kafka consumer then attempts to use "G1"
as a consumer group, the exception "InconsistentGroupProtocolException"
will be thrown.
Share group membership
...
For a share group, the group coordinator persists two kinds of records:
- ConsumerGroupMetadata: written to reserve the group's ID as a share group in the namespace of groups.
- ShareGroupPartitionMetadata: written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.
When the group coordinator fails over, the newly elected coordinator loads the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over. However, the members of the group and their assignments are not persisted, so existing members have to rejoin the share group following a group coordinator fail-over.
In-flight records
...
The sessions are handled as follows:
RPC request | Request ShareSessionEpoch | Meaning |
---|---|---|
ShareFetch (GroupId, MemberId, ShareSessionEpoch) | 0 | This is a full fetch request. It contains the complete set of topic-partitions to fetch. It cannot contain any acknowledgements. If the request contains acknowledgements, it fails with an error code. If the session is in the cache, discard the existing session, releasing all acquired records. Create a new session in the cache with epoch 1. |
ShareFetch (GroupId, MemberId, ShareSessionEpoch) | 1 to Integer.MAX_VALUE inclusive | This is an incremental fetch request. It contains a partial set of topic-partitions to be applied to the set already in the cache. It can contain a set of acknowledgements to perform before returning the fetched data. If the session is not in the cache, it fails with an error code. If the session is in the cache and the request epoch is incorrect, it fails with an error code. Otherwise, update the set of topic-partitions in the cache, increment the epoch in the cache, process the acknowledgements, and fetch records from the replica manager. |
ShareFetch (GroupId, MemberId, ShareSessionEpoch) | -1 | This is a final fetch request. It can contain a final set of acknowledgements, but its primary purpose is to close the share session. If the request contains a list of topics to add or forget, it fails with an error code. If the session is not in the cache, it fails with an error code. If the session is in the cache and the request epoch is incorrect, it fails with an error code. Otherwise, process the acknowledgements and remove the share session from the cache. |
ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | 0 | Fails with an error code. |
ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | 1 to Integer.MAX_VALUE inclusive | If the session is not in the cache, it fails with an error code. If the session is in the cache and the request epoch is incorrect, it fails with an error code. Otherwise, process the acknowledgements and increment the epoch in the cache. |
ShareAcknowledge (GroupId, MemberId, ShareSessionEpoch) | -1 | If the session is not in the cache, it fails with an error code. If the session is in the cache, process the acknowledgements and remove the share session from the cache. |
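The epoch rules in the table can be sketched as a small state machine. The following is a toy model in plain Java, not broker code. The class and method names are hypothetical, the mapping of each failure to INVALID_REQUEST, SHARE_SESSION_NOT_FOUND or INVALID_SHARE_SESSION_EPOCH is an assumption inferred from the error code descriptions, and the sketch assumes that the epoch wraps from Integer.MAX_VALUE back to 1 and that an epoch of -1 is accepted whenever the session exists.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the share-session epoch rules; names are hypothetical. */
final class ShareSessionCacheSketch {

    /** Outcomes. Which failure maps to which code is an assumption. */
    enum Result { OK, INVALID_REQUEST, SHARE_SESSION_NOT_FOUND, INVALID_SHARE_SESSION_EPOCH }

    static final int INITIAL_EPOCH = 0;
    static final int FINAL_EPOCH = -1;

    // Key is "groupId:memberId"; value is the epoch expected on the next request.
    private final Map<String, Integer> expectedEpochs = new HashMap<>();

    private static String key(String groupId, String memberId) {
        return groupId + ":" + memberId;
    }

    // Assumption: after Integer.MAX_VALUE the epoch wraps to 1, skipping 0 and -1.
    private static int nextEpoch(int epoch) {
        return epoch == Integer.MAX_VALUE ? 1 : epoch + 1;
    }

    Result shareFetch(String groupId, String memberId, int epoch,
                      boolean hasAcks, boolean changesTopics) {
        String k = key(groupId, memberId);
        if (epoch == INITIAL_EPOCH) {
            // A full fetch cannot carry acknowledgements.
            if (hasAcks) return Result.INVALID_REQUEST;
            // Any existing session is replaced; its acquired records would be released.
            expectedEpochs.put(k, 1);
            return Result.OK;
        }
        // A final fetch cannot add or forget topics.
        if (epoch == FINAL_EPOCH && changesTopics) return Result.INVALID_REQUEST;
        return continueOrClose(k, epoch);
    }

    Result shareAcknowledge(String groupId, String memberId, int epoch) {
        if (epoch == INITIAL_EPOCH) return Result.INVALID_REQUEST;
        return continueOrClose(key(groupId, memberId), epoch);
    }

    private Result continueOrClose(String k, int epoch) {
        Integer expected = expectedEpochs.get(k);
        if (expected == null) return Result.SHARE_SESSION_NOT_FOUND;
        if (epoch == FINAL_EPOCH) {
            // Closing the session removes it from the cache.
            expectedEpochs.remove(k);
            return Result.OK;
        }
        if (epoch != expected) return Result.INVALID_SHARE_SESSION_EPOCH;
        expectedEpochs.put(k, nextEpoch(expected));
        return Result.OK;
    }
}
```

In this model a client opens a session with epoch 0, then sends 1, 2, 3 and so on, and closes with -1. Replaying a stale epoch or using a closed session yields the corresponding failure.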
Client programming interface
...
Each call to KafkaShareConsumer.poll(Duration) fetches data from any of the topic-partitions for the topics to which it subscribed. It returns a set of in-flight records acquired for this consumer for the duration of the acquisition lock timeout. For efficiency, the consumer preferentially returns complete record sets with no gaps. The application then processes the records and acknowledges their delivery, using either explicit or implicit acknowledgement. KafkaShareConsumer works out which style of acknowledgement is being used by the order of calls the application makes. It is not permissible to mix the two styles of acknowledgement.
If the application calls the KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) method for any record in the batch, it is using explicit acknowledgement. The calls to KafkaShareConsumer.acknowledge(ConsumerRecord, AcknowledgeType) must be issued in the order in which the records appear in the ConsumerRecords object, which will be in order of increasing offset for each share-partition. In this case:
- The application calls KafkaShareConsumer.commitSync/Async(), which commits the acknowledgements to Kafka. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls KafkaShareConsumer.poll(Duration) without committing first, which commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement. If any records in the batch were not acknowledged, they remain acquired and will be presented to the application in response to a future poll.
- The application calls KafkaShareConsumer.close(), which attempts to commit any pending acknowledgements and releases any remaining acquired records.
...
- The application calls KafkaShareConsumer.commitSync/Async(), which implicitly acknowledges all of the delivered records as processed successfully and commits the acknowledgements to Kafka.
- The application calls KafkaShareConsumer.poll(Duration) without committing, which also implicitly acknowledges all of the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.
- The application calls KafkaShareConsumer.close(), which releases any acquired records without acknowledgement.
The KafkaShareConsumer guarantees that the records returned in the ConsumerRecords object for a specific share-partition are in order of increasing offset. For each share-partition, the share-partition leader guarantees that acknowledgements for the records in a batch are performed atomically. This makes error handling significantly more straightforward because there can be one error code per share-partition.
When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.TimeoutException, but the record may well be re-acquired for the same consumer and returned to it again.
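The ownership check and the all-or-nothing behaviour described above can be illustrated with a minimal sketch. It is a toy in-memory model under stated assumptions, not the share-partition leader's implementation: the class, its methods and the boolean return value (standing in for a single error code per share-partition) are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of acknowledgement validation; names are hypothetical. */
final class AcknowledgeSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    /** Hypothetical in-flight entry: the state plus the member holding the lock. */
    private static final class InFlight {
        State state = State.AVAILABLE;
        String owner; // null unless the record is ACQUIRED
    }

    private final Map<Long, InFlight> records = new HashMap<>();

    void acquire(long offset, String memberId) {
        InFlight r = records.computeIfAbsent(offset, o -> new InFlight());
        r.state = State.ACQUIRED;
        r.owner = memberId;
    }

    /** Models the acquisition lock timing out: the record reverts to Available. */
    void lockTimeout(long offset) {
        InFlight r = records.get(offset);
        if (r != null && r.state == State.ACQUIRED) {
            r.state = State.AVAILABLE;
            r.owner = null;
        }
    }

    /**
     * Acknowledges a batch atomically: every offset is validated before any
     * state changes. Returns false, standing in for one error for the whole
     * batch, if any record is not Acquired by the calling member.
     */
    boolean acknowledge(String memberId, List<Long> offsets) {
        for (long offset : offsets) {
            InFlight r = records.get(offset);
            if (r == null || r.state != State.ACQUIRED || !memberId.equals(r.owner)) {
                return false;
            }
        }
        for (long offset : offsets) {
            InFlight r = records.get(offset);
            r.state = State.ACKNOWLEDGED;
            r.owner = null;
        }
        return true;
    }

    State stateOf(long offset) {
        InFlight r = records.get(offset);
        return r == null ? null : r.state;
    }
}
```

Validating first and applying second is what keeps a rejected batch from leaving some of its records acknowledged and others not.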
...
The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState, and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type and the key of "group:topicId:partition".
When a share coordinator becomes leader of a partition of the share-group state topic, it scans the partition to replay the records it finds and build its in-memory state. Once complete, it is ready to serve the share-group state RPCs.
The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.
...
The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):
Type | Key | Value |
---|---|---|
ShareCheckpoint | | |
ShareDelta | The keys for delta records with different delta indices need to be different so that log compaction retains multiple records. | |
When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareCheckpoint and any subsequent ShareDelta records whose checkpoint epoch matches. It keeps track of the maximum delta index for each share-partition so that it can continue the sequence. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.
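The replay rule can be sketched as a single pass over the partition. This is a minimal illustration in plain Java rather than share coordinator code. The StateRecord shape (a share-partition key, a checkpoint flag, a checkpoint epoch and a delta index) is a hypothetical simplification of the ShareCheckpoint and ShareDelta records, and the record payloads are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy replay of a share-group state topic partition; names are hypothetical. */
final class ShareStateReplaySketch {

    /** Hypothetical simplified record; real records also carry state batches. */
    static final class StateRecord {
        final String sharePartition; // stands in for (group, topicId, partition)
        final boolean checkpoint;    // true for ShareCheckpoint, false for ShareDelta
        final int checkpointEpoch;
        final int deltaIndex;        // ignored for checkpoints

        StateRecord(String sharePartition, boolean checkpoint,
                    int checkpointEpoch, int deltaIndex) {
            this.sharePartition = sharePartition;
            this.checkpoint = checkpoint;
            this.checkpointEpoch = checkpointEpoch;
            this.deltaIndex = deltaIndex;
        }
    }

    /** In-memory state rebuilt for one share-partition. */
    static final class Replayed {
        int checkpointEpoch = -1;
        int maxDeltaIndex = -1;
        int deltasApplied = 0;
    }

    /** Reads the log from the earliest offset to the end, in offset order. */
    static Map<String, Replayed> replay(List<StateRecord> log) {
        Map<String, Replayed> state = new HashMap<>();
        for (StateRecord rec : log) {
            Replayed r = state.computeIfAbsent(rec.sharePartition, k -> new Replayed());
            if (rec.checkpoint) {
                // A later checkpoint supersedes everything replayed so far.
                r.checkpointEpoch = rec.checkpointEpoch;
                r.maxDeltaIndex = -1;
                r.deltasApplied = 0;
            } else if (rec.checkpointEpoch == r.checkpointEpoch) {
                // Only deltas that belong to the latest checkpoint are applied.
                r.deltasApplied++;
                r.maxDeltaIndex = Math.max(r.maxDeltaIndex, rec.deltaIndex);
            }
            // Deltas whose checkpoint epoch does not match are skipped.
        }
        return state;
    }
}
```

After replay, maxDeltaIndex tells the coordinator where to continue the delta sequence for each share-partition.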
...
Here are some examples showing the writing of share-group state.
Operation | State changes | Cumulative state | WriteShareGroupState request |
---|---|---|---|
Starting state of topic-partition with latest offset 100 | SPSO=100, SPEO=100 | SPSO=100, SPEO=100 | |
In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards | | | |
Fetch records 100-109 | SPEO=110, records 100-109 (acquired, delivery count 1) | SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1) | |
Acknowledge 100-109 | SPSO=110 | SPSO=110, SPEO=110 | |
With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records | | | |
Fetch records 110-119. Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119 | SPEO=120, records 110-119 (acquired, delivery count 1) | SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1) | |
Release 110 (consumer 1) | record 110 (available, delivery count 1) | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1) | Note that the SPEO in the control records is 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation. |
Acknowledge 119 (consumer 3) | record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged | SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged | |
Fetch records 110, 120 (consumer 1) | SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
Lock timeout elapsed 111, 112 (consumer 1's records) | records 111-112 (available, delivery count 1) | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 113-118 (consumer 2) | records 113-118 acknowledged | SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Fetch records 111, 112 (consumer 3) | records 111-112 (acquired, delivery count 2) | SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 110 (consumer 1) | SPSO=111 | SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1) | |
Acknowledge 111, 112 (consumer 3) | SPSO=120 | SPSO=120, SPEO=121, record 120 (acquired, delivery count 1) | |
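The way the SPSO and SPEO move in the examples above can be reproduced with a short simulation. This is a sketch under simplifying assumptions, not the share-partition leader's logic: the class and method names are hypothetical, delivery counts and member ownership are left out, and a lock timeout is modelled as a release.

```java
import java.util.TreeMap;

/** Toy model of SPSO/SPEO movement; names are hypothetical. */
final class SharePartitionSketch {

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    long spso; // share-partition start offset
    long speo; // share-partition end offset
    private final TreeMap<Long, State> inFlight = new TreeMap<>();

    SharePartitionSketch(long startOffset) {
        spso = startOffset;
        speo = startOffset;
    }

    /** Fetching acquires records and may move the SPEO forwards. */
    void acquire(long first, long last) {
        for (long o = first; o <= last; o++) inFlight.put(o, State.ACQUIRED);
        speo = Math.max(speo, last + 1);
    }

    /** An explicit release or a lock timeout makes records Available again. */
    void release(long first, long last) {
        for (long o = first; o <= last; o++) inFlight.put(o, State.AVAILABLE);
    }

    void acknowledge(long first, long last) {
        for (long o = first; o <= last; o++) inFlight.put(o, State.ACKNOWLEDGED);
        // The SPSO moves past every leading acknowledged record,
        // and those records leave the in-flight set.
        while (inFlight.get(spso) == State.ACKNOWLEDGED) {
            inFlight.remove(spso);
            spso++;
        }
    }

    public static void main(String[] args) {
        SharePartitionSketch sp = new SharePartitionSketch(100);
        sp.acquire(100, 109);
        sp.acknowledge(100, 109);   // SPSO=110, SPEO=110
        sp.acquire(110, 119);       // SPEO=120
        sp.release(110, 110);       // consumer 1 releases 110
        sp.acknowledge(119, 119);   // SPSO stays at 110
        sp.acquire(110, 110);
        sp.acquire(120, 120);       // SPEO=121
        sp.release(111, 112);       // lock timeout
        sp.acknowledge(113, 118);
        sp.acquire(111, 112);
        sp.acknowledge(110, 110);   // SPSO=111
        sp.acknowledge(111, 112);   // SPSO jumps over 113-119 to 120
        System.out.println("SPSO=" + sp.spso + ", SPEO=" + sp.speo); // prints SPSO=120, SPEO=121
    }
}
```

The final acknowledgement shows why out-of-order acknowledgements are cheap to absorb: records 113-119 were already acknowledged, so the SPSO moves from 111 straight to 120 in one step.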
Administration
Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.
...
The following table explains how the administration operations on share groups work.
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the __consumer_offsets topic to persist the group's existence. |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record to the __share_group_state topic. |
List share groups | Group coordinator | |
List share group offsets | Group coordinator and share-partition leaders | The admin client gets a list of share-partitions from the group coordinator, and then asks the share-partition leaders for the offset information. |
Describe share group | Group coordinator | |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic. |
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the |
Public Interfaces
This KIP introduces extensive additions to the public interfaces.
...
Its definition follows the pattern of ConsumerGroupState with fewer states.
Exceptions
The following new exceptions are added to the org.apache.kafka.common.errors package corresponding to the new error codes in the Kafka protocol.
- InvalidRecordStateException - The record state is invalid. The acknowledgement of delivery could not be completed.
- ShareSessionNotFoundException - The share session is not found.
- InvalidShareSessionEpochException - The share session epoch is invalid.
They are all subclasses of RetriableException.
Command-line tools
kafka-share-groups.sh
...
- INVALID_RECORD_STATE - The record state is invalid. The acknowledgement of delivery could not be completed.
- SHARE_SESSION_NOT_FOUND - The share session is not found.
- INVALID_SHARE_SESSION_EPOCH - The share session epoch is invalid.
- FENCED_STATE_EPOCH - The share coordinator rejected the request because the share-group state epoch did not match.
ShareGroupHeartbeat API
The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.
...
Code Block |
---|
{
"apiKey": NN,
"type": "response",
"name": "InitializeShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "none",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]
} |
...
Code Block |
---|
{
"apiKey": NN,
"type": "response",
"name": "ReadShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "none",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0",
"about": "The share-partition start offset." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields":[
{ "name": "BaseOffset", "type": "int64", "versions": "0",
"about": "The base offset of this state batch." },
{ "name": "LastOffset", "type": "int64", "versions": "0",
"about": "The last offset of this state batch." },
{ "name": "State", "type": "int8", "versions": "0",
"about": "The state - 0:Available,2:Acked,4:Archived." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0",
"about": "The delivery count." }
]}
]
} |
...
Code Block |
---|
{
"apiKey": NN,
"type": "response",
"name": "WriteShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "none",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]
} |
...
Code Block |
---|
{
"apiKey": NN,
"type": "response",
"name": "DeleteShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "none",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." }
]
} |
...
Metric Name | Type | Group | Tags | Description | JMX Bean |
---|---|---|---|---|---|
group-count | Gauge | group-coordinator-metrics | | The total number of share groups managed by group coordinator. | |
rebalance (rebalance-rate and rebalance-count) | Meter | group-coordinator-metrics | | The total number of share group rebalances count and rate. | |
num-partitions | Gauge | group-coordinator-metrics | | The number of share partitions managed by group coordinator. | |
group-count | Gauge | group-coordinator-metrics | | The number of share groups in respective state. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead} |
share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) | Meter | group-coordinator-metrics | | The total number of offsets acknowledged for share groups. | |
record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) | Meter | group-coordinator-metrics | | The number of records acknowledged per acknowledgement type. | |
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | group-coordinator-metrics | | The time taken to load the share partitions. | |
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-coordinator-metrics | | The time taken in milliseconds to load the share-group state from the share-group state partitions loaded in the last 30 seconds. | |
thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg) | Meter | share-coordinator-metrics | | The fraction of time the share coordinator thread is idle. | |
write (write-rate and write-total) | Meter | share-coordinator-metrics | | The number of share-group state write calls per second. | |
write-latency (write-latency-avg and write-latency-total) | Meter | share-coordinator-metrics | | The time taken for a share-group state write call. | |
Future Work
There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.
...