Status

Current state: Accepted

Discussion thread: here 

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-932 introduced share groups to allow multiple share consumers to consume messages from the same partition concurrently. While this adds flexibility, it also introduces challenges in observability. Currently, there is no visibility into consumption progress at the granularity of individual share partitions, making it difficult to detect imbalances, identify slow share consumers, or troubleshoot performance issues.

Introducing share partition lag provides fine-grained visibility into consumption progress, enabling operators to monitor share group behavior more effectively and make informed operational decisions. It also enables future automation opportunities—external autoscalers such as KEDA could use this lag to dynamically scale share consumers based on real-time workload demand, improving efficiency and reducing manual intervention.

Looking ahead, the plan is to build on this foundation by introducing an assignor that can allocate share group members to partitions based on their partition-level backlogs, ensuring more balanced load distribution and improved overall consumption efficiency.

Proposed Changes

Share Partition Lag Computation

The lag for a share partition, unlike the lag for a regular partition in consumer groups, is more complex to compute. By definition, lag should capture the number of records that are either still being processed or have not yet been processed. In the case of share groups, since a single partition can be consumed by multiple share consumers, record processing does not always occur in strict order.

Furthermore, in-flight records (records that lie between the share-partition start offset (SPSO) and the share-partition end offset (SPEO)) in a share partition can exist in one of the following four states:

  • AVAILABLE

  • ACQUIRED

  • ACKNOWLEDGED

  • ARCHIVED

To measure the lag, we first need to determine the highest offset in the underlying partition log, which defines the upper boundary of records currently available for consumption. Similar to regular consumer groups, this offset will be retrieved using the read-uncommitted isolation level. Consequently, the Log End Offset (LEO) will be used as the reference point for measuring lag, as it represents the latest offset in the partition, including both committed and uncommitted records.

Based on this, the lag for a share partition is defined as:

share-partition lag = (highest offset in the underlying partition log) – (share-partition start offset) + 1 – (in-flight records that are already processed)

For example, consider the following topic partition:

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   0   |   1   |   2   |   3   |   4   |   5   |   6   |   7   |   8   |   9   |   10  | <- offset
| Archv | Archv | Acqrd | Avail | Acqrd | Acked | Archv | Avail | Avail | Avail | Avail | <- state
|       |       |   1   |   2   |   1   |       |       |       |       |       |       | <- delivery count
+-------+-------+---^---+-------+-------+-------+-------+-------+-------+---^---+-------+
                    |                                                       |
                    +-- Share-partition start offset (SPSO)                 +-- Share-partition end offset (SPEO)

The backlog for this partition is 7. The records still to be consumed are offsets 2, 3, 4, 7, 8 and 9 (all in-flight), plus offset 10, which lies beyond the SPEO.

As per the lag definition above,

highest offset in the underlying partition log = 10

share-partition start offset = 2

in-flight records that are already processed = 2 (offsets 5 and 6)

Thus, share-partition lag = 10 - 2 + 1 - 2 = 7
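The computation above can be sketched as a small Java method (illustrative only; the class and method names here are not part of this KIP):

```java
import java.util.List;

public class ShareLagExample {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Lag per the formula above: offsets from the SPSO up to the highest
    // offset in the log, minus the in-flight records whose delivery is
    // already complete (ACKNOWLEDGED or ARCHIVED).
    static long shareLag(long highestOffset, long spso, List<State> inFlightStates) {
        long deliveryComplete = inFlightStates.stream()
                .filter(s -> s == State.ACKNOWLEDGED || s == State.ARCHIVED)
                .count();
        return highestOffset - spso + 1 - deliveryComplete;
    }

    public static void main(String[] args) {
        // In-flight states for offsets 2..9 from the diagram above.
        List<State> inFlight = List.of(
                State.ACQUIRED,     // offset 2
                State.AVAILABLE,    // offset 3
                State.ACQUIRED,     // offset 4
                State.ACKNOWLEDGED, // offset 5
                State.ARCHIVED,     // offset 6
                State.AVAILABLE,    // offset 7
                State.AVAILABLE,    // offset 8
                State.AVAILABLE);   // offset 9
        System.out.println(shareLag(10, 2, inFlight)); // 10 - 2 + 1 - 2 = 7
    }
}
```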

Handling Control and Compacted Records

The concept of share partition lag is closely analogous to the traditional partition lag for a regular consumer, which is typically defined as:

partition lag = offset of the latest record produced - committed offset

However, not all offsets in the partition log correspond to records to be processed by the consumer application, such as:

  1. Control Records — Internal records used by the broker for managing transactions, not carrying user data.
  2. Compacted Records — Offsets that no longer correspond to records due to log compaction (applicable to compacted topics).

In a regular consumer scenario, lag computation includes these offsets. The consumer remains unaware of their nature until it fetches data up to those offsets. As a result, these offsets continue to be counted as part of the lag until the client reads beyond them and advances its committed offset.

Similarly, in the context of share partitions, lag computation follows comparable semantics. All non-record offsets that lie after the SPEO are included in the lag. However, offsets within the in-flight boundary (between SPSO and SPEO) require additional handling so that the lag more accurately reflects the number of records to be processed.

When a share consumer identifies missing records while processing record batches, it reports them to the share partition through special "gap" acknowledgements which indicate records that have been removed by compaction. Upon receiving such acknowledgements, the share partition:

  • Marks the corresponding offsets as ARCHIVED, and
  • Excludes them from subsequent lag computations.

Until these acknowledgements are received, such offsets remain included in the lag, even though they do not correspond to user-visible data. Consequently, the share partition lag may temporarily include these offsets.
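The effect of a gap acknowledgement on the lag can be sketched as follows (a simplified model, not the broker's actual data structures):

```java
import java.util.HashMap;
import java.util.Map;

public class GapAckExample {
    // Minimal model of in-flight state. Offsets removed by compaction are
    // physically absent from the log but still occupy offset positions.
    enum State { AVAILABLE, ARCHIVED }

    static long lag(long highestOffset, long spso, Map<Long, State> inFlight) {
        long deliveryComplete = inFlight.values().stream()
                .filter(s -> s == State.ARCHIVED).count();
        return highestOffset - spso + 1 - deliveryComplete;
    }

    public static void main(String[] args) {
        Map<Long, State> inFlight = new HashMap<>();
        for (long o = 0; o <= 4; o++) inFlight.put(o, State.AVAILABLE);
        long before = lag(9, 0, inFlight); // 10 offsets, none complete -> 10

        // A consumer fetches and discovers offsets 1-2 were compacted away.
        // It sends a "gap" acknowledgement; the share partition marks them
        // ARCHIVED, excluding them from subsequent lag computations.
        inFlight.put(1L, State.ARCHIVED);
        inFlight.put(2L, State.ARCHIVED);
        long after = lag(9, 0, inFlight); // -> 8

        System.out.println(before + " " + after);
    }
}
```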

Persistence

The start offset is regularly updated and persisted through the WriteShareGroupState RPC call in the __share_group_state topic, managed by the Share Coordinator. This KIP introduces a mechanism to calculate and persist the count of in-flight records that have already been processed, and makes it available to users.

For each share partition, the leader will compute this new information and include it in the writeShareGroupState RPC, which will be persisted via the ShareSnapshot and ShareUpdate records in the __share_group_state topic. The same information will also be returned in the response of ReadShareGroupStateSummary.

To make this information available to users, the Group Coordinator retrieves it through the ReadShareGroupStateSummary API and computes the share-partition lag, which is then included in the response to the DescribeShareGroupOffsets request invoked by Admin.listShareGroupOffsets(). To calculate the lag, the Group Coordinator issues an Admin.listOffsets() call to fetch the end offset of the underlying partition and then applies the share-partition lag formula defined above to derive the lag value.
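Putting those pieces together, the derivation the Group Coordinator performs can be sketched as a pure function (illustrative names; note that listOffsets returns the log end offset, i.e. highest offset + 1, so the "+ 1" of the formula above is already folded in; a persisted delivery-complete count of -1 indicates the value is not yet available, e.g. from an older broker):

```java
import java.util.OptionalLong;

public class CoordinatorLagExample {
    // Combines the partition end offset (from Admin.listOffsets) with the
    // persisted state summary (start offset, delivery-complete count) to
    // derive the share-partition lag. Returns empty if the count has not
    // been written yet (sentinel value -1).
    static OptionalLong lag(long logEndOffset, long startOffset, int deliveryCompleteCount) {
        if (deliveryCompleteCount < 0) return OptionalLong.empty();
        // logEndOffset == highestOffset + 1, so this equals
        // highestOffset - startOffset + 1 - deliveryCompleteCount.
        return OptionalLong.of(logEndOffset - startOffset - deliveryCompleteCount);
    }

    public static void main(String[] args) {
        // The worked example above: LEO = 11, SPSO = 2, 2 completed records.
        System.out.println(lag(11, 2, 2));   // OptionalLong[7]
        System.out.println(lag(11, 2, -1));  // lag unavailable
    }
}
```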

The SPEO is intentionally excluded from both the external interfaces and the share partition lag calculations, since future changes may allow sparse in-flight records, and the distance between the SPSO and the SPEO can vary significantly. The concept of lag introduced in this KIP is therefore designed to remain flexible and extensible to accommodate such future evolutions.

Public Interfaces

Client API changes

AdminClient

ListShareGroupOffsetsResult

A very small breaking change is made compared with KIP-932 to accommodate the lag. This is permitted because it is still marked as an evolving interface.

The return type of the all() method is changed from KafkaFuture&lt;Map&lt;String, Map&lt;TopicPartition, OffsetAndMetadata&gt;&gt;&gt; to KafkaFuture&lt;Map&lt;String, Map&lt;TopicPartition, SharePartitionOffsetInfo&gt;&gt;&gt;.

The partitionsToOffsetAndMetadata method is removed and replaced with partitionsToOffsetInfo.

package org.apache.kafka.clients.admin;
  
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields a Map&lt;String, Map&lt;TopicPartition, SharePartitionOffsetInfo&gt;&gt;, if the requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>> all() {
    }
 
    /**
     * Return a future which yields a map of topic partitions to offset information for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>> partitionsToOffsetInfo(String groupId) {
    }
}

SharePartitionOffsetInfo

package org.apache.kafka.clients.admin;

/**
 * This class is used to contain the offset and lag information for a share-partition.
 */
@InterfaceStability.Evolving
public class SharePartitionOffsetInfo {
  public SharePartitionOffsetInfo(long startOffset, Optional<Integer> leaderEpoch, Optional<Long> lag);

  public long startOffset();

  public Optional<Integer> leaderEpoch();

  public Optional<Long> lag();
}

Command-line tools

kafka-share-groups.sh

A new column LAG is added to the output from kafka-share-groups.sh --describe --offsets. The value is displayed as - if the lag is not available.
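The rendering rule for the new column can be sketched as follows (a hypothetical helper, not the actual tool code):

```java
import java.util.Optional;

public class LagColumnExample {
    // Renders the LAG column: the numeric value when the broker reported a
    // lag, or "-" when it is not available (e.g. older broker).
    static String lagColumn(Optional<Long> lag) {
        return lag.map(String::valueOf).orElse("-");
    }

    public static void main(String[] args) {
        System.out.println(lagColumn(Optional.of(7L)));  // 7
        System.out.println(lagColumn(Optional.empty())); // -
    }
}
```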

Kafka protocol changes

This KIP introduces new versions of the following APIs:

WriteShareGroupState API

Request schema

Version 1 adds the new field DeliveryCompleteCount. This reports the number of records in the share partition that lie at or after the StartOffset and are in a terminal state (ACKNOWLEDGED or ARCHIVED).

{
  "apiKey": 85,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  // Version 0 is the initial version (KIP-932).
  // Version 1 adds DeliveryCompleteCount.
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
          "about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed."},
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
          "about": "The state batches for the share-partition.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Version 1 is the same as version 0.

ReadShareGroupStateSummary

Request schema

Version 1 is the same as version 0.

Response schema

Version 1 adds the new field DeliveryCompleteCount.

{
  "apiKey": 87,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  // Supported errors:
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." },
        { "name": "DeliveryCompleteCount", "type": "int32", "versions": "1+", "ignorable": "true", "default": "-1",
          "about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed."}
      ]}
    ]}
  ]
}

DescribeShareGroupOffsets

Request schema

Version 1 is the same as version 0.

Response schema

Version 1 adds the new field Lag.

{
  "apiKey": 90,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  // Version 0 is the initial version (KIP-932).
  // Version 1 adds Lag (KIP-share-lag).
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribeShareGroupOffsetsResponseGroup", "versions": "0+",
      "about": "The results for each group.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "Topics", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
        "about": "The results for each topic.", "fields": [
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The unique topic ID." },
        { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "The share-partition start offset." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The leader epoch of the partition." },
          { "name": "Lag", "type": "int64", "versions": "1+", "ignorable": "true", "default": "-1",
            "about": "The share-partition lag." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The partition-level error code, or 0 if there was no error." },
          { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
            "about": "The partition-level error message, or null if there was no error." }
        ]}
      ]},
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The group-level error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The group-level error message, or null if there was no error." }
    ]}
  ]
}

Records

The DeliveryCompleteCount received in the WriteShareGroupState RPC is also persisted by the Share Coordinator. To persist this information, the schemas for the following records are also updated:

  • ShareSnapshot

  • ShareUpdate

The record versions remain the same; instead, the new field is added as a tagged field with a default value. This ensures that records written by a broker on one version can be read by a broker on a different version, in case the broker is upgraded or rolled back.

ShareSnapshotKey schema

Remains the same; no change is introduced here.

ShareSnapshotValue schema

{
  "apiKey": 0,
  "type": "coordinator-value",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "int32", "versions": "0+",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch for this share-partition." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset." },
    { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": "-1",
      "about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed."},
    { "name": "CreateTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which the state was created." },
    { "name": "WriteTimestamp", "type": "int64", "versions": "0+",
      "about": "The time at which the state was written or rewritten." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
      "about": "The state batches.", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0+",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0+",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0+",
        "about": "The delivery count." }
    ]}
  ]
}  


ShareUpdateKey schema

Remains the same; no change is introduced here.

ShareUpdateValue schema

{
  "apiKey": 1,
  "type": "coordinator-value",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "int32", "versions": "0+",
      "about": "The snapshot epoch." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0+",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": "-1",
      "about": "The number of offsets greater than or equal to share-partition start offset for which delivery has been completed."},
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+",
      "about": "The state batches that have been updated.", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0+",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0+",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0+",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
      { "name": "DeliveryCount", "type": "int16", "versions": "0+",
        "about": "The delivery count." }
    ]}
  ]
}


Compatibility, Deprecation, and Migration Plan

  • The existing functionality is not modified. Clusters with upgraded brokers will be able to store and report lag for share partitions.

  • The RPCs WriteShareGroupState and ReadShareGroupStateSummary are only used for inter-broker communication and thus have no consequences for clients. If brokers supporting different versions of the RPC communicate with each other, they will agree on the minimum of the highest versions each supports, resolving any conflict. If a broker running the old version of the code nevertheless receives one of these requests with version 1, it should treat this as an error and return the appropriate error code (Errors.INVALID_REQUEST at the time of writing).

  • In contrast, DescribeShareGroupOffsets is a client-facing RPC. However, since ListShareGroupOffsetsResult is already annotated with @InterfaceStability.Evolving, it provides the necessary flexibility to introduce interface modifications without violating API stability guarantees.

  • For the ShareSnapshot and ShareUpdate records, the schema version remains unchanged to maintain compatibility. The newly added field is tagged and assigned a default value, ensuring that older brokers safely ignore the field when reading newer records, while newer brokers correctly populate it with the default value when reading older records — all without requiring a schema version bump.

Test Plan

  • The existing tests which verify the functioning of the updated RPCs will be updated to check that the new field is correctly persisted and reported.

  • New unit tests will be added in tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java to verify that adminClient.listShareGroupOffsets returns the lag for each share partition and that it is displayed when kafka-share-groups.sh --describe --offsets is used.


Rejected Alternatives

None
