
...

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member is removed from the group by the group coordinator.
  • The partition metadata is updated, such as when a new partition is added or a topic is created or deleted, or when the rack ID changes.
  • AdminClient.alterShareGroupOffsets is used to set the SPSO.

In all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. The new version of the group metadata signals that a new assignment is required for the group.
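The epoch bump described above can be sketched in Java. This is a minimal, hypothetical in-memory model, not the group coordinator's code: `GroupMetadataSketch`, its methods, and the separate assignment epoch used to decide when a new assignment is required are all illustrative assumptions. Only membership changes are modelled; the other triggers in the list would call `onMetadataChange` in the same way.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of share-group metadata versioning.
final class GroupMetadataSketch {
    private int groupEpoch = 0;       // bumped on every metadata change
    private int assignmentEpoch = 0;  // assumption: epoch of the last computed assignment
    private final Set<String> members = new TreeSet<>();

    // Any triggering event produces a new metadata version with a higher epoch.
    void onMetadataChange() {
        groupEpoch++;
    }

    void memberJoined(String memberId) {
        if (members.add(memberId)) {
            onMetadataChange();
        }
    }

    void memberLeft(String memberId) {
        if (members.remove(memberId)) {
            onMetadataChange();
        }
    }

    // A new assignment is needed whenever the assignment lags the group epoch.
    boolean assignmentRequired() {
        return assignmentEpoch < groupEpoch;
    }

    // Hypothetical: records that an assignment was computed for the current epoch.
    void assignmentComputed() {
        assignmentEpoch = groupEpoch;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```

Re-adding an existing member leaves the epoch unchanged, so only real metadata changes trigger a new assignment.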

...

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.

A table containing the ACLs for the new RPCs can be found in the section on changes to the Kafka protocol below.

Managing durable share-partition state

...

Type

Key

Value

ShareSnapshot


Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32



Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16


ShareUpdate


Code Block
Version: 1
GroupId: string
TopicId: uuid
Partition: int32



Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16
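
To make the ShareSnapshot and ShareUpdate records concrete, here is a minimal Java sketch of how a reader might replay them into an in-memory view of one share-partition. It assumes the updates are applied in log order after the latest snapshot, that a StartOffset of -1 leaves the SPSO unchanged, and that a later state batch overwrites earlier state for the same offsets. The class and record names are hypothetical. For brevity it stores one entry per offset, whereas a real implementation would more likely keep ranges.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for a States[] entry of ShareSnapshot / ShareUpdate.
record StateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {}

record OffsetState(byte deliveryState, short deliveryCount) {}

// Hypothetical in-memory view of one share-partition rebuilt from the records.
final class SharePartitionReplay {
    private long startOffset;
    private final TreeMap<Long, OffsetState> states = new TreeMap<>();

    // Begin from the most recent ShareSnapshot.
    SharePartitionReplay(long snapshotStartOffset, List<StateBatch> snapshotStates) {
        this.startOffset = snapshotStartOffset;
        overlay(snapshotStates);
    }

    // Apply a ShareUpdate; a start offset of -1 means "not updated".
    void applyUpdate(long updateStartOffset, List<StateBatch> updateStates) {
        if (updateStartOffset != -1) {
            startOffset = updateStartOffset;
            // State for offsets below the new SPSO is no longer needed.
            states.headMap(startOffset, false).clear();
        }
        overlay(updateStates);
    }

    // Later batches overwrite earlier state for the same offsets.
    private void overlay(List<StateBatch> batches) {
        for (StateBatch b : batches) {
            for (long offset = Math.max(b.firstOffset(), startOffset); offset <= b.lastOffset(); offset++) {
                states.put(offset, new OffsetState(b.deliveryState(), b.deliveryCount()));
            }
        }
    }

    long startOffset() {
        return startOffset;
    }

    Map<Long, OffsetState> states() {
        return states;
    }
}
```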


...

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there is a state change per batch to move the SPSO forwards.


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge operations, there is a state change for each operation, and each operation can act on multiple records.


Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 119,
      "LastOffset": 119,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapses for 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 111,
      "LastOffset": 112,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 113,
      "LastOffset": 118,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}
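The walkthrough above moves the SPSO only when the records at the start of the share-partition are finished. A minimal sketch of that rule in Java, assuming per-offset in-memory states and treating both acknowledged and archived records as finished; `RecordState`, `SpsoCalculator` and `advanceSpso` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory record states used in the walkthrough (hypothetical names).
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class SpsoCalculator {
    // Moves the SPSO forward past every leading record that is finished
    // (acknowledged or archived). It never moves past the SPEO, and an
    // offset with no known state stops the advance.
    static long advanceSpso(long spso, long speo, Map<Long, RecordState> states) {
        long offset = spso;
        while (offset < speo) {
            RecordState s = states.get(offset);
            if (s != RecordState.ACKNOWLEDGED && s != RecordState.ARCHIVED) {
                break;
            }
            offset++;
        }
        return offset;
    }
}
```

With the states from the "Acknowledge 110 (consumer 1)" step, the SPSO moves from 110 to 111 because record 111 is still acquired; once 111 and 112 are acknowledged, it jumps to 120.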


...

Configuration

Description

Values
group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. This is an internal configuration and will be removed once this feature is complete.
group.coordinator.rebalance.protocols

The list of enabled rebalance protocols. (Existing configuration)

"share" will be added to the default value of this configuration once this feature is complete.
group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10
group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.record.lock.duration.max.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)
group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.

A list of class names. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
group.share.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
group.share.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
group.share.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
group.share.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
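The documented defaults and ranges lend themselves to a simple range check. This is a hypothetical Java sketch covering a subset of the configurations in the table, not Kafka's actual configuration validation; `ConfigRange`, `ShareGroupConfigCheck` and `resolve` are illustrative names.

```java
import java.util.Map;

// Hypothetical default/min/max triple for one numeric configuration.
record ConfigRange(long defaultValue, long min, long max) {
    boolean contains(long value) {
        return value >= min && value <= max;
    }
}

final class ShareGroupConfigCheck {
    // Values copied from the table above.
    static final Map<String, ConfigRange> RANGES = Map.of(
        "group.share.delivery.count.limit", new ConfigRange(5, 2, 10),
        "group.share.record.lock.duration.ms", new ConfigRange(30_000, 1_000, 60_000),
        "group.share.record.lock.duration.max.ms", new ConfigRange(60_000, 1_000, 3_600_000),
        "group.share.record.lock.partition.limit", new ConfigRange(200, 100, 10_000),
        "group.share.max.groups", new ConfigRange(10, 1, 100),
        "group.share.max.size", new ConfigRange(200, 10, 1_000));

    // Returns the configured value, or the default when unset (null);
    // rejects unknown names and out-of-range values.
    static long resolve(String name, Long configured) {
        ConfigRange range = RANGES.get(name);
        if (range == null) {
            throw new IllegalArgumentException("Unknown configuration " + name);
        }
        long value = configured == null ? range.defaultValue() : configured;
        if (!range.contains(value)) {
            throw new IllegalArgumentException(name + "=" + value + " is outside ["
                + range.min() + ", " + range.max() + "]");
        }
        return value;
    }
}
```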

...

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • group.instance.id : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • group.protocol : this configuration is used to select the group protocol used for KafkaConsumer
  • session.timeout.ms : this is deprecated in KIP-848 and is not supported for share groups
  • heartbeat.interval.ms : this is deprecated in KIP-848 and is not supported for share groups
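A share consumer could warn about consumer configurations from the list above that it does not use. This hypothetical Java helper covers the entries in that list except the group-protocol selection setting; the class name and the idea of reporting unused keys are assumptions for illustration.

```java
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: reports consumer configurations from the list above
// that a share consumer does not use.
final class ShareConsumerConfigCheck {
    static final Set<String> UNSUPPORTED = Set.of(
        "auto.offset.reset",            // handled by group.share.auto.offset.reset
        "enable.auto.commit",           // no auto-commit for share groups
        "auto.commit.interval.ms",
        "group.instance.id",
        "isolation.level",              // handled by group.share.isolation.level
        "partition.assignment.strategy",
        "interceptor.classes",
        "session.timeout.ms",
        "heartbeat.interval.ms");

    // Returns the unsupported keys present in props, sorted by name.
    static List<String> unsupportedKeys(Properties props) {
        Set<String> found = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            if (UNSUPPORTED.contains(key)) {
                found.add(key);
            }
        }
        return List.copyOf(found);
    }
}
```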

Kafka protocol changes

This KIP introduces the following new APIs:

...

  • FindCoordinator  - for finding coordinators, to support share coordinators
  • ListGroups  - for listing groups, to support listing share groups

...

Access control

The following table gives the ACLs required for the new APIs.

RPC                          Operation       Resource
ShareGroupHeartbeat          Read            Group
ShareGroupDescribe           Describe        Group
ShareFetch                   Read            Group
                             Read            Topic
ShareAcknowledge             Read            Group
                             Read            Topic
AlterShareGroupOffsets       Read            Group
DeleteShareGroupOffsets      Read            Group
DescribeShareGroupOffsets    Describe        Group
InitializeShareGroupState    ClusterAction   Cluster
ReadShareGroupState          ClusterAction   Cluster
WriteShareGroupState         ClusterAction   Cluster
DeleteShareGroupState        ClusterAction   Cluster
ReadShareGroupStateOffsets   ClusterAction   Cluster
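
The ACL table can be expressed as a lookup from RPC name to the required operations and resources. In this hypothetical Java sketch, the small enums stand in for Kafka's own ACL operation and resource types, and all names are illustrative only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for ACL operations and resource types.
enum Operation { READ, DESCRIBE, CLUSTER_ACTION }
enum ResourceType { GROUP, TOPIC, CLUSTER }
record RequiredAcl(Operation operation, ResourceType resource) {}

final class ShareRpcAcls {
    private static final List<RequiredAcl> READ_GROUP =
        List.of(new RequiredAcl(Operation.READ, ResourceType.GROUP));
    private static final List<RequiredAcl> DESCRIBE_GROUP =
        List.of(new RequiredAcl(Operation.DESCRIBE, ResourceType.GROUP));
    // ShareFetch and ShareAcknowledge need Read on both the group and the topic.
    private static final List<RequiredAcl> READ_GROUP_AND_TOPIC = List.of(
        new RequiredAcl(Operation.READ, ResourceType.GROUP),
        new RequiredAcl(Operation.READ, ResourceType.TOPIC));
    // The inter-broker share-group state RPCs are cluster actions.
    private static final List<RequiredAcl> CLUSTER_ACTION =
        List.of(new RequiredAcl(Operation.CLUSTER_ACTION, ResourceType.CLUSTER));

    static final Map<String, List<RequiredAcl>> REQUIRED = Map.ofEntries(
        Map.entry("ShareGroupHeartbeat", READ_GROUP),
        Map.entry("ShareGroupDescribe", DESCRIBE_GROUP),
        Map.entry("ShareFetch", READ_GROUP_AND_TOPIC),
        Map.entry("ShareAcknowledge", READ_GROUP_AND_TOPIC),
        Map.entry("AlterShareGroupOffsets", READ_GROUP),
        Map.entry("DeleteShareGroupOffsets", READ_GROUP),
        Map.entry("DescribeShareGroupOffsets", DESCRIBE_GROUP),
        Map.entry("InitializeShareGroupState", CLUSTER_ACTION),
        Map.entry("ReadShareGroupState", CLUSTER_ACTION),
        Map.entry("WriteShareGroupState", CLUSTER_ACTION),
        Map.entry("DeleteShareGroupState", CLUSTER_ACTION),
        Map.entry("ReadShareGroupStateOffsets", CLUSTER_ACTION));
}
```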

Error codes

This KIP adds the following error codes to the Kafka protocol.

  • INVALID_RECORD_STATE  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  - The share coordinator rejected the request because share-group state epoch did not match.

...

For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.
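
The ordering rule for AcknowledgementBatches can be checked in a single pass. A minimal Java sketch, assuming each batch is a closed range [FirstOffset, LastOffset]; `AckBatch` and `AckBatchValidator` are hypothetical names.

```java
import java.util.List;

// Hypothetical stand-in for one AcknowledgementBatch entry.
record AckBatch(long firstOffset, long lastOffset, List<Byte> acknowledgeTypes) {}

final class AckBatchValidator {
    // True when every batch is a well-formed range and each batch starts
    // after the previous one ends. Starting strictly after the previous
    // LastOffset implies both ascending FirstOffsets and no overlap.
    static boolean isValid(List<AckBatch> batches) {
        boolean first = true;
        long previousLast = 0;
        for (AckBatch b : batches) {
            if (b.firstOffset() > b.lastOffset()) {
                return false;
            }
            if (!first && b.firstOffset() <= previousLast) {
                return false;
            }
            previousLast = b.lastOffset();
            first = false;
        }
        return true;
    }
}
```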

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes to forget." }
    ]}
  ]
}
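The ShareSessionEpoch rules (0 to open, -1 to close, otherwise incrementing for consecutive requests) can be tracked on the client with a few lines of state. This Java sketch is hypothetical: the class and method names are illustrative, and wrapping from Integer.MAX_VALUE back to 1 is an assumption, not something this schema specifies.

```java
// Hypothetical client-side tracker for the ShareSessionEpoch field.
final class ShareSessionEpochTracker {
    static final int OPEN_EPOCH = 0;
    static final int CLOSE_EPOCH = -1;

    private Integer lastSent = null; // null: no share session is open

    // Epoch to put in the next ShareFetch or ShareAcknowledge request.
    int nextEpoch() {
        int epoch;
        if (lastSent == null) {
            epoch = OPEN_EPOCH;          // first request opens the session
        } else if (lastSent == Integer.MAX_VALUE) {
            epoch = 1;                   // assumption: wrap, skipping 0 and -1
        } else {
            epoch = lastSent + 1;        // consecutive requests increment
        }
        lastSent = epoch;
        return epoch;
    }

    // Epoch for the request that closes the session; a later request opens a new one.
    int closeEpoch() {
        lastSent = null;
        return CLOSE_EPOCH;
    }
}
```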

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The fetch error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error message, or null if there was no acknowledge error." }, 
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
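The response carries the fetched data in Records and the acquisition details in AcquiredRecords. Assuming, as the separate field suggests, that Records may include offsets this member did not acquire, a client would hand only the acquired offsets to the application. A hypothetical Java sketch that works on offsets rather than decoded records; all type names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one AcquiredRecords entry of the response.
record AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) {}

// An offset the application may process, with its delivery count.
record DeliverableRecord(long offset, short deliveryCount) {}

final class AcquiredRecordsFilter {
    // Keeps only offsets covered by an AcquiredRecords range, in fetch order.
    static List<DeliverableRecord> deliverable(List<Long> fetchedOffsets, List<AcquiredRange> acquired) {
        List<DeliverableRecord> out = new ArrayList<>();
        for (long offset : fetchedOffsets) {
            for (AcquiredRange r : acquired) {
                if (offset >= r.firstOffset() && offset <= r.lastOffset()) {
                    out.add(new DeliverableRecord(offset, r.deliveryCount()));
                    break;
                }
            }
        }
        return out;
    }
}
```

For example, if a fetch returned offsets 110-120 but only 110 and 120 were acquired, as in the "Fetch records 110, 120 (consumer 1)" step of the walkthrough, only those two offsets are kept, with delivery counts 2 and 1.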

...

For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}
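A client accumulating per-record acknowledgements needs to turn them into AcknowledgementBatches that satisfy the ordering rule. A minimal Java sketch, assuming one AcknowledgeTypes entry per offset in a batch and starting a new batch at every gap in the offsets. The type codes are taken from the schema; the class and record names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Acknowledge type codes from the AcknowledgeTypes field.
final class AckType {
    static final byte GAP = 0, ACCEPT = 1, RELEASE = 2, REJECT = 3;
    private AckType() {}
}

// Hypothetical stand-in for one outgoing AcknowledgementBatch.
record OutgoingAckBatch(long firstOffset, long lastOffset, List<Byte> acknowledgeTypes) {}

final class AcknowledgementBatcher {
    // Groups per-offset acknowledgements into batches of contiguous offsets.
    // Assumption: each batch carries one acknowledge type per offset.
    // Because the input map is sorted, batches come out ascending and non-overlapping.
    static List<OutgoingAckBatch> toBatches(SortedMap<Long, Byte> acks) {
        List<OutgoingAckBatch> batches = new ArrayList<>();
        long first = -1;
        long last = -1;
        List<Byte> types = new ArrayList<>();
        for (Map.Entry<Long, Byte> e : acks.entrySet()) {
            long offset = e.getKey();
            if (!types.isEmpty() && offset != last + 1) {
                // Offsets are not contiguous: close the current batch.
                batches.add(new OutgoingAckBatch(first, last, List.copyOf(types)));
                types.clear();
            }
            if (types.isEmpty()) {
                first = offset;
            }
            last = offset;
            types.add(e.getValue());
        }
        if (!types.isEmpty()) {
            batches.add(new OutgoingAckBatch(first, last, List.copyOf(types)));
        }
        return batches;
    }
}
```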

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

...

Compatibility, Deprecation, and Migration Plan

Kafka Broker Migration

As is customary for large KIPs, this KIP will be delivered into Apache Kafka progressively, starting with an Early Access release, and then moving through Preview, and finally General Availability.

Early access and Preview

At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.

A temporary configuration group.share.enable is used to turn on the feature. There is no support for upgrade or downgrade.

General availability

This KIP builds upon KIP-848 and the new group coordinator.

To upgrade a cluster, first perform a rolling upgrade of the cluster to a software version which supports share groups. Then, enable the new protocol by setting a group.version which supports it using the kafka-features.sh tool. Finally, change the group.coordinator.rebalance.protocols configuration to add "share" to the list of enabled rebalance protocols.

KIP-848 introduced the new group coordinator and the new records for the __consumer_offsets topic. The pre-KIP-848 group coordinator does not recognize these records, so downgrading to a software version that only has the pre-KIP-848 group coordinator is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. This KIP adds new records to the __consumer_offsets topic which will not be understood by the group coordinator in that software version. The group coordinator will ignore these records. The __share_group_state topic will be unused and can be manually deleted.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing both to understand the performance of share groups, and also to understand the impact on brokers with this new feature.

...