
...

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.

Managing durable share-partition state

The share-partition leader is responsible for recording the durable state for the share-partitions it leads. For each share-partition, we need to be able to recover:

  • The Share-Partition Start Offset (SPSO)

  • The state of the in-flight records

  • The delivery counts of records whose delivery failed

The delivery counts are only maintained approximately and the Acquired state is not persisted. This minimises the amount of share-partition state that has to be logged and the frequency with which it is logged. The expectation is that most records will be fetched and acknowledged in batches with only one delivery attempt, and this requires just one update to the durable share-partition state. Indeed, if there’s only a single consumer for a share-partition, only the SPSO needs to be logged as it moves along the topic. That’s of course an unlikely situation given these are share groups, but it illustrates the point.

Share-group state persister interface

The component which manages durable share-partition state is called the share-group state persister. Eventually, this could be a pluggable component which implements a new interface org.apache.kafka.server.group.share.Persister. For now, it is not pluggable, and Kafka includes a built-in default implementation org.apache.kafka.server.group.share.DefaultStatePersister. This section describes the behavior of this built-in persister.
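
To give a feel for the shape such an interface could take, here is a minimal sketch in Java. Only the package and class names above come from this document; the method signatures and nested types below are illustrative assumptions rather than the actual interface.

Code Block
// Hypothetical sketch of a share-group state persister. The package name is from
// this document; the methods and nested types are illustrative assumptions only.
package org.apache.kafka.server.group.share;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public interface Persister {

    // A contiguous range of records sharing the same state and delivery count.
    record StateBatch(long baseOffset, long lastOffset, byte state, short deliveryCount) { }

    // The durable state of one share-partition: the SPSO plus the in-flight batches.
    record PartitionState(long startOffset, List<StateBatch> stateBatches) { }

    // Read the durable state for a share-partition.
    CompletableFuture<PartitionState> readState(String groupId, UUID topicId, int partition);

    // Write updated durable state for a share-partition.
    CompletableFuture<Void> writeState(String groupId, UUID topicId, int partition, PartitionState state);

    // Delete the durable state when the share-partition is removed.
    CompletableFuture<Void> deleteState(String groupId, UUID topicId, int partition);
}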

The share coordinator and the share-group state topic

The share coordinator is responsible for persistence of share-group state on a new internal topic called __share_group_state. The responsibility for being a share coordinator is distributed across the brokers in a cluster, in a similar way to the group coordinator and the transaction coordinator. Indeed, the share coordinator is built using the coordinator runtime, like the new group coordinator. The share-group state topic is compacted and highly partitioned (50 partitions by default). The share coordinator is determined in the same way as for a group coordinator: the broker which leads a partition of the share-group state topic is the coordinator for the share-partitions whose records belong on that partition.

The existing group coordinator is responsible for the management and assignment of share groups. While it would also be possible to incorporate the responsibility for persisting share-group state into the existing group coordinator and write this information onto the __consumer_offsets topic, that topic is already a known bottleneck for some users and adding to the problem does not seem prudent.

The share-partition leader uses inter-broker RPCs to ask the share coordinator to read, write and delete share-partition state on the share-group state topic. The new RPCs are called ReadShareGroupState, WriteShareGroupState and DeleteShareGroupState. The share-partition leader uses the FindCoordinator RPC to find its share coordinator, with a new key_type and a key of "groupId:topicId:partition".
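
As a small illustration, a share-partition leader could build the coordinator key as shown below. This is a sketch only; the numeric value used for the new key_type is an assumption for illustration.

Code Block
// Sketch: building the FindCoordinator key for a share-partition in the
// "groupId:topicId:partition" format described above. The numeric value of the
// new key_type is an assumption for illustration.
import java.util.UUID;

public final class ShareCoordinatorKey {

    // Hypothetical numeric value for the new FindCoordinator key_type.
    public static final byte KEY_TYPE_SHARE = 2;

    public static String of(String groupId, UUID topicId, int partition) {
        return groupId + ":" + topicId + ":" + partition;
    }

    public static void main(String[] args) {
        System.out.println(of("G1", UUID.randomUUID(), 0)); // e.g. G1:<topic uuid>:0
    }
}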

The records on the share-group state topic are keyed by (group, topicId, partition). This means that the records for each share-partition are on the same share-group state topic partition. It also means that a share-group with a lot of consumers and many share-partitions has its state spread across many share coordinators. This is intentional to improve scalability.
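To make the scalability point concrete, here is a sketch of how the records for a share-partition could be mapped to a partition of the share-group state topic. It assumes a hash-modulo scheme over the key, similar in spirit to how group IDs are mapped onto __consumer_offsets partitions; the actual hash function is not specified here.

Code Block
// Sketch: mapping a share-partition's key to a __share_group_state partition,
// assuming a hash-modulo scheme. The hash function used here is an assumption.
import java.util.UUID;

public final class ShareStatePartitioner {

    public static int partitionFor(String groupId, UUID topicId, int partition, int numStatePartitions) {
        String key = groupId + ":" + topicId + ":" + partition;
        // Math.floorMod keeps the result non-negative for negative hash codes.
        return Math.floorMod(key.hashCode(), numStatePartitions);
    }

    public static void main(String[] args) {
        // 50 partitions by default for the share-group state topic.
        System.out.println(partitionFor("G1", UUID.randomUUID(), 0, 50));
    }
}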

Share-group state records

The share coordinator writes two types of record: ShareCheckpoint and ShareDelta. The ShareCheckpoint record contains a complete checkpoint of the durable state for a share-partition. It would be much simpler if only ShareCheckpoint records were used but, because the topic is compacted, only the most recent record with a particular key is safe from the log cleaner, and repeatedly re-serializing and writing the entire state would introduce a performance bottleneck. The ShareDelta record gives a way to minimise this: it contains a subset of the state, essentially a partial update to be applied.

The records also include a checkpoint epoch. Each time a ShareCheckpoint is written for a share-partition, the checkpoint epoch for that share-partition is increased by 1. Each time a ShareDelta is written for a share-partition, it uses the checkpoint epoch of the latest ShareCheckpoint. The ShareDelta records also contain a delta index, which is used to ensure that the log cleaner does not discard records required to replay the state. The ordering of delta indices is of no relevance to replay; the indices are just a way of having multiple distinct record keys for the ShareDelta records. The delta index can range from 0 to 255 inclusive, although 255 is only the absolute maximum and a smaller maximum can be used in practice.

The ShareCheckpoint record checkpoint epoch starts at 0 and increments for each subsequent ShareCheckpoint. The ShareDelta records which apply to a ShareCheckpoint record use the same checkpoint epoch, and the records start with a delta index of 0 and increment for each subsequent ShareDelta. The delta index does not reset at the start of a new checkpoint epoch - it just rolls over. If the share coordinator decides that the maximum value of the range of delta indices for a share-partition is too small, it can extend this when it gets to the end of the range, thus increasing the number of deltas that can be written between checkpoints.
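
A minimal sketch of this index sequencing, assuming the absolute maximum of 255 and a smaller working maximum that the coordinator could later extend:

Code Block
// Sketch of delta index sequencing: the index keeps counting across checkpoint
// epochs and rolls over at the end of its range. The working maximum below is
// an illustrative assumption; 255 is the absolute maximum.
public final class DeltaIndexSequence {

    private static final int ABSOLUTE_MAX_INDEX = 255; // uint8

    private int maxIndex;        // current working maximum, may be extended
    private int lastIndex = -1;  // last index used, -1 before any delta is written

    public DeltaIndexSequence(int initialMaxIndex) {
        this.maxIndex = Math.min(initialMaxIndex, ABSOLUTE_MAX_INDEX);
    }

    // Returns the delta index to use for the next ShareDelta record.
    public int next() {
        lastIndex = (lastIndex + 1) % (maxIndex + 1);
        return lastIndex;
    }

    // Extend the range, up to the absolute maximum, to allow more deltas between checkpoints.
    public void extendTo(int newMaxIndex) {
        maxIndex = Math.min(Math.max(maxIndex, newMaxIndex), ABSOLUTE_MAX_INDEX);
    }
}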

Here’s an example of the records:

  • ShareCheckpoint: key: "groupId:topicId:partition", value: "checkpointEpoch=0"

  • ShareCheckpoint: key: "groupId:topicId:partition", value: "checkpointEpoch=1"

  • ShareDelta: key: "groupId:topicId:partition:deltaIndex=0", value: "checkpointEpoch=1"

  • ShareCheckpoint: key: "groupId:topicId:partition", value: "checkpointEpoch=2"

  • ShareDelta: key: "groupId:topicId:partition:deltaIndex=1", value: "checkpointEpoch=2"

  • ShareDelta: key: "groupId:topicId:partition:deltaIndex=2", value: "checkpointEpoch=2"

The share coordinator will prefer to write a checkpoint over a delta (for example, when the SPSO moves and there are no in-flight records, the checkpoint will be small and there’s no need to write a delta instead). The share coordinator will take a checkpoint periodically, frequently enough to minimise the number of ShareDelta records to replay but rarely enough to minimise the performance cost of taking checkpoints.
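
The following sketch illustrates the kind of decision involved; the inputs and the threshold are assumptions for illustration rather than the coordinator's actual policy.

Code Block
// Sketch of choosing between writing a ShareCheckpoint and a ShareDelta.
// The threshold and decision inputs are illustrative assumptions.
public final class RecordTypeChoice {

    private static final int MAX_DELTAS_BETWEEN_CHECKPOINTS = 100; // assumed threshold

    public static boolean writeCheckpoint(int inFlightStateBatches, int deltasSinceLastCheckpoint) {
        // With no in-flight records the checkpoint is tiny, so prefer it over a delta.
        if (inFlightStateBatches == 0) {
            return true;
        }
        // Otherwise checkpoint periodically to bound the number of deltas to replay.
        return deltasSinceLastCheckpoint >= MAX_DELTAS_BETWEEN_CHECKPOINTS;
    }
}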

The records have the following content (note that the version number is used to differentiate between the record types, just as for the __consumer_offsets topic):

ShareCheckpoint

Key:


Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32



Value:

Code Block
CheckpointEpoch: uint16
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16


ShareDelta

Key:


Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32
DeltaIndex: uint8 (not included when choosing partition)

The keys for delta records with different delta indices need to be different so that log compaction retains multiple records.


Value:

Code Block
CheckpointEpoch: uint16
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16


When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareCheckpoint and any subsequent ShareDelta records whose checkpoint epoch matches. It keeps track of the maximum delta index for each share-partition so that it can continue the sequence. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.
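
A sketch of this replay logic is shown below; the record classes are simplified stand-ins for the schemas above, not the actual coordinator code.

Code Block
// Sketch of share coordinator replay. Records are read in offset order from the
// earliest offset to the end of the partition; the classes here are simplified
// stand-ins for the generated record types.
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public final class ShareStateReplay {

    record SharePartitionKey(String groupId, UUID topicId, int partition) { }

    static final class ReplayedState {
        int checkpointEpoch = -1;   // epoch of the latest ShareCheckpoint seen
        int lastDeltaIndex = -1;    // latest delta index seen, to continue the sequence
        // The accumulated durable state (SPSO and state batches) would be held here too.
    }

    private final Map<SharePartitionKey, ReplayedState> states = new HashMap<>();

    void onShareCheckpoint(SharePartitionKey key, int checkpointEpoch) {
        ReplayedState state = states.computeIfAbsent(key, k -> new ReplayedState());
        // A checkpoint supersedes the state accumulated from earlier records.
        state.checkpointEpoch = checkpointEpoch;
    }

    void onShareDelta(SharePartitionKey key, int checkpointEpoch, int deltaIndex) {
        ReplayedState state = states.computeIfAbsent(key, k -> new ReplayedState());
        // The delta index keeps counting across checkpoint epochs, so track it regardless.
        state.lastDeltaIndex = deltaIndex;
        // Only deltas whose checkpoint epoch matches the latest checkpoint contribute.
        if (state.checkpointEpoch == checkpointEpoch) {
            // Apply the partial update to the accumulated state here.
        }
    }
}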

Using the share coordinator

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize the share-partition state. It finds the share coordinator using the FindCoordinator RPC (key: "groupId:topicId:partition", key_type: SHARE). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code. This tells the share-partition leader to initialize the SPSO based on the group.share.auto.offset.reset configuration. Otherwise, it returns the share-partition state to the share-partition leader which uses it to initialize.
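
A sketch of the leader-side logic follows, with a placeholder client interface standing in for the inter-broker RPC machinery; the interface and method names here are illustrative, not real Kafka classes.

Code Block
// Sketch of share-partition initialization by the share-partition leader.
// ShareCoordinatorClient is a placeholder for the inter-broker RPC plumbing.
import java.util.UUID;

public final class SharePartitionInitialization {

    interface ShareCoordinatorClient {
        ReadStateResult readShareGroupState(String groupId, UUID topicId, int partition);
    }

    record ReadStateResult(short errorCode, long startOffset) { }

    static final short UNKNOWN_TOPIC_OR_PARTITION = 3; // standard Kafka error code

    // offsetFromResetPolicy is the offset derived from group.share.auto.offset.reset.
    static long initialStartOffset(ShareCoordinatorClient coordinator, String groupId,
                                   UUID topicId, int partition, long offsetFromResetPolicy) {
        ReadStateResult result = coordinator.readShareGroupState(groupId, topicId, partition);
        if (result.errorCode() == UNKNOWN_TOPIC_OR_PARTITION) {
            // No durable state yet, so initialize the SPSO from the reset policy.
            return offsetFromResetPolicy;
        }
        // Otherwise use the durable state returned by the share coordinator
        // (the in-flight state batches are omitted in this sketch).
        return result.startOffset();
    }
}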

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state change (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-group state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.
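
For illustration, here is the kind of payload such a write could carry, mirroring the JSON examples in the table below; the classes are simplified stand-ins for the generated RPC types.

Code Block
// Sketch of a WriteShareGroupState payload sent after an acknowledgement.
// The classes are simplified stand-ins for the generated RPC types.
import java.util.List;
import java.util.UUID;

public final class WriteStateExample {

    record StateBatch(long baseOffset, long lastOffset, byte state, short deliveryCount) { }

    record WriteShareGroupStateData(String groupId, UUID topicId, int partition,
                                    long startOffset, List<StateBatch> stateBatches) { }

    static WriteShareGroupStateData acknowledgementUpdate(UUID topicId) {
        // A StartOffset of -1 means the SPSO is not being updated by this write;
        // state 2 means Acknowledged.
        return new WriteShareGroupStateData("G1", topicId, 0, -1L,
                List.of(new StateBatch(110L, 110L, (byte) 2, (short) 1)));
    }
}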

When a share-partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a ShareCheckpoint tombstone and then as many ShareDelta tombstones as required. The first of these is the indication that the share-partition has been removed, so if the writing of the tombstones is incomplete due to a share coordinator failure, it can be completed after a new coordinator is elected.
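
A sketch of the tombstone sequence is shown below. The string keys are a simplification of the structured record keys shown earlier, and the appender interface is a placeholder for the coordinator's writer.

Code Block
// Sketch of deleting a share-partition's durable state: a tombstone for the
// ShareCheckpoint key first, then tombstones for the ShareDelta keys. String
// keys are used here as a simplification of the structured record keys.
import java.util.UUID;

public final class ShareStateDeletion {

    interface Appender {
        // Appends a record to the __share_group_state partition; a null value is a tombstone.
        void append(String key, byte[] value);
    }

    static void deleteSharePartition(Appender appender, String groupId, UUID topicId,
                                     int partition, int maxDeltaIndex) {
        String base = groupId + ":" + topicId + ":" + partition;
        // The ShareCheckpoint tombstone is written first; it marks the share-partition as
        // removed, so a new coordinator can finish the remaining tombstones after a failover.
        appender.append(base, null);
        for (int deltaIndex = 0; deltaIndex <= maxDeltaIndex; deltaIndex++) {
            appender.append(base + ":" + deltaIndex, null);
        }
    }
}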

Examples with share-group state

Here are some examples showing the writing of share-group state.

...

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 0 (Available),       "DeliveryCount": 1
    }
  ]
}


Note that the SPEO in the durable state is effectively 111 at this point. All records after this are in their first delivery attempt so this is an acceptable situation.


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 118,
      "State": 0 (Available),       "DeliveryCount": 0
    },
    {
      "BaseOffset": 119,
      "LastOffset": 119,
      "State": 2 (Acknowledged),       "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 111,
      "LastOffset": 112,
      "State": 0 (Available),       "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 113,
      "LastOffset": 118,
      "State": 2 (Acknowledged),       "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffset": 110,
      "LastOffset": 110,
      "State": 2 (Acknowledged),       "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)




Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}

...


Administration

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The following table summarises the administration operations and how they work.

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about":"The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about":"The partition index." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields":[
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
   ]}
  ]
}

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being written." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "BaseOffset", "type": "int64", "versions": "0",
        "about": "The base offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "State", "type": "int8", "versions": "0",
        "about": "The state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

DeleteShareGroupState API

The DeleteShareGroupState API is used by share-partition leaders to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "GroupId", "type": "string", "versions": "0",
     "about": "The group identifier." },
    {"name": "TopicId", "type": "uuid", "versions": "0",
     "about": "The topic identifier." },
    {"name": "Partition", "type": "int32", "versions": "0",
     "about": "The partition index." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}

Records

This section describes the new record types.

...

As a result, the KIP now proposes an entirely different class KafkaShareConsumer which gives a very similar interface to KafkaConsumer but eliminates the downsides listed above.

Managing durable share-partition state using control records

In the design described in this KIP, the share-partition leader does not read and write the durable share-partition state itself. Instead, it uses inter-broker RPCs to ask the share coordinator to read and write records on an internal metadata topic. An alternative which was seriously considered was to have the share-partition leader write new types of control records directly onto the user's topic-partition whose consumption is being managed.

The advantage of using control records is that the share-partition leader is of course also the leader for the user topic-partition, meaning that it can read from and write to the partition itself. There's no inter-broker hop to write acknowledgement information.

The disadvantages are significant. First, control records have always been associated with producing records transactionally, not consuming them. We are mixing (invisible) information about consumption into the stream of events, which either need to be filtered out by all consumers or by the broker. Client-side filtering in the consumers increases the network traffic and processing for the consumers. Broker-side filtering would prevent the broker from achieving zero-copy transfer to the network. Second, the complexity of managing the recovery processing of control records is significant. Every time the leadership of a share-partition changes, it’s necessary to efficiently find the minimal set of control records to process and to process them. Because the control records are mixed in with the user records, this could be difficult to achieve. Third, the share-state control records would be cluster-specific in a similar way as the records on the __consumer_offsets topic. When a topic with share-state control records is replicated to another cluster, the control records would at best be irrelevant on the target cluster, or they would need to be mutated by the replicator before being written to the target.