...

If a regular Kafka consumer then attempts to use "G1" as a consumer group and the group G1  does not exist or is not a consumer group, the exception InconsistentGroupProtocolException will be thrown.

Note that the group.type  configuration is applied when a new group is created. It is best used to ensure that a group ID is reserved for use with a particular type of group. Changing the configuration when a group already exists will not have an effect until the existing group has been deleted. For production use, choosing a naming convention in advance and using this configuration to enforce the group type is recommended.
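
As an illustration of reserving a group ID in this way, the following sketch uses the admin client to set group.type for a group before any member uses it. It assumes the GROUP config resource type for dynamic group configurations is available; it is a sketch rather than a definitive recipe.

Code Block
// Sketch only: reserve the group ID "G1" for use as a share group by setting the
// group.type dynamic group configuration before any consumer or share consumer uses it.
// Assumes ConfigResource.Type.GROUP is available for dynamic group configurations.
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ReserveShareGroupId {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "G1");
            AlterConfigOp setShareType = new AlterConfigOp(
                new ConfigEntry("group.type", "share"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(group, List.of(setShareType))).all().get();
        }
    }
}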

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

...

When the share-partition leader receives a request to acknowledge delivery, which can occur as a separate RPC or piggybacked on a request to fetch more records, it checks that the records being acknowledged are still in the Acquired state and acquired by the share group member trying to acknowledge them. If a record had reached its acquisition lock timeout and reverted to Available state, the attempt to acknowledge it will fail with org.apache.kafka.common.errors.InvalidRecordStateException, but the record may well be re-acquired for the same consumer and returned to it again.

...

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.


Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged and the frequency with which it is logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt, and this requires just one update to the durable share-partition state. Indeed, if there’s only a single consumer for a share-partition, only the SPSO needs to be logged as it moves along the topic. That’s of course an unlikely situation given these are share groups, but it illustrates the point.
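
To make the shape of this state concrete, here is a minimal sketch of the per-share-partition durable state described above. The class and field names are illustrative assumptions, not the names used in the Kafka implementation.

Code Block
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the durable state recovered for one share-partition:
// the SPSO plus the state of the in-flight records and their delivery counts.
class DurableSharePartitionState {
    long startOffset;                                        // Share-Partition Start Offset (SPSO)
    final List<StateBatch> inFlightBatches = new ArrayList<>();

    static class StateBatch {
        long baseOffset;          // first offset in the batch
        long lastOffset;          // last offset in the batch
        byte deliveryState;       // e.g. 0: Available, 2: Acknowledged, 4: Archived
        short deliveryCount;      // approximate delivery count (Acquired state is not persisted)
    }
}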

Share-group state persister interface

The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface org.apache.kafka.server.group.share.Persister . For now, it is not pluggable, and Kafka includes a built-in default implementation org.apache.kafka.server.group.share.DefaultStatePersister . This section describes the behavior of this built-in persister.
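
As a rough indication of the shape such an interface could take, the sketch below is modelled on the read, write, delete and initialize operations described in this KIP. The method signatures and the PartitionStateData type are assumptions for illustration, not the definitive org.apache.kafka.server.group.share.Persister contract.

Code Block
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Uuid;

// Hypothetical result type for a read; the real interface may differ.
record PartitionStateData(int stateEpoch, long startOffset /* -1 if uninitialized */) { }

// Sketch of a share-group state persister, asynchronous like the underlying RPCs.
interface PersisterSketch {
    CompletableFuture<Void> initializeState(String groupId, Uuid topicId, int partition,
                                            int stateEpoch, long startOffset);

    CompletableFuture<PartitionStateData> readState(String groupId, Uuid topicId, int partition);

    CompletableFuture<Void> writeState(String groupId, Uuid topicId, int partition,
                                       int stateEpoch, long startOffset);

    CompletableFuture<Void> deleteState(String groupId, Uuid topicId, int partition);
}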

When records are acknowledged using the ShareFetch or ShareAcknowledge RPCs, durable state will need to be written by the persister. The share-partition leader serving these requests will wait for the persister to complete writing the durable state before responding.

The share coordinator and the share-group state topic

The share coordinator is responsible for persistence of share-group state on a new internal topic called "__share_group_state". The responsibility for being a share coordinator is distributed across the brokers in a cluster. This model has similarities with the group coordinator and the transaction coordinator. Indeed, the share coordinator is built using the coordinator runtime, like the new group coordinator. The share-group state topic is highly partitioned (50 by default), but it is not compacted. It uses the clean-up policy of "delete" with unlimited retention ("retention.ms=-1"). The share coordinator manages the space used by pruning records periodically as described later. The responsibility for being a share coordinator is controlled in the same way as a group coordinator; the broker which leads a partition of the share-group state topic is the coordinator for the share-partitions whose records belong on that partition.
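
For reference, the internal topic's settings as described above can be summarised as follows. This is illustrative only; the broker creates and configures this topic itself using the group.share.state.topic.* properties listed later in this KIP.

Code Block
import java.util.Map;

// Illustrative summary of the internal topic described above; not code that creates it.
final class ShareGroupStateTopic {
    static final String NAME = "__share_group_state";
    static final int DEFAULT_NUM_PARTITIONS = 50;
    static final Map<String, String> TOPIC_CONFIGS = Map.of(
        "cleanup.policy", "delete",   // not compacted
        "retention.ms", "-1"          // unlimited retention; the share coordinator prunes explicitly
    );
}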

The existing group coordinator is responsible for the management and assignment for share groups. While it would also be possible to incorporate the responsibility to persist share-group state into the existing group coordinator and write this information onto the consumer offsets topic, that topic is already a known bottleneck for some users and adding to the problem does not seem prudent.

The share-partition leader and group coordinator use inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called InitializeShareGroupState, ReadShareGroupState, WriteShareGroupState and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type FindCoordinatorRequest.CoordinatorType.SHARE (2) and a key consisting of "group:topicId:partition".

When a share coordinator becomes leader of a partition of the share-group state topic, it scans the partition to replay the records it finds and build its in-memory state. Once complete, it is ready to serve the share-group state RPCs.

The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.
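
For example, the mapping from a share-partition to a share-group state topic partition could look like the following sketch. The exact hash function used by the share coordinator is not specified here; murmur2 is shown purely as an assumption to illustrate how the key "group:topicId:partition" spreads state across partitions.

Code Block
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;

// Sketch: derive the share-group state topic partition for a share-partition key.
// The real hashing scheme is an implementation detail of the share coordinator.
final class ShareCoordinatorPartitioning {
    static int stateTopicPartitionFor(String groupId, Uuid topicId, int partition,
                                      int stateTopicPartitionCount) {
        String key = groupId + ":" + topicId + ":" + partition;   // "group:topicId:partition"
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(bytes)) % stateTopicPartitionCount;
    }
}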

Share-group state records

The share coordinator writes two types of record: ShareSnapshot and ShareUpdate. The ShareSnapshot record contains a complete snapshot of the durable state for a share-partition. Repeatedly re-serializing and writing the entire state introduces a performance bottleneck. The ShareUpdate record contains a partial update to the state. So the current durable state of a share-partition consists of the latest ShareSnapshot and zero or more ShareUpdate records following it.

The records also include a snapshot epoch. Each time a ShareSnapshot is written for a share-partition, the snapshot epoch for that share-partition is increased by 1. Each time a ShareUpdate is written for a share-partition, it uses the snapshot epoch of the latest ShareSnapshot.

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there's no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.

The records also include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

Type: ShareSnapshot

Key:

Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32

Value:

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

Type: ShareUpdate

Key:

Code Block
Version: 1
GroupId: string
TopicId: uuid
Partition: int32

Value:

Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16

When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareSnapshot and any subsequent ShareUpdate records whose snapshot epoch matches it. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.
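
The following sketch illustrates the replay rule, using assumed types and field names rather than the actual coordinator runtime classes: a ShareSnapshot replaces the in-memory state for its share-partition, and a ShareUpdate is applied only if its snapshot epoch matches the snapshot currently held.

Code Block
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of building in-memory state during replay of the share-group state topic.
final class ShareStateReplay {
    static final class SharePartitionState {
        int snapshotEpoch;
        long startOffset;
        final List<long[]> batches = new ArrayList<>();   // placeholder for state batches
    }

    private final Map<String, SharePartitionState> stateByKey = new HashMap<>();

    void onShareSnapshot(String key, int snapshotEpoch, long startOffset) {
        SharePartitionState state = new SharePartitionState();
        state.snapshotEpoch = snapshotEpoch;
        state.startOffset = startOffset;
        stateByKey.put(key, state);                        // a snapshot replaces earlier state
    }

    void onShareUpdate(String key, int snapshotEpoch, long startOffset) {
        SharePartitionState state = stateByKey.get(key);
        if (state == null || state.snapshotEpoch != snapshotEpoch) {
            return;                                        // belongs to an older snapshot; ignore
        }
        if (startOffset != -1) {
            state.startOffset = startOffset;               // -1 means the SPSO was not updated
        }
        // ... apply the partial state batches here
    }
}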

Managing the space of the share-group state topic

The share-group state data is not very amenable to log compaction. As a result, the share coordinator uses unlimited log retention and prunes the log records itself using ReplicaManager.deleteRecords. The share coordinator can delete all records before the latest ShareSnapshot for all active share-partitions. Because snapshots are taken periodically, the latest ShareSnapshot keeps moving forward, so older records become eligible for pruning. For idle share-partitions, the share coordinator will periodically write a new ShareSnapshot so that the older records can be pruned.
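
A sketch of the pruning calculation follows, under the assumption that the share coordinator tracks the log offset of the latest ShareSnapshot it has written for each share-partition on a given state topic partition: everything before the minimum of those offsets is no longer needed and can be deleted.

Code Block
import java.util.Map;

// Sketch: compute the offset before which records on one partition of the
// share-group state topic can safely be deleted with ReplicaManager.deleteRecords.
final class ShareStatePruning {
    static long pruneBeforeOffset(Map<String, Long> latestSnapshotOffsetByShare) {
        return latestSnapshotOffsetByShare.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);   // nothing tracked yet, so nothing to prune
    }
}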

Using the share coordinator

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.
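
The initialization flow just described is sketched below. The helper methods and the ReadResult type are hypothetical placeholders standing in for the FindCoordinator and ReadShareGroupState RPC plumbing; only the decision logic follows the text above.

Code Block
import org.apache.kafka.common.Uuid;

// Pseudo-flow of share-partition initialization by the share-partition leader.
abstract class SharePartitionInitialization {
    record ReadResult(short errorCode, int stateEpoch, long startOffset) { }

    static final short UNKNOWN_TOPIC_OR_PARTITION = 3;

    long initialize(String groupId, Uuid topicId, int partition) {
        // FindCoordinator with key_type SHARE (2) and key "group:topicId:partition"
        int coordinatorId = findShareCoordinator(groupId + ":" + topicId + ":" + partition);
        ReadResult result = readShareGroupState(coordinatorId, groupId, topicId, partition);
        if (result.errorCode() == UNKNOWN_TOPIC_OR_PARTITION) {
            throw new IllegalStateException("This share-partition is not part of the share group");
        }
        long spso = result.startOffset();
        if (spso == -1) {
            // Initial SPSO chosen according to group.share.auto.offset.reset
            spso = resetOffset(topicId, partition);
        }
        return spso;
    }

    abstract int findShareCoordinator(String key);
    abstract ReadResult readShareGroupState(int coordinatorId, String groupId, Uuid topicId, int partition);
    abstract long resetOffset(Uuid topicId, int partition);
}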

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupOffsetsState RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state change (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so the recovery processing of the share-group state topic sees the share-partition has been removed. There is no need to retain the deletion marker. Its only purpose is to make sure that a share coordinator reading records for a share-partition before it was removed notices that those records apply to a defunct share-partition.

Examples with share-group state

...

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "StateDeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "StateDeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "StateDeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "StateDeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "StateDeliveryState": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}


...

The following table explains how the administration operations on share groups work.

Operation: Create share group
Involves: Group coordinator
Notes: This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ConsumerGroupMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partition is now able to be included in an assignment in the share group.

Operation: Assign a share-partition
Involves: Group coordinator and optionally share coordinator
Notes: When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.

Operation: List share groups
Involves: Group coordinator

Operation: Describe share group
Involves: Group coordinator

Operation: List share group offsets
Involves: Group coordinator and share coordinator
Notes: The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because, when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Operation: Alter share group offsets
Involves: Group coordinator and share coordinator
Notes: Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic.

Operation: Delete share group offsets
Involves: Group coordinator and share coordinator
Notes: This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Operation: Delete share group
Involves: Group coordinator and share coordinator
Notes: Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata and finally a tombstone ConsumerGroupMetadata record to the __consumer_offsets topic.

Public Interfaces

This KIP introduces extensive additions to the public interfaces.

...

Configuration: group.share.enable
Description: Whether to enable share groups on the broker.
Values: Default false while the feature is being developed. Will become true in a future release.

Configuration: group.share.delivery.count.limit
Description: The maximum number of delivery attempts for a record delivered to a share group.
Values: Default 5, minimum 2, maximum 10

Configuration: group.share.record.lock.duration.ms
Description: Share-group record acquisition lock duration in milliseconds.
Values: Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

Configuration: group.share.record.lock.duration.max.ms
Description: Share-group record acquisition lock maximum duration in milliseconds.
Values: Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)

Configuration: group.share.record.lock.partition.limit
Description: Share-group record lock limit per share-partition.
Values: Default 200, minimum 100, maximum 10000

Configuration: group.share.session.timeout.ms
Description: The timeout to detect client failures when using the group protocol.
Values: Default 45000 (45 seconds)

Configuration: group.share.min.session.timeout.ms
Description: The minimum session timeout.
Values: Default 45000 (45 seconds)

Configuration: group.share.max.session.timeout.ms
Description: The maximum session timeout.
Values: Default 60000 (60 seconds)

Configuration: group.share.heartbeat.interval.ms
Description: The heartbeat interval given to the members.
Values: Default 5000 (5 seconds)

Configuration: group.share.min.heartbeat.interval.ms
Description: The minimum heartbeat interval.
Values: Default 5000 (5 seconds)

Configuration: group.share.max.heartbeat.interval.ms
Description: The maximum heartbeat interval.
Values: Default 15000 (15 seconds)

Configuration: group.share.max.groups
Description: The maximum number of share groups.
Values: Default 10, minimum 1, maximum 100

Configuration: group.share.max.size
Description: The maximum number of consumers that a single share group can accommodate.
Values: Default 200, minimum 10, maximum 1000

Configuration: group.share.assignors
Description: The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.
Values: A list of class names. Default "org.apache.kafka.server.group.share.SimpleAssignor"

Configuration: group.share.state.topic.num.partitions
Description: The number of partitions for the share-group state topic (should not change after deployment).
Values: Default 50

Configuration: group.share.state.topic.replication.factor
Description: The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
Values: Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)

Configuration: group.share.state.topic.segment.bytes
Description: The log segment size for the share-group state topic.
Values: Default 104857600

Configuration: group.share.state.topic.min.isr
Description: Overridden min.insync.replicas for the share-group state topic.
Values: Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "StateDeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "StateDeliveryState", "type": "int8", "versions": "0+",
            "about": "TheThe delivery state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

These records are written by the share coordinator on the __share_group_state  topic.

...

ShareSnapshotKey

Code Block
{
  "type": "data",
  "name": "ShareCheckpointKeyShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "The partition index." }
  ]
}

...

ShareSnapshotValue

Code Block
{
  "type": "data",
  "name": "ShareCheckpointValueShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "CheckpointEpochSnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The checkpointsnapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "StateDeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

ShareUpdateKey

Code Block
{
  "type": "data",
  "name": "ShareDeltaKeyShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "The partition index." },
    { "name": "DeltaIndex", "type": "uint16", "versions": "0",
      "The delta index used to give multiple distinct keys for a share-partition." }
  ]
}

...

ShareUpdateValue

Code Block
{
  "type": "data",
  "name": "ShareDeltaValueShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "CheckpointEpochSnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The checkpointsnapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "StateDeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

...

The disadvantages are significant. First, control records have always been associated with producing records transactionally, not consuming them. We are mixing (invisible) information about consumption into the stream of events, which either need to be filtered out by all consumers or by the broker. Client-side filtering in the consumers increases the network traffic and processing for the consumers. Broker-side filtering would prevent the broker from achieving zero-copy transfer to the network. Second, the complexity of managing the recovery processing of control records is significant. Every time the leadership of a share-partition changes, it’s necessary to efficiently find the minimal set of control records to process and to process them. Because the control records are mixed in with the user records, this could be difficult to achieve. Third, the share-state control records would be cluster-specific in a similar way as the records on the __consumer_offsets topic. When a topic with share-state control records is replicated to another cluster, the control records would at best be irrelevant on the target cluster, or they would need to be mutated by the replicator before being written to the target.

Managing durable share-partition state using a compacted topic

The share coordinator could use a compacted topic to store its data. However, the data that it writes is unlike the consumer offsets topic in that it is more akin to a log than a keyed data store. Rather than inventing complicated schemes for key uniqueness that prevent the log compactor from deleting required data too early, and as a result putting the log compactor under stress cleaning a large number of keys being written at a high rate, the KIP uses log retention and explicit pruning of old records.