...

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.

[Diagram]

Relationship with consumer groups

...

For the read_committed isolation level, only non-transactional records and committed transactional records are eligible to become in-flight records. The SPSO cannot move past the last stable offset, so an open transaction blocks the progress of a share group using the read_committed isolation level. The share-partition leader is responsible for filtering out transactional records which have been aborted.

Let’s have a look at the structure of the log for transactional records and see how the share-partition leader processes the log to build up the in-flight records in the presence of transactions.

A log segment consists of a sequence of RecordBatch structures, which are interpreted as follows:

  • isTransactional=false is a batch of non-transactional records - these are potentially in-flight records

  • isTransactional=true, isControl=false is a batch of transactional records - these are potentially in-flight records if the transaction commits

  • isTransactional=true, isControl=true is a batch of control records - within these batches are the EndTxnMarker records which indicate the commit or abort of a transaction

So, only the last of these needs to be inspected deeply in order to read the records, but these batches are not compressed and they are also very small.
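
As an illustration, here is a minimal sketch of this classification. It uses the isTransactional() and isControlBatch() accessors of Kafka's org.apache.kafka.common.record.RecordBatch interface; the handle* methods are illustrative placeholders, not existing broker code.

Code Block
// Sketch only: how the share-partition leader interprets each fetched batch.
// RecordBatch is org.apache.kafka.common.record.RecordBatch.
void classify(RecordBatch batch) {
    if (!batch.isTransactional()) {
        handleNonTransactionalData(batch);   // potentially in-flight records
    } else if (!batch.isControlBatch()) {
        handleTransactionalData(batch);      // in-flight only if the transaction committed
    } else {
        handleControlBatch(batch);           // contains the end-of-transaction marker records
    }
}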

The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED. This means that the replica manager only returns records up to the last stable offset, so any transactional records fetched were part of transactions which have already been completed and, as such, have a transaction marker in the log before the LSO. The replica manager also returns a list of aborted transactions relevant to the records which are fetched, and the share-partition leader uses this to filter out records for transactions which aborted.

When a RecordBatch of non-transactional records is fetched, the records are immediately added to the set of in-flight records in Available state. When a RecordBatch of transactional records is fetched, the share-partition leader compares the batch's producer ID and offsets against the list of aborted transactions. If the transaction did not abort, the records are added to the set of in-flight records.
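
A simplified, hedged sketch of that filtering step follows. The aborted-transaction entries mirror the (producer ID, first offset) pairs returned in the fetch response (FetchResponseData.AbortedTransaction); the method name is illustrative, and a real implementation would also account for where each transaction's abort marker falls.

Code Block
// Sketch only: read_committed filtering of a fetched transactional batch.
// abortedTxns is the aborted-transaction list returned by the replica manager.
boolean inAbortedTransaction(RecordBatch batch,
                             List<FetchResponseData.AbortedTransaction> abortedTxns) {
    for (FetchResponseData.AbortedTransaction txn : abortedTxns) {
        // The batch belongs to an aborted transaction if the same producer
        // wrote it at or after the transaction's first offset.
        if (txn.producerId() == batch.producerId()
                && batch.baseOffset() >= txn.firstOffset()) {
            return true;    // discard: these records were aborted
        }
    }
    return false;           // keep: records become Available in-flight records
}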

Share sessions

...

The records also include a checkpoint epoch. Each time a ShareCheckpoint is written for a share-partition, the checkpoint epoch for that share-partition is increased by 1. Each time a ShareDelta is written for a share-partition, it uses the checkpoint epoch of the latest ShareCheckpoint. The ShareDelta records also contain a delta index which is used to ensure that the log cleaner does not discard records required to replay the state. The ordering of delta indices is of no relevance to replay - the indices are just a way of having multiple distinct record keys for the ShareDelta records. The range of delta indices can run from 0 to 65535 inclusive, but this maximum is just the absolute maximum of an unsigned 16-bit integer - in practice a far smaller number of deltas will be used.

The ShareCheckpoint record checkpoint epoch starts at 0 and increments for each subsequent ShareCheckpoint. The ShareDelta records which apply to a ShareCheckpoint record use the same checkpoint epoch, and the records start with a delta index of 0 and increment for each subsequent ShareDelta. The delta index does not reset at the start of a new checkpoint epoch - it just rolls over. If the share coordinator decides that the maximum value of the range of delta indices for a share-partition is too small, it can extend this when it gets to the end of the range, thus increasing the number of deltas that can be written between checkpoints.
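
For example, the rollover is just unsigned 16-bit arithmetic (a sketch; the names are illustrative):

Code Block
// Sketch only: the delta index wraps at the top of the unsigned 16-bit
// range (0..65535) rather than resetting at the start of a checkpoint epoch.
int nextDeltaIndex(int lastDeltaIndex) {
    return (lastDeltaIndex + 1) & 0xFFFF;    // 65535 + 1 wraps back to 0
}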

...

Type: ShareCheckpoint

Key:
Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32

Value:
Code Block
StateEpoch: uint32
CheckpointEpoch: uint32
StartOffset: int64
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16


Type: ShareDelta

Key:
Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32
DeltaIndex: uint16 (not included when choosing partition)

The keys for delta records with different delta indices need to be different so that log compaction retains multiple records.

Value:
Code Block
StateEpoch: uint32
CheckpointEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  BaseOffset: int64
  LastOffset: int64
  State: int8
  DeliveryCount: int16
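
To make the replay behaviour concrete, here is a hedged sketch of how a reader of the topic might rebuild a share-partition's state: start from the latest ShareCheckpoint and apply, in log order, the ShareDelta records carrying the same checkpoint epoch. The Checkpoint, Delta and SharePartitionState types are illustrative holders for the record values above, not existing classes.

Code Block
// Sketch only: replay the latest checkpoint, then apply its deltas in log order.
SharePartitionState replay(Checkpoint checkpoint, List<Delta> deltasInLogOrder) {
    SharePartitionState state = new SharePartitionState();
    state.setStartOffset(checkpoint.startOffset());
    checkpoint.states().forEach(state::apply);   // [BaseOffset, LastOffset, State, DeliveryCount]

    for (Delta delta : deltasInLogOrder) {
        // Only deltas written against this checkpoint's epoch apply to it.
        if (delta.checkpointEpoch() != checkpoint.checkpointEpoch())
            continue;
        if (delta.startOffset() != -1)           // -1 means the start offset was not updated
            state.setStartOffset(delta.startOffset());
        delta.states().forEach(state::apply);
    }
    return state;
}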


...

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.

[Diagram]

The following table explains how the administration operations on share groups work.

Operation: Create share group
Involves: Group coordinator
Notes: This occurs as a side-effect of a ShareGroupHeartbeat. The group coordinator writes a record to the __consumer_offsets topic to persist the group's existence.

Operation: Assign a share-partition
Involves: Group coordinator and optionally share coordinator
Notes: When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record to the __share_group_state topic.

Operation: List share groups
Involves: Group coordinator

Operation: Describe share group
Involves: Group coordinator

Operation: List share group offsets
Involves: Group coordinator and share coordinator
Notes: The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because, when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Operation: Alter share group offsets
Involves: Group coordinator and share coordinator
Notes: Only empty share groups support this operation. The group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareCheckpoint record with the new state epoch to the __share_group_state topic.

Operation: Delete share group offsets
Involves: Group coordinator and share coordinator
Notes: This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Operation: Delete share group
Involves: Group coordinator and share coordinator
Notes: Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata record and finally a tombstone ConsumerGroupMetadata record to the __consumer_offsets topic.
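
To illustrate the ordering of the two ShareGroupPartitionMetadata writes, here is a sketch of the "delete share group offsets" sequence as the group coordinator might drive it. Every method name is illustrative; only the record and RPC names come from this KIP, and a plausible reading is that the first write lets an interrupted deletion be resumed.

Code Block
// Sketch only: two-phase deletion of a topic's state from a share group.
void deleteShareGroupOffsets(String groupId, Uuid topicId) {
    // Phase 1: persist the intent to delete, so the operation can be completed later.
    writeShareGroupPartitionMetadata(groupId, markPendingDeletion(topicId));

    // Ask the share coordinator to write tombstones for the share-partition state.
    sendDeleteShareGroupState(groupId, topicId);

    // Phase 2: persist completion, removing the topic from the group's metadata.
    writeShareGroupPartitionMetadata(groupId, removeTopic(topicId));
}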

...

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState - for initializing share-partition state on a share coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator
  • WriteShareGroupState - for writing share-partition state to a share coordinator
  • DeleteShareGroupState - for deleting share-partition state from a share coordinator
  • ReadShareGroupOffsetsState - for reading the offsets from the share-partition state on a share coordinator

Error codes

This KIP adds the following error codes to the Kafka protocol.

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about":"The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}
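
For illustration, this is how such a request could be built with the message class that Kafka's schema generator would produce from the JSON above. The ReadShareGroupStateRequestData class is hypothetical until this KIP is implemented; the setter chaining follows Kafka's generated-message convention.

Code Block
// Sketch only: ReadShareGroupStateRequestData would be generated from the
// schema above once the KIP is implemented.
ReadShareGroupStateRequestData data = new ReadShareGroupStateRequestData()
    .setGroupId("my-share-group")
    .setTopicId(topicId)        // org.apache.kafka.common.Uuid of the topic
    .setPartition(0);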

...

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." }
  ]
}


ReadShareGroupOffsetsState API

The ReadShareGroupOffsetsState API is used to read the offset information from share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.


Request schema

Code Block
{
  "typeapiKey": "data"NN,
  "nametype": "ConsumerGroupMetadataKeyrequest",
  "validVersionslisteners": ["3broker"],
  "flexibleVersionsname": "noneReadShareGroupOffsetsStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about":"The group identifier." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic identifier." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." }
   ]}
 ]
}

Records

This section describes the new record types.

Group metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

For each share group, a single ConsumerGroupMetadata record is written. When the group is deleted, a tombstone record is written.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ConsumerGroupMetadataKey

This is included for completeness. There is no change to this record.

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKey",
  "validVersions": "3",
  "flexibleVersions": "none",
  "fields": fields": [
    { "name": "GroupId", "type": "string", "versions": "3",
      "about": "The group id." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareDeltaKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "The partition index." },
    { "name": "DeltaIndex", "type": "uint8uint16", "versions": "0",
      "The delta index used to give multiple distinct keys for a share-partition." }
  ]
}

...